【问题标题】:Combining log entries with logstash将日志条目与 logstash 相结合
【发布时间】:2016-04-01 15:53:58
【问题描述】:

我想从 dnsmasq 收集和处理日志,我决定使用 ELK。 Dnsmasq 用作 DHCP 服务器和 DNS 解析器,因此它为这两种服务创建日志条目。

我的目标是向 Elasticsearch 发送带有请求者 IP、请求者主机名(如果可用)和请求者 mac 地址的所有 DNS 查询。这将允许我将每个 MAC 地址的请求分组,无论设备 IP 是否更改,并显示主机名。

我想做的是:

1) 阅读以下条目:

Mar 30 21:55:34 dnsmasq-dhcp[346]: 3806132383 DHCPACK(eth0)  192.168.0.80 04:0c:ce:d1:af:18 air

2) 暂存关系:

192.168.0.80 => 04:0c:ce:d1:af:18

192.168.0.80 => 空气

3) 丰富如下条目,添加mac地址和主机名。如果主机名是空的,我会添加mac地址。

Mar 30 22:13:05 dnsmasq[346]: query[A] imap.gmail.com from 192.168.0.80

我找到了一个名为 “memorize” 的模块,可以让我存储它们,但不幸的是,它不适用于最新版本的 Logstash

我正在使用的版本:

ElastiSearch 2.3.0
Kibana 4.4.2
Logstash 2.2.2

还有 logstash 过滤器(这是我第一次尝试使用 logstash,因此我确信配置文件可以改进)

input {
  file {
    path => "/var/log/dnsmasq.log"
    start_position => "beginning"
    type => "dnsmasq"
  }
}  

filter {
  if [type] == "dnsmasq" {
    grok {
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{NOTSPACE:action} %{IP:clientip} %{MAC:clientmac} ?(%{HOSTNAME:clientname})?"]
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{USER:action}?(\[%{USER:subaction}\])? %{NOTSPACE:domain} %{NOTSPACE:function} %{IP:clientip}"]
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: %{NOTSPACE:action} %{DATA:data}"]
    }

    if [action] =~ "DHCPACK" {

    }else if [action] == "query" {

    }else
    {
      drop{}
    }
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

问题:

1) 插件“memorize” 是否可以使用最新的logstash 版本?另一个插件或不同的程序。

2)我应该将logstash降级到2之前的版本(我认为之前是1.5.4)?如果是这样,是否有任何已知的服务器问题或与 elasticsearch 2.2.1 不兼容?

3) 或者我应该修改允许logstash 2.x 的“记忆”插件(如果是这样,我会感谢任何关于如何开始的指针)?

【问题讨论】:

  • 看起来记忆过滤器只保留事件的最后一个值。这对你有用吗?
  • 记忆会很完美,但不适用于logstash 2.x
  • 通过改变依赖号可以安装memorize。或者直接从:https://github.com/jofrep/logstash-filter-memorize 安装它。我试过了,它似乎有效。
  • 这是我的尝试。是我的回购。但我没有安装它。很高兴知道你是怎么做到的
  • 有点有趣 :) 我用我让它工作的方式来回答它。希望有所帮助。

标签: elasticsearch logstash elastic-stack logstash-configuration


【解决方案1】:

在我看来,没有必要为此重新打包memorize 插件。您可以使用aggregate filter 来实现您想要的。

...

# record host/mac in temporary map
if [action] =~ "DHCPACK" {
  aggregate {
     task_id => "%{clientip}"
     code => "map['clientmac'] = event['clientmac']; map['clientname'] = event['clientname'];"
     map_action => "create_or_update"
     # timeout set to 48h
     timeout => 172800
  }
}

# add host/mac where/when needed
else if [action] == "query" {
   aggregate {
     task_id => "%{clientip}"
     code => "event['clientmac'] = map['clientmac']; event['clientname'] = map['clientname']"
     map_action => "update"
   }
}

【讨论】:

  • 我使用了“聚合”插件并进行了一些修改以解决拼写错误并确保映射是永久性的。尽管如此,尽管我将超时设置为 0 并使用单个线程,但映射通常会丢失,并且日志条目不会填充映射值(直到看到新的 DHCPACK)。有什么建议吗?
  • 抱歉有错别字。您可以尝试添加aggregate_maps_path => /path/to/maps 并监视该文件以查看映射随着时间的推移的样子吗?
  • 查看结果,我发现映射在 DHCPACK 后持续了 30 分钟。在某些地方,它说地图在 1800 秒(30 分钟)后被删除,在其他地方它说默认情况下这些地图永远不会被删除。无论如何,我将其设置为“0”(从不删除)。
  • 所以使用timeout=>0,地图永远不会被驱逐,对吧?您可以使用--debug 启动logstash,以便我们更深入地了解logstash 正在做什么?
  • 我无法使用 aggregate_maps_path 因为它失败了。我创建了一个 kibana 报告,查看 DHCP 调用和变量 clientname,并在我看来 timeout=>0 被忽略了。 30 分钟后,不再填充该变量。我会检查--debug
【解决方案2】:

所以要在logstash >2.0 中使用memorize

  • 克隆存储库。
  • 打开文件logstash-filter-memorize.gemspec
  • s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0' 更改为s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 3.0.0'
  • 通过以下方式构建插件:gem build logstash-filter-memorize.gemspec
  • 通过以下方式安装它:$ bin/logstash-plugin install /path/to/memorize/logstash-filter-memorize-0.9.1.gem

我试过了,似乎可以。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-03-09
    • 2019-10-29
    • 1970-01-01
    • 2015-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多