【问题标题】:Setting variables in logstash config and referencing them在 logstash 配置中设置变量并引用它们
【发布时间】:2021-06-09 06:56:24
【问题描述】:

一周前我开始使用 ELK 来存储多个 CSV 并将它们传送到 kibana 以便于分析它们。一种情况将涉及多台机器,一台机器将生成许多 CSV。现在这些 CSV 具有特定的命名模式。我正在使用一个特定文件( BrowsingHistoryView_DMZ-machine1.csv )作为参考并将案例设置为索引。为了定义一个索引,我选择将文件重命名为前缀为 '__case_number __' 。所以文件名将是 -
__1__BrowsingHistoryView_DMZ-machine1.csv

现在我想从中得出两件事。
1。获取案例编号 __1 __ 并使用 1 作为索引。 1 , 2 , 3 等将用作案例编号。
2。获取文件类型(例如 BrowsingHistoryView)并为上传的文件添加标签名称。
3。获取机器名称​​DMZ-machine1(还不知道我会在哪里使用它)。

我为它创建了一个配置文件,如下-

    file {
        path => "/home/kriss/Documents/*.csv"  # get the files from Documents
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    csv {
        separator => ","
        }
        if [path] =~ "BrowsingHistory" { mutate { add_tag => ["Browsinghistory"] }    # define a new tag for browser history, this worked
            grok {  match =>    ["path", "__(?<case>[0-9]+)__(?<category>\w+_)(?<machine>(.+)).csv"]     # This regex pattern is to get category(browsingHistory), MachineName
            }           
        }
        if [path] =~ "Inbound_RDP_Events" { mutate { add_tag => {"Artifact" => "RDP" } } }          
        }  # This tagging worked

output {
        elasticsearch  {
            hosts => "localhost"
            index => "%{category}"  # This referencing the category variable didn't work
        }
    
stdout {}
}

当我在 logstash 上运行此配置时,生成的索引是 %category 。我需要它来为该文件的索引捕获 browser_history。另外,如果我可以将类别转换为小写字母,因为有时大写在索引中效果不佳。
我尝试按照官方文档进行操作,但没有获得所需的完整信息。

【问题讨论】:

    标签: elasticsearch logstash


    【解决方案1】:

    在 Kibana 的开发工具中有一个 grok 调试器,您可以使用它来解决这些类型的问题,或者在 https://grokdebug.herokuapp.com/ 上在线调试器 - 很棒。

    以下是您的配置稍作修改的版本。我已经删除了你的 cmets 并插入了我自己的。

    变化如下:

    • 您的配置中的path 正则表达式与您提供的示例文件名不匹配。您可能希望将其改回,具体取决于您的示例的准确程度。
    • grok 模式已经过调整
    • 已将您的 Artifact 标记更改为一个字段,因为看起来您正在尝试创建一个字段

    我试图遵守你的间距约定 :)

    input {
        file {
            path => "/home/kriss/Documents/*.csv"
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
    }
    
    filter {
        csv {
            separator => ","
            }
            # I replaced your  regex with something matches your example 
            # filename, but given that you said you already had this
            # working, you might want to change it back.
            if [path] =~ "browser_history" {
                mutate { add_tag => ["Browsinghistory"] }
                grok {
                    # I replaced custom captures with a more grokish style, and
                    # use GREEDYDATA to capture everything up to the last '_'
                    match => [ "path", "^_+%{NUMBER:case}_+%{GREEDYDATA:category}_+%{DATA:case}\.csv$" ]
                }           
            }
            # Replaced `add_tag` with `add_field` so that the syntax makes sense
            if [path] =~ "Inbound_RDP_Events" { mutate { add_field => {"Artifact" => "RDP" } } }
            # Added the `mutate` filter's `lowercase` function for "category"
            mutate {
                lowercase => "category"
            }
        }
    
    output {
            elasticsearch  {
                hosts => "localhost"
                index => "%{category}"
            }
        
    stdout {}
    }
    

    未测试,但我希望它能给你足够的线索。

    【讨论】:

    • 感谢@tomr 的更新。我对其进行了测试,但变量值没有被传递到这里的索引- index => "%{category}" 当这个日志文件被应用到logstash时,有没有办法详细检查它?
    • 抱歉配置错误。我现在意识到我向 GROK 传递了错误的数据。路径的价值从来都不是我想要的。所以我把文件名的字面值和变量值放在输出部分的索引中。
    【解决方案2】:

    因此,供尝试在 logstash 配置文件中使用自定义变量的任何人参考。以下是工作配置-

    input {
            file {
                    path => "/home/user1/Documents/__1__BrowsingHistoryView_DMZ-machine1.csv"   # Getting the absolte path (necessary)
                    start_position => "beginning"
                    sincedb_path => "/dev/null"
            }
    }
    filter {
            csv {
                    separator => ","
                    }
                    if [path] =~ "BrowsingHistory" { mutate { add_field => {"artifact" => "Browsinghistory"} }     # if BrowsingHistory is found in path, add a tag called Browsinghistory
                            grok {  match => ["path", "__(?<case>[0-9]+)__(?<category>\w+_)(?<machine>(.+)).csv"]    # get the caseNumber, logCategory, machineName into variables
                            }
                    }
                    if [path] =~ "Inbound_RDP_Events" { mutate { add_field => {"artifact" => "RDP"} } }     # another tag if RDP event file found in path
                    }
    
    output {
                    elasticsearch  {
                            hosts => "localhost"
                            index => "%{case}"          # passing the variable value derived from regex
                            # index => "%{category}"    # another regex variable
                            # index => "%{machine}"     # another regex variable
                    }
    
    stdout {}
    }
    

    我不太确定是添加新标签还是添加新字段 (add_field => {"artifact" => "Browsinghistory"}) 以便在 kibana 中轻松识别文件。如果有人可以提供一些有关如何从中选择一个的信息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-08-14
      • 2013-03-09
      • 1970-01-01
      • 1970-01-01
      • 2013-04-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多