【问题标题】:How to parse IP within fields in ELK如何在 ELK 的字段中解析 IP
【发布时间】:2018-09-28 14:18:49
【问题描述】:

我正在尝试自动化/简化审查 ELK(ElasticSearch、Logstash、Kibana)中防火墙规则的程序。 我有一些从 CSV 获得的数据,其结构如下:

Source;Destination;Service;Action;Comment
10.0.0.0/8 172.16.0.0/16 192.168.0.0/24 23.2.20.6;10.0.0.1 10.0.0.2 10.0.0.3;udp:53 
tcp:53;accept;No.10: ID: INC0000000001

我的目标是通过解析每个字段(用于子网和/或 IP 地址)在 ELK 中导入这些数据,如果可能,添加一个包含每个字段的顺序字段(IP_Source1、IP_Destination2 等)。

据您所知,这可能吗?怎么样?

感谢您提供的任何提示

【问题讨论】:

  • IP_Source 和 IP_Destincation 的数量可以不同吗?我想会的。

标签: csv logstash kibana firewall elastic-stack


【解决方案1】:

您可以创建一个将输入作为文件的 logstash 配置。然后使用第一个 csv 过滤器。 CSV 过滤器应如下所示。

filter {
    csv {
        columns => ["source", "destination", "service", "action", "comment"]
        separator => ";"
    }
}

下一个过滤器需要是红宝石过滤器。

filter { 
    ruby { 
       code => "
          arr = event.get(source).split('')
          arr.each.with_index(1) do |a, index|
               event.set(ip_source+index, a)
          end
       "
    }
}

Final 会输出到 elasticsearch。

我没有测试过代码。但我希望这能给你很好的提示。

【讨论】:

  • 非常感谢 nitzien,我会测试您的脚本并让您知道它是否有效。是的,Sources 和 Destinations 的数量应该因字段而异。再次感谢。
  • 我没有测试过代码。因此,可能需要进行一些小的调整。 :)
  • 你好,尼兹恩。我已经测试了您的代码,但是它在第 26 行(Ruby 代码的开头)返回了一个错误。不幸的是,我不熟悉 Ruby 语言。你知道我可以快速学习它的任何来源吗?非常感谢。
  • 这里是:执行动作失败 {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected #, => 之一,在第 26 行,第 8 列(字节 537)过滤后 {\nruby { \n init => \"\n def split_str source, target event\n arr = event.get(source).split( \" \") \n arr.each.with_index(1) do |a, index|\n event.set(target+index, a)\n end\n end\n \"\n "
  • 你能不能把这个 arr = event.get(source).split(" ") 改成 arr = event.get(source).split(' ')。基本上将双引号替换为单引号。
猜你喜欢
  • 2019-05-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多