【问题标题】:Kibana - How to extract fields from existing Kubernetes logsKibana - 如何从现有 Kubernetes 日志中提取字段
【发布时间】:2017-02-20 01:12:14
【问题描述】:

我有一种 ELK 堆栈,使用 fluentd 而不是 logstash,在 Kubernetes 集群上作为 DaemonSet 运行,并将所有容器中的所有日志以 logstash 格式发送到 Elasticsearch 服务器。

在 Kubernetes 集群上运行的众多容器中,有一些是 nginx 容器,它们输出以下格式的日志:

121.29.251.188 - [16/Feb/2017:09:31:35 +0000] host="subdomain.site.com" req="GET /data/schedule/update?date=2017-03-01&type=monthly&blocked=0 HTTP/1.1" status=200 body_bytes=4433 referer="https://subdomain.site.com/schedule/2589959/edit?location=23092&return=monthly" user_agent="Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0" time=0.130 hostname=webapp-3188232752-ly36o

在 Kibana 中可见的字段如下图所示:

这种类型的日志被索引后是否可以提取字段?

fluentd 收集器配置了以下源,它处理所有容器,因此由于来自不同容器的输出非常不同,因此无法在此阶段强制执行格式:

<source>
  type tail
  path /var/log/containers/*.log
  pos_file /var/log/es-containers.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag kubernetes.*
  format json
  read_from_head true
</source>

在理想情况下,我想用“log”字段中的元字段来丰富上面屏幕截图中可见的字段,例如“host”、“req”、“status”等。

【问题讨论】:

    标签: logging kubernetes kibana elastic-stack fluentd


    【解决方案1】:

    经过几天的研究并习惯了EFK stack,我得出了一个 EFK 特定的解决方案,而不是 Darth_Vader 的答案,这只能在 ELK 堆栈上实现。

    总而言之,我使用的是 Fluentd 而不是 Logstash,所以如果您还安装了 Fluentd Grok Plugin,任何 grok 解决方案都可以工作,我决定不这样做,因为:

    事实证明,Fluentd 通过使用parser filters 拥有自己的字段提取功能。为了解决我的问题,就在 &lt;match **&gt; 行之前,因此在日志行对象已经丰富了 kubernetes 元数据字段和标签之后,我添加了以下内容:

    <filter kubernetes.var.log.containers.webapp-**.log>
      type parser
      key_name log
      reserve_data yes
      format /^(?<ip>[^-]*) - \[(?<datetime>[^\]]*)\] host="(?<hostname>[^"]*)" req="(?<method>[^ ]*) (?<uri>[^ ]*) (?<http_version>[^"]*)" status=(?<status_code>[^ ]*) body_bytes=(?<body_bytes>[^ ]*) referer="(?<referer>[^"]*)" user_agent="(?<user_agent>[^"]*)" time=(?<req_time>[^ ]*)/
    </filter>
    

    解释一下:

    &lt;filter kubernetes.var.log.containers.webapp-**.log&gt; - 将块应用于与该标签匹配的所有行;在我的例子中,Web 服务器组件的容器被称为 webapp-{something}

    type parser - 告诉 fluentd 应用解析器过滤器

    key_name log - 仅在日志行的log 属性上应用模式,而不是整行,这是一个 json 字符串

    reserve_data yes - 非常重要,如果未指定,整个日志行对象将仅替换为从 format 提取的属性,因此如果您已经有其他属性,例如由 kubernetes_metadata 过滤器添加的属性,这些不添加reserve_data选项时会被移除

    format - 一个正则表达式,应用于log 键的值以提取命名属性

    请注意,我使用的是 Fluentd 1.12,因此此语法与较新的 1.14 语法不完全兼容,但只要对解析器声明稍作调整即可使用该原理。

    【讨论】:

    • 我们如何使用 fluent-bit 做同样的事情?
    • @bedeabza 你介意分享你的配置图以及它是如何设置的吗?我不得不稍微更新一下,并使用 Fluentular 使过滤器正确,并且能够验证它,但它没有将过滤器应用于我们的日志。谢谢
    • 我也可以使用elasticsearch摄取节点
    【解决方案2】:

    为了将日志行提取到字段中,您可能必须使用grok 过滤器。您可以做的是拥有一个 regex 模式,以匹配您需要的日志行的确切部分。 Grok 过滤器可能如下所示:

    grok {
        patterns_dir => ["pathto/patterns"]
        match => { "message" => "^%{LOGTIMESTAMP:logtimestamp}%{GREEDYDATA:data}" }         
    }                                                 ^-----------------------^ are the fields you would see in ES when log is being indexed
    

    ----------------------------------------------- -----^ LOGTIMESTAMP 应该在你的模式文件中定义如下:

    LOGTIMESTAMP %{YEAR}%{MONTHNUM}%{MONTHDAY} %{TIME}
    

    一旦您有了匹配的字段,您就可以简单地将它们用于filtering 目的,或者您仍然可以保持原样,如果主要原因是它从日志行中提取字段。

    if "something" in [message]{
         mutate {
             add_field => { "new_field" => %{logtimestamp} }
         }          
    }
    

    以上只是一个示例,您可以根据需要对其进行复制。您可以使用this 工具来测试您的模式以及您想要匹配的字符串!

    Blog post,可能会很方便!希望这会有所帮助。

    【讨论】:

    • 感谢@Darth_vader 的回答。我知道这可以通过logstash来完成,但我使用fluentd进行日志收集,请参阅我的问题中的配置。您知道开箱即用的 fluentd 是如何做到这一点的吗?
    猜你喜欢
    • 2019-04-06
    • 1970-01-01
    • 1970-01-01
    • 2021-01-13
    • 1970-01-01
    • 1970-01-01
    • 2020-01-09
    • 2021-01-14
    • 2020-06-16
    相关资源
    最近更新 更多