【问题标题】:How to write grok pattern in logstash如何在logstash中编写grok模式
【发布时间】:2017-05-03 09:55:10
【问题描述】:

我正在尝试从 logstash 开始,我的应用程序具有以下类型的日志。这里 5 表示后面还有 5 行,这些行是为不同的相关事物收集的统计信息。

这些基本上是应用程序的统计数据,每一行表示一个资源。

有没有办法使用 logstash 正确解析它,以便它可以用于弹性搜索?

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]

编辑

这是我正在使用的配置,第一组统计数据被正确解析,但在该管道卡住之后。请注意有 150 个这样的日志,但如果我只保留 2-3 个,那么它工作正常。你能帮我找出这里的问题吗?

# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"
### For testing and continual process of the same file, remove these before produciton
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if ([message] =~ /^\s*$/ ){
        drop{}
    }
    if ([message] =~ /^[^\[]/) {
            drop{}
    }

    if ([message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/) {
            drop{}
    }
    if [message] =~ "logPerDestinationStats" {
        grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                }
        }
        split { 
            field => "message"
        }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
}
output {
  stdout {codec => rubydebug}
}

【问题讨论】:

    标签: elasticsearch logstash elastic-stack logstash-grok


    【解决方案1】:

    当然有。

    您需要做的是在输入过滤器上使用multiline codec

    按照示例:

    input {
      file {
        path => "/var/log/someapp.log"
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
          negate => true
          what => previous
        }
      }
    }
    

    这基本上表明任何不以 YYYYMMDD HH:mi:ss.000 开头的行都将与前一行合并

    您现在可以从那里将 Grok 模式应用到第一行(以获取高级数据)。

    一旦您对第一行所需的所有数据感到满意,您就可以在 \r 或 \n 上拆分并使用单个 grok 模式(基于您上面给出的示例)获取单独的统计数据。

    希望对你有帮助

    D

    2017-05-08 11:54 更新:

    完整的 logstash conf 可能看起来像这样,您需要考虑更改 grok 模式以更好地满足您的要求(只有您知道您的数据)。

    注意,这还没有经过测试,我把它留给你。

    input {
      file {
        path => "/var/log/someapp.log"
    ### For testing and continual process of the same file, remove these before produciton
        start_position => "beginning"
        sincedb_path => "/dev/null"
    ### Lets read the logfile and recombine multi line details
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
          negate => true
          what => previous
        }
      }
    }
    filter {
    ### Let's get some high level data before we split the line (note: anything you grab before the split gets copied)
        grok {
            match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]"
            }
        }
    ### Split the lines back out to being a single line now. (this may be a \r or \n, test which one)
        split { 
            "field" => "message"
            "terminator" => "\r" 
        }
    ### Ok, the lines should now be independent, lets add another grok here to get the patterns as dictated by your example [fieldA: str | field2: 0...] etc.
    ### Note: you should look to change the grok pattern to better suit your requirements, I used DATA here to quickly capture your content
        grok {
            break_on_match => false
            match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$" }
        }
        mutate {
        convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
    output {
      stdout { codec => rubydebug }
    }
    

    编辑 2017-05-15

    Logstash 是一个复杂的解析器,它希望作为一个进程保持并持续监控日志文件(因此您必须将其崩溃)

    匹配中断意味着您可能对同一行有多个匹配要求,如果找不到匹配项,它将尝试列表中的下一个(总是从复杂到简单)

    您的输入过滤器,将路径更改为以 .log* 结尾,此外,根据您的原始示例,模式是否不必与所需的日期格式匹配(以便将所有关联放在一行上)

    您的过滤器应该指定我认为的分割字符(否则我认为默认是逗号)。

    input {
      file {
        path => "/u/bansalp/activemq_primary_plugin.stats.log*"
    ### For testing and continual process of the same file, remove these before production
        start_position => "beginning"
        sincedb_path => "/dev/null"
    ### Lets read the logfile and recombine multi line details
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[destName:"
          negate => false
          what => "previous"
        }
      }
    }
    
    filter {
        if "logPerDestinationStats" in [message] {
            grok {
                    match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                    }
            }
            split { 
                field => "message"
                terminator => "\r”
                }
            grok {
                    match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
            }
            mutate {
                convert => { "message" => "string" }
                add_field => {
                    "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                    "load_timestamp" => "%{@timestamp}"
                }
                remove_field => ["yr","mnt", "daynum", "time", "timezone"]
            }
        }
       else {
          drop{}
        }
    }
    

    请原谅我目前正在从手机更新此格式,我很高兴有人代替我更新格式。

    【讨论】:

    • 谢谢丹。我对 logstash 有点陌生,您提到“您现在可以将 Grok 模式应用于第一行”。我将如何做到这一点,模式不是我们指定模式以提取各种字段的模式吗?
    • 你现在需要做的是合并一个过滤器插件,(所以一个标准的logstash文件有一个输入、过滤器和输出插件),在过滤器插件部分你可以应用多个过滤器、grok、mutate 、xml、json、ruby 等。有关 grok 的帮助,请查看grokdebug.herokuapp.com
    • 我的文件大小为 50 MB,logstash 需要很长时间来解析它。请注意,我使用 if 语句来匹配不同日志行的不同 grk 模式,因为同一个日志文件有 3-4 种具有不同字段的统计日志。
    • 在过滤输出之后,它也会在屏幕上打印,但它会一直等待,我必须给 ctrl-C 才能杀死它。这可以避免吗,因为我将设置它以将事件发送到弹性搜索。
    • 还有没有办法从上次解析的位置开始解析文件?
    猜你喜欢
    • 2018-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-13
    • 1970-01-01
    • 2016-05-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多