【问题标题】:how to use elapsed filter- logstash如何使用经过的过滤器-logstash
【发布时间】:2019-05-06 10:27:29
【问题描述】:

我正在使用 Elapsed 过滤器。我在 logstash 中阅读了 Elapsed filter 的指南。然后我制作了一个示例配置文件和 csv 来测试 Elapsed 过滤器的工作。但它似乎不起作用。将数据上传到 ES 没有变化。我附上了 csv 文件和配置代码。你能举一些例子来说明如何使用 elapsed 过滤器吗?

这是我的 csv 数据:

这是我的配置文件:

input {
     file {
      path => "/home/paulsteven/log_cars/aggreagate.csv"
      start_position => "beginning"
      sincedb_path => "/dev/null"
   }
}
filter {
    csv {
        separator => ","
        quote_char => "%"
        columns => ["state","city","haps","ads","num_id","serial"]
    }
    elapsed {
        start_tag => "taskStarted"
        end_tag => "taskEnded"
        unique_id_field => "num_id"
    }

}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "el03"
    document_type => "details"
  }
  stdout{}
}

ES 中的输出:

{
          "city" => "tirunelveli",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1",
      "@version" => "1",
        "serial" => "1",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.443Z
}
{
          "city" => "chennai",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,chennai,hap0,ad1,2345-1002-4501,5",
      "@version" => "1",
        "serial" => "5",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.447Z
}
{
          "city" => "kottayam",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "kerala,kottayam,hap1,ad2,2345-1002-4501,9",
      "@version" => "1",
        "serial" => "9",
          "haps" => "hap1",
         "state" => "kerala",
          "host" => "smackcoders",
           "ads" => "ad2",
    "@timestamp" => 2019-05-06T10:03:51.449Z
}
{
          "city" => "Jalna",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "mumbai,Jalna,hap2,ad3,2345-1002-4501,13",
      "@version" => "1",
        "serial" => "13",
          "haps" => "hap2",
         "state" => "mumbai",
          "host" => "smackcoders",
           "ads" => "ad3",
    "@timestamp" => 2019-05-06T10:03:51.452Z
}

【问题讨论】:

    标签: logstash kibana elastic-stack logstash-configuration logstash-file


    【解决方案1】:

    您必须标记您的事件,以便 Logstash 可以找到开始/结束标签。 基本上,您必须知道何时将事件视为开始事件以及何时将其视为结束事件。

    Elapsed 过滤器插件仅适用于两个事件(例如请求事件和响应事件,以获取它们之间的延迟) 这两种事件都需要拥有一个唯一标识该特定任务的 ID 字段。该字段的名称存储在 unique_id_field 中。

    对于您的示例,您必须确定开始和结束事件的模式,假设您在 csv 中有一个列 type(请参阅下面的代码)当 type 包含“START”,该行被认为是开始事件,如果它包含“END”,则它是一个结束事件,非常简单,并且是一个存储唯一标识符的列 id

    filter {
      csv {
        separator => ","
        quote_char => "%"
        columns => ["state","city","haps","ads","num_id","serial", "type", "id"]
        }
      grok {
        match => { "type" => ".*START.*" }
        add_tag => [ "taskStarted" ]
      }grok {
      match => { "type" => ".*END*" }
      add_tag => [ "taskTerminated" ]
    }  elapsed {
        start_tag => "taskStarted"
        end_tag => "taskTerminated"
        unique_id_field => "id"
      }
    }
    

    我觉得您的需求有所不同。 如果要聚合两个以上的事件,例如所有列状态值相同的事件,请查看this plugin

    【讨论】:

    • 感谢您的回答。根据您的回答,它需要 csv 文件中的响应和请求事件。是不是计算这两个事件,并将结果赋给唯一的id字段???
    • 当接收到与先前收集的“开始事件”匹配的“结束事件”时,存在匹配。配置属性 new_event_on_match 告诉在哪里插入经过的信息:可以将它们添加到“结束事件”或可以创建新的“匹配事件”。该插件有据可查:elastic.co/guide/en/logstash/current/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-08
    相关资源
    最近更新 更多