【问题标题】:How to use aggregate filter in logstash config file?如何在 logstash 配置文件中使用聚合过滤器?
【发布时间】:2019-11-01 05:50:00
【问题描述】:

我正在尝试在 logstash 配置文件中使用aggregate 过滤器来组合来自两个 sql 表的结果,但无法找出问题所在。

我当前的 logstash 配置文件如下所示:

input {
    jdbc {

        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_paging_enabled => true
        statement => "select 
                          s.id as "sch_id", 
                          s.udise_sch_code as "sch_code",
                          tch.teacher_id as "tch_id", 
                          tch.name as "tch_name",
                          tch.social_category as "social_cat" 
                      from mst_school s 
                      inner join teacher_profile tch on s.id = tch.id limit 100 " 
        }

filter {
    aggregate {
        task_id => "%{sch_id}"
        code => “
        map[sch_id] = event.get(sch_id) 
        map[sch_code] = event.get(sch_code) 
        map[‘teachers’] ||= []
        map[‘teachers’] << {‘tch_id’ => event.get(‘tch_id’),’tch_name’ => event.get(‘tch_name’),’social_cat’ => event.get(‘social_cat’)}
        event.cancel()
        "
    push_previous_map_as_event => true
    timeout => 30

    }
}


output {
     stdout { codec => json_lines }
 }

#output {
#   elasticsearch {
#       index => "detfac"
#       hosts => "http://localhost:9200"
#           }
#       }

这是我得到的错误:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, {, } at line 11, column 36 (byte 409) after input {\n    jdbc {\n       \n        jdbc_connection_string => \"jdbc:postgresql://localhost:5432/school\"\n        jdbc_user => \"postgres\"\n        jdbc_password => \"postgres\"\n        jdbc_driver_library => \"/Users/karangupta/Downloads/postgresql-42.2.8.jar\"\n        jdbc_driver_class => \"org.postgresql.Driver\"\n        jdbc_paging_enabled => true\n        statement => \"select \n                          s.id as \"", :backtrace=>["/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:24:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}

还有其他方法可以做到这一点吗?或对此有何帮助?

【问题讨论】:

    标签: postgresql logstash


    【解决方案1】:

    查看您的错误消息:“Expected one of #, {, } at line 11, column 36 (byte 409) after input...”。如果您在配置文件中查看这些坐标,您会注意到您使用的引号会干扰 statement 属性的引号。尝试删除语句中的所有引号,或转义它们(参见Logstash documentation 上的操作方法)。

    【讨论】:

      【解决方案2】:

      试试下面的代码,

      input {
      jdbc {
      
          jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"
          jdbc_user => "postgres"
          jdbc_password => "postgres"
          jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"
          jdbc_driver_class => "org.postgresql.Driver"
          jdbc_paging_enabled => true
          statement => "select 
                            s.id as sch_id, 
                            s.udise_sch_code as sch_code,
                            tch.teacher_id as tch_id, 
                            tch.name as tch_name,
                            tch.social_category as social_cat 
                        from mst_school s 
                        inner join teacher_profile tch on s.id = tch.id limit 100 " 
          }
      filter {
      aggregate {
          task_id => "%{sch_id}"
          code => "
          map[sch_id] = event.get(sch_id) 
          map[sch_code] = event.get(sch_code) 
          map['teachers'] ||= []
          map['teachers'] << {'tch_id' => event.get('tch_id'),'tch_name' => event.get('tch_name'),'social_cat' => event.get('social_cat')}
          event.cancel()
          "
      push_previous_map_as_event => true
      timeout => 30
      
      } }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-12-13
        • 2014-08-24
        • 1970-01-01
        • 1970-01-01
        • 2023-03-27
        • 2020-03-23
        相关资源
        最近更新 更多