当前位置:首页 > ES > 正文内容

logstash增量同步mysql到es配置详解

phpmianshi1年前 (2019-07-03)ES111

配置详解

input {
  jdbc {
    # mysql相关jdbc配置
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"
 
    # jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
 
    jdbc_default_timezone =>"Asia/Shanghai"
 
    # mysql文件, 也可以直接写SQL语句在此处,后面一定跟上排序,否则有可能遗漏数据或者一直处理同一批数据,如下:
     statement => "select * from user where update_time >= :sql_last_value order by update_time asc,id asc;"
     #statement_filepath => "./config/jdbc.sql"
 
    #启用追踪,则需要指定tracking_column,默认是timestamp()
    use_column_value => true
 
    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
    tracking_column => "update_time"
    #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型()
    tracking_column_type => "timestamp"
    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    last_run_metadata_path => "./logstash_capital_last_id"
 
    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false
 
    #是否将 字段(column) 名称转小写
    lowercase_column_names => false
     # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
    schedule => "* * * * *"
    #如果配置多个数据源需要用type来区分
    type => "user"
  }
}
filter {}
output {
  #使用if语句判断type来指定输出的块()
  if[type]=="testdb"{
      elasticsearch {
        hosts => "localhost:9200"
        index => "user"
        document_id => "%{id}"
        template_overwrite => true
      }
  }
 
  # 这里输出调试,正式运行时可以注释掉
  stdout {
      codec => json_lines
  }
}


注意事项

后面一定跟上排序update_time asc,id asc,并开启分页,否则在update_time相等的数据过多时,有可能一直处理同一批数据,造成死循环,如下:

jdbc_paging_enabled => true
jdbc_page_size => "50000"
statement => "select * from user where update_time >= :sql_last_value order by update_time asc,id asc;"
版权声明:本文由PHP面试资料网发布,如需转载请注明出处。
分享给朋友:

相关文章

elasticsearch中最重要的查询过滤语句

term 过滤term 主要用于精确匹配哪些值,比如数字,日期,布尔值或 not_analyzed 的字符串(未经分析的文本数据类型){ "term": { "age&q...

logstash中同步mysql到elastic常见问题总结

1. mysql查询字段中有 type字段问题原因select语句中查到了type, 但es中会默认有一个type, 这使得两个type冲突.会导致同步失败,且没有报错GET my_inde...

logstash过滤器--mutate

概念filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。Plugin version: v3.5.0执行顺...

使用logstash-input-jdbc同步mysql到es

数据同步方式全量同步与增量同步全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。增量同步是指将后续的更新、插入记录同步到es。(删除记录没有办法同步,只能两边执行自己的删除命令)常...

elasticsearch中分析器的原理和使用

概念分析(analysis)机制用于进行全文文本(Full Text)的分词,以建立供搜索用的反向索引。原理分析器的工作过程大概分成两步:分词(Tokenization):根据停止词把文本分割成很多的...

elasticsearch用于html去标签化搜索

场景elasticsearch用于html去标签化搜索:即在Index的时候忽略html tag,同时又存储了完整的html,在使用的时候可以正常读出来。示例假设我们给content字段自定义anal...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。