【问题标题】:Hierarchical data matching and displaying分层数据匹配和显示
【发布时间】:2016-03-23 21:02:15
【问题描述】:

在我的日志文件中,我有代表项目层次结构的数据,就像 http 日志文件可能显示网站的层次结构一样。

我可能有这样的数据

41 2016-01-01 01:41:32-500 show:category:all
41 2016-01-01 04:11:20-500 show:category:animals
42 2016-01-02 01:41:32-500 show:item:wallaby
42 2016-01-02 01:41:32-500 show:home

我会在这里有 3 件物品...%{NUMBER:terminal} %{TIMESTAMP_ISO8601:ts}(?<info>([^\r])*)

我使用mutatesplitinfo 数据解析为一个数组,将lvl1:lvl2:lvl3 转换为['lvl1','lvl2','lvl3']

我有兴趣聚合数据以轻松获得各个级别的计数,例如计算info[0] 相同或info[0]info[1] 相同的所有记录。 (并且可以选择时间范围和终端)

有没有办法设置 kibana 来可视化这类信息? 或者我应该更改过滤器匹配数据的方式以使数据更易于访问? 级别的深度各不相同,但我可以确定最大级别为 5,因此我可以将文本解析为各个字段 lvl1 lvl2 lvl3 lvl4 lvl5 而不是将它们放入数组中。

【问题讨论】:

    标签: elasticsearch logstash kibana elastic-stack


    【解决方案1】:

    根据您的问题,我同意您解析数据的方式。但我想添加更多内容,使其可以使用 Kibana 直接聚合和可视化。

    方法应该是:-

    • 使用 %{NUMBER:terminal} %{TIMESTAMP_ISO8601:ts} 和 (?([^\r])*) {根据您提供的信息}
    • 过滤数据
    • 变异
    • 过滤器

    然后在使用 mutate & filter 之后,您将获得数组形式的数据{正如您所提到的}

    • 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][0]}" ]​​i> 将字段添加为级别 1
    • 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][1]}" ]​​i> 将字段添加为级别 2
    • 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][2]}" ]​​i> 将字段添加为级别 3

    那么你可以直接使用 Kibana 来可视化这些信息。

    【讨论】:

    • 这就是我最终所做的。我还为每个数组字段 if [info_1] =~ /%\{info\[0\]\}/ {mutate {remove_field => [ "info_1" ]}} 添加了另一个过滤器,以删除为空的字段
    • 酷。如果您可以将完整的 logstash 配置文件分享给正在寻找此类解决方案的其他人,那就太好了。
    【解决方案2】:

    我的解决方案

    input {
      file {
          path => "C:/Temp/zipped/*.txt"
          start_position => beginning 
          ignore_older => 0 
          sincedb_path => "C:/temp/logstash_temp2.sincedb"
      }
    }
    
    filter {
      grok {
        match => ["message","^%{NOTSPACE}\[%{NUMBER:terminal_id}\] %{NUMBER:log_level} %{NUMBER} %{TIMESTAMP_ISO8601:ts} \[(?<facility>([^\]]*))\] (?<lvl>([^$|\r])*)"]
      }
      mutate {
           split => ["lvl", ":"]
           add_field => {"lvl_1" => "%{lvl[0]}"}
           add_field => {"lvl_2" => "%{lvl[1]}"}
           add_field => {"lvl_3" => "%{lvl[2]}"}
           add_field => {"lvl_4" => "%{lvl[3]}"}
           add_field => {"lvl_5" => "%{lvl[4]}"}
           add_field => {"lvl_6" => "%{lvl[5]}"}
           add_field => {"lvl_7" => "%{lvl[6]}"}
           add_field => {"lvl_8" => "%{lvl[7]}"}
           lowercase => [ "terminal_id" ] # set to lowercase so that it can be used for index - additional filtering may be required
      }
      date {
         match => ["ts", "YYYY-MM-DD HH:mm:ssZZ"]
      }
    }
    filter {
      if [lvl_1] =~ /%\{lvl\[0\]\}/ {mutate {remove_field => [ "lvl_1" ]}}
      if [lvl_2] =~ /%\{lvl\[1\]\}/ {mutate {remove_field => [ "lvl_2" ]}}
      if [lvl_3] =~ /%\{lvl\[2\]\}/ {mutate {remove_field => [ "lvl_3" ]}}
      if [lvl_4] =~ /%\{lvl\[3\]\}/ {mutate {remove_field => [ "lvl_4" ]}}
      if [lvl_5] =~ /%\{lvl\[4\]\}/ {mutate {remove_field => [ "lvl_5" ]}}
      if [lvl_6] =~ /%\{lvl\[5\]\}/ {mutate {remove_field => [ "lvl_6" ]}}
      if [lvl_7] =~ /%\{lvl\[6\]\}/ {mutate {remove_field => [ "lvl_7" ]}}
      if [lvl_8] =~ /%\{lvl\[7\]\}/ {mutate {remove_field => [ "lvl_8" ]}}
      mutate{
        remove_field => [ "lvl","host","ts" ] # do not keep this data
      }
    }
    
    
    output {
      if [facility] == "mydata" {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-mydata-%{terminal_id}-%{+YYYY.MM.DD}"
        }
      } else {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-other-%{terminal_id}-%{+YYYY.MM.DD}"
        }
      }
      # stdout { codec => rubydebug }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-10-11
      • 2012-12-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多