【问题标题】:How to process multiline log entry with logstash filter?如何使用 logstash 过滤器处理多行日志条目?
【发布时间】:2014-03-04 13:10:04
【问题描述】:

背景:

我有一个自定义生成的日志文件,它具有以下模式:

[2014-03-02 17:34:20] - 127.0.0.1|ERROR| E:\xampp\htdocs\test.php|123|subject|The error message goes here ; array (
  'create' => 
  array (
    'key1' => 'value1',
    'key2' => 'value2',
    'key3' => 'value3'
  ),
)
[2014-03-02 17:34:20] - 127.0.0.1|DEBUG| flush_multi_line

第二个条目[2014-03-02 17:34:20] - 127.0.0.1|DEBUG| flush_multi_line 是一个虚拟行,只是为了让logstash 知道多行事件已经结束,此行稍后会被删除。

我的配置文件如下:

input {
  stdin{}
}

filter{
  multiline{
      pattern => "^\["
      what => "previous"
      negate=> true
  }
  grok{
    match => ['message',"\[.+\] - %{IP:ip}\|%{LOGLEVEL:loglevel}"]
  }

  if [loglevel] == "DEBUG"{ # the event flush  line
    drop{}
  }else if [loglevel] == "ERROR"  { # the first line of multievent
    grok{
      match => ['message',".+\|.+\| %{PATH:file}\|%{NUMBER:line}\|%{WORD:tag}\|%{GREEDYDATA:content}"] 
    }
  }else{ # its a new line (from the multi line event)
    mutate{
      replace => ["content", "%{content} %{message}"] # Supposing each new line will override the message field
    }
  }  
}

output {
  stdout{ debug=>true }
}

content 字段的输出是:The error message goes here ; array (

问题:

我的问题是我想将多行的其余部分存储到内容字段:

The error message goes here ; array (
  'create' => 
  array (
    'key1' => 'value1',
    'key2' => 'value2',
    'key3' => 'value3'
  ),
)

所以我可以稍后删除消息字段。

@message 字段包含整个多行事件,因此我尝试了 mutate 过滤器,并使用了 replace 功能,但我'我只是无法让它工作:(。

我不了解 Multiline 过滤器的工作方式,如果有人能对此有所了解,将不胜感激。

谢谢,

阿卜杜。

【问题讨论】:

    标签: regex logstash multiline logstash-grok


    【解决方案1】:

    我浏览了源代码,发现:

    • 多行过滤器将取消所有被认为是待处理事件的后续事件,然后将该行附加到原始消息字段,这意味着任何在这种情况下,多行过滤器之后的过滤器将不适用
    • 唯一会通过过滤器的事件是被认为是新事件的事件(在我的情况下以 [ 开头)

    这是工作代码:

    input {
       stdin{}
    }  
    
    filter{
          if "|ERROR|" in [message]{ #if this is the 1st message in many lines message
          grok{
            match => ['message',"\[.+\] - %{IP:ip}\|%{LOGLEVEL:loglevel}\| %{PATH:file}\|%{NUMBER:line}\|%{WORD:tag}\|%{GREEDYDATA:content}"]
          }
    
          mutate {
            replace => [ "message", "%{content}" ] #replace the message field with the content field ( so it auto append later in it )
            remove_field => ["content"] # we no longer need this field
          }
        }
    
        multiline{ #Nothing will pass this filter unless it is a new event ( new [2014-03-02 1.... )
            pattern => "^\["
            what => "previous"
            negate=> true
        }
    
        if "|DEBUG| flush_multi_line" in [message]{
          drop{} # We don't need the dummy line so drop it
        }
    }
    
    output {
      stdout{ debug=>true }
    }
    

    干杯,

    阿卜杜

    【讨论】:

    • 否决了这个答案,但赞成你的问题。此信息可能曾经是正确的,但现在不再正确。 (至少对于 1.4.2 来说不是)考虑改为接受 @sbange 的答案 - 这是唯一对我有用的答案。
    【解决方案2】:

    本期提到了grok和多行处理https://logstash.jira.com/browse/LOGSTASH-509

    只需在您的 grok 正则表达式前添加“(?m)”,您就不需要突变。问题示例:

    pattern => "(?m)<%{POSINT:syslog_pri}>(?:%{SPACE})%{GREEDYDATA:message_remainder}"
    

    【讨论】:

    • 是的!当没有其他方法时,这对我有用。我认为您的pattern =&gt; 位应该改为grok{ match =&gt;。为了完整起见,请考虑编辑以包含@Thales Ceolin 的评论以及原始问题中的实际multiline 块。这样,人们就可以在此答案中获得一揽子解决方案。
    【解决方案3】:

    多行过滤器会将“\n”添加到消息中。例如:

    "[2014-03-02 17:34:20] - 127.0.0.1|ERROR| E:\\xampp\\htdocs\\test.php|123|subject|The error message goes here ; array (\n  'create' => \n  array (\n    'key1' => 'value1',\n    'key2' => 'value2',\n    'key3' => 'value3'\n  ),\n)"
    

    但是,grok 过滤器无法解析“\n”。因此,您需要将 \n 替换为另一个字符,例如空格。

    mutate {
        gsub => ['message', "\n", " "]
    }
    

    然后,grok 模式可以解析消息。例如:

     "content" => "The error message goes here ; array (   'create' =>    array (     'key1' => 'value1',     'key2' => 'value2',     'key3' => 'value3'   ), )"
    

    【讨论】:

    • 感谢您的回答 Ben,但是由于我在回答中所述的原因,您的代码将无法正常工作
    • 其实我已经使用了你的配置和日志,它对我有用!您需要在多行之后添加 gsub 过滤器
    • 您的代码确实有效,感谢您提供有关 grok 技巧的信息。但我宁愿在我的答案中使用代码,因为在附加消息之前我需要对消息进行更多控制和编辑。所以这就是我将标记为答案的那个,太糟糕了,我没有足够的代表来投票给你的答案:(感谢你的帮助,谢谢
    • 欢迎您。 :)。如果您有任何问题,请来这里询问。我已经为你投票了。你的回答太棒了!
    【解决方案4】:

    问题不只是过滤器的顺序。顺序对于记录存储非常重要。您不需要另一行来表明您已完成输出多行日志行。只需确保多行过滤器首先出现在 grok 之前(见下文)

    附:我已经成功地解析了一个多行日志行,其中 xml 被附加到日志行的末尾并且它跨越了多行,但我仍然在我的内容等效变量中得到了一个干净的 xml 对象(下面命名为 xmlrequest)。在你说任何关于在日志中记录 xml 的事情之前......我知道......它并不理想......但那是另一场辩论:)):

    filter { 
    multiline{
            pattern => "^\["
            what => "previous"
            negate=> true
        }
    
    mutate {
        gsub => ['message', "\n", " "]
    }
    
    mutate {
        gsub => ['message', "\r", " "]
    }
    
    grok{
            match => ['message',"\[%{WORD:ONE}\] \[%{WORD:TWO}\] \[%{WORD:THREE}\] %{GREEDYDATA:xmlrequest}"]
        }
    
    xml {
    source => xmlrequest
    remove_field => xmlrequest
    target => "request"
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多