首页 > ELK > logstash多配置文件的使用和常见问题
2019
08-24

logstash多配置文件的使用和常见问题

背景

假设现在给Logstash的pipeline配置了2个conf,也就是2个输入源。如果不做任何处理,那么所有的Filter和Output都会同时触发,如下图:

图片.png

这显然跟我们期望的不同,我们希望Logstash按以下的方式来处理,也就是各自区分,独立处理:

图片.png


解决方案

官方提供有 type 和tags 配置项进行区分,type 和 tags 是 logstash 事件中两个特殊的字段。

通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而tags 则是在数据处理过程中,由具体的插件来添加或者删除的。

如下所示:

input {
        jdbc {
                jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
                        jdbc_user => "es_igper"
                        jdbc_password => "password"
                        jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
                        jdbc_driver_class => "com.mysql.jdbc.Driver"
                        jdbc_paging_enabled => "true"
                        jdbc_page_size => "10000"
                        plugin_timezone => "local"

                        clean_run => false
                        use_column_value => true
                        tracking_column => update_time
                        tracking_column_type => "timestamp"
                        record_last_run => true
                        last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"

                        statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"

                        schedule => "* * * * *"

                        type => "live"
        }
}
filter {
        if [type]=="live" {
                mutate {
                        remove_field => ["username","passwd","wx_openid"]
                }
        }
}
output {
        if [type]=="live" {
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "live_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
}


以上是通过type区分,传入的数据中带有type字段/tags字段,会覆盖logstash配置文件设置的type属性值,这是个大坑,注意

input {
        jdbc {
                jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
                        jdbc_user => "es_igper"
                        jdbc_password => "password"
                        jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
                        jdbc_driver_class => "com.mysql.jdbc.Driver"
                        jdbc_paging_enabled => "true"
                        jdbc_page_size => "10000"
                        plugin_timezone => "local"

                        clean_run => false
                        use_column_value => true
                        tracking_column => update_time
                        tracking_column_type => "timestamp"
                        record_last_run => true
                        last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"

                        statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"

                        schedule => "* * * * *"

                        tags=> ["live"]
        }
}
filter {
        if "live" in [tags]{
                mutate {
                        remove_field => ["username","passwd","wx_openid"]
                }
        }
}
output {
       if "live" in [tags]{
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "live_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
         if "user" in [tags]{
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "user_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
}


通过以上方式配置,就能将pipeline各自区分开。大家可以试下,如果把type参数去掉,最后的elasticsearch数据会混或者数据重复等奇怪问题


原因

logstash启动后会把多个配置文件自动合并成一个,比如有两个conf文件,每个都有 filter、ouput,写入一条数据,会执行两次filter和output,重复写入



本文》有 0 条评论

留下一个回复