我们知道elastisearch节点分五种类型:master(主节点)、data(数据存储,CRUD)、ingest(预处理)、coordinating(协调节点)、tribe(部落节点,用于跨集群)。有些时候我们需要对原始的日志做一些加工(比如格式转换、内容提取等),这个时候就需要使用pipeline,pipeline是在ingest节点上执行的。       

       在我们的集群中,一条日志可能可能被多个日志节点记录, 每个节点启动一个filebeat负责收集日志到elasticsearch。我们希望同一条日志在elasticsearch中只出现一次,同时我们希望通过日志的产生时间而不是收集时间来过滤日志。日志的唯一性,由_id决定;日志的产生时间写入到日志本身中。如果我们不做任何预处理,elasticsearch会自动生成_id, @timestamp会自动表示为日志收集时间。如果我们能直接从日志中提取_id 和timestamp就好了。比如有如下一条日志:

{"2019-12-04T11:24:45.406Z", "12422271230000354151", '[email protected]', "<7421.6089.0>", cluster_event, cluster_event, handle_event, handle_event, [ {event, stop},{msg, ""}]}.
 

一开始打算自己写正则匹配,一是正则表达式比较复制,二是自己定义的格式不能被pipeline所识别。好在有grok可以直接使用,一番调研发现基本满足我们的要求。pipeline的定义:

PUT _ingest/pipeline/info-pipeline
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["{\"%{TIMESTAMP_ISO8601:@timestamp}\", %{QUOTEDSTRING:_id}, '%{EMAILLOCALPART:slave}@%{IP:slave_ip}', %{QUOTEDSTRING:pid}, %{WORD:mod}"]
      }
    }
  ]
}

 filebeat.yml 配置

filebeat+elasticsearch日志内容提取

同一个filebeat可以收集不同格式的日志,不同日志的格式可以适配不同的pipeline。注意原始日志的时间格式,如果是grok不能匹配的格式,会导致在kibana的logs菜单中无法通过时间轴查看。

ps: 本人的验证环境:centos7+elasticsearch6.8+ filebeat6.8

相关文章:

  • 2022-12-23
  • 2021-11-06
  • 2021-09-23
  • 2021-09-04
  • 2022-12-23
  • 2021-06-15
  • 2022-12-23
  • 2021-05-28
猜你喜欢
  • 2022-12-23
  • 2021-07-19
  • 2021-08-07
  • 2022-02-11
  • 2021-07-22
  • 2022-02-27
  • 2021-06-25
相关资源
相似解决方案