【Question title】: How to remove all fields with NULL value in Logstash filter
【Posted】: 2023-04-06 19:43:01
【Question】:

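I am using Logstash to read a Check Point log file in CSV format, and some of the fields have null values.

I want to remove all fields that have a null value.

I cannot predict exactly which fields (keys) will have null values, because the CSV file has 150 columns and I do not want to check each one.

Is it possible to do dynamic filtering in Logstash that removes any field with a null value?

My Logstash configuration file looks like this: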

input {
  stdin { tags => "checkpoint" } 
   file {
   type => "file-input"
   path =>  "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
   sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
   start_position => "beginning"
   tags => ["checkpoint","offline"]
  }
}
filter {
 if "checkpoint" in [tags] {
        csv {
        columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
      #  remove_field => [ any fields with null value] how to do it please 
        separator => "|"
        }
    # drop csv header
    if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
      drop { }
    }
  }
}
output {
   stdout {
    codec => rubydebug 
  }
   file {
      path => "output.txt"
   }
}

I am attaching some log samples here:

num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

【Question comments】:

    Tags: logging elasticsearch logstash checkpoint


    【Solution 1】:
    ruby {
        init => "
            def removeEmptyField(event, h, name)
                h.each do |k, v|
                    if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
                        removeEmptyField(event, v, String.new(name.to_s) << '[' << k.to_s << ']')
                    elsif v == '' || v.to_s == '{}'
                        event.remove(String.new(name.to_s) << '[' << k.to_s << ']')
                    end
                end
            end
        "
        code => "
            removeEmptyField event, event.to_hash, ''
        "
    }
    

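    【Comments】:

    • I can confirm the code above works as expected: it removes fields with empty values. Using Logstash 6.8.1. I also enhanced it a bit to remove fields whose value is "-". Just change the second condition (the one that checks v == '') to: if v == '' || v.to_s == '{}' || v == '-'
    • Actually, to remove fields with a null value you also need to add || v == nil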
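The recursive idea from Solution 1, together with the two tweaks suggested in the comments (`'-'` and `nil`), can be tried outside Logstash. The following is only a minimal plain-Ruby sketch: a nested Hash stands in for the event, and the names `EMPTY_MARKERS` and `remove_empty_fields` are made up for illustration, not part of Logstash.

```ruby
# Plain-Ruby sketch; a Hash stands in for the Logstash event.
# EMPTY_MARKERS and remove_empty_fields are hypothetical names.
EMPTY_MARKERS = ['', '-'].freeze

# Returns a copy of the hash without nil values, empty/marker strings,
# or nested hashes that become empty after cleaning.
def remove_empty_fields(hash)
  hash.each_with_object({}) do |(key, value), kept|
    value = remove_empty_fields(value) if value.is_a?(Hash)
    next if value.nil?
    next if value.is_a?(String) && EMPTY_MARKERS.include?(value.strip)
    next if value.is_a?(Hash) && value.empty?
    kept[key] = value
  end
end

event = {
  'num' => '2',
  'alert' => '',
  'rule' => nil,
  'user' => '-',
  'nat' => { 'xlatesrc' => '', 'xlatedst' => nil },
  'geo' => { 'src' => '10.0.0.1', 'dst' => '' }
}

cleaned = remove_empty_fields(event)
p cleaned
```

Unlike the filter code above, this sketch builds a new Hash instead of calling `event.remove`, which keeps it testable without a Logstash runtime. Values such as `0` and `false` are left in place.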
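    【Solution 2】:

    If you need to recursively remove all null, blank, and empty fields (0 and false are kept), this function may help. It uses the Ruby filter in Logstash. It is by no means elegant, but it seems to be quite effective.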

    filter {
        ruby {
            init => "
                # event is passed in explicitly so the method does not rely on
                # a variable defined outside of it.
                def Compact(event, key)
                    modifiedKey = nil
                    parentKey = nil

                    if key.kind_of?(String)
                        # Normalize a bare name to field-reference form: name -> [name]
                        if key.start_with?('[')
                            modifiedKey = key
                        else
                            modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
                        end

                        parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
                    end

                    unless modifiedKey.nil?
                        # Remove the field itself when it is nil, blank, or an empty collection
                        if event.get(modifiedKey).is_a?(Enumerable) &&
                           (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
                            event.remove(modifiedKey)
                        elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
                            event.remove(modifiedKey)
                        end

                        # Remove the parent when removing the child left it empty
                        if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
                           (event.get(parentKey).nil? || event.get(parentKey).empty?)
                            event.remove(parentKey)
                        end
                    end

                    # Recurse into the top-level hash or into any nested collection
                    if key == event.to_hash ||
                       event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
                        key = event.get(modifiedKey) unless modifiedKey.nil?
                        key.each{ |k|
                            Compact(event, %{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
                        }
                    end

                rescue Exception => e
                    puts %{ruby_exception_#{__method__.to_s} - #{e}}
                end
            "

            code => "
                Compact(event, event.to_hash)
            "
        }
    }
    

    【Comments】:

      【Solution 3】:

      Check out the csv filter's skip_empty_columns option; it was really helpful in my use case. :)

      Usage:

      skip_empty_columns => true
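
To make the effect of that option concrete, here is a rough plain-Ruby sketch of what skipping empty columns amounts to for one line of the sample log. It is not the plugin's implementation; `parse_skipping_empty` is a hypothetical helper, and only the first eight column names from the question are used.

```ruby
# Hypothetical helper: pairs column names with the values of one
# pipe-separated line and keeps only the non-empty ones.
def parse_skipping_empty(line, columns, separator = '|')
  values = line.chomp.split(separator, -1) # -1 keeps trailing empty fields
  columns.zip(values).each_with_object({}) do |(name, value), fields|
    next if value.nil? || value.empty?
    fields[name] = value
  end
end

columns = %w[num date time orig type action alert i/f_name]
line = '0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon'

fields = parse_skipping_empty(line, columns)
p fields
```

The empty `alert` column never becomes a field, so nothing has to be removed afterwards. In the real pipeline the option belongs inside the existing `csv { ... }` block, assuming the installed version of the csv filter supports it.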
      

      【Comments】:

      • Wow! That worked great! Nice and simple.
      【Solution 4】:

      A Ruby filter can do what you need.

      input {
              stdin {
              }
      }
      
      filter {
              csv {
                      columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
                      separator => "|"
              }
              ruby {
                      code => "
                              hash = event.to_hash
                              hash.each do |k,v|
                                      if v == nil
                                              event.remove(k)
                                      end
                              end
                      "
              }
      }
      
      output {
          stdout { codec => rubydebug }
      }
      

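      You can use the ruby plugin to filter out all fields with a nil value (nil is Ruby's null).

      Update:

      Here is my environment: Windows Server 2008 and Logstash 1.4.1. Your log sample works for me! I have updated the configuration, input, and output.

      Input: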

      2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
      

      Output:

      {
              "@version" => "1",
            "@timestamp" => "2015-03-12T00:30:34.123Z",
                  "host" => "BENLIM",
                   "num" => "2",
                  "date" => "8Jun2012",
                  "time" => "16:52:39",
                  "orig" => "10.0.0.1",
                  "type" => "log",
                "action" => "keyinst",
              "i/f_name" => "daemon",
               "i/f_dir" => "inbound",
               "product" => "VPN-1 & FireWall-1",
          "Internal_CA:" => "Certificate initialized",
           "serial_num:" => "86232",
                   "dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
      }
      

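      【Comments】:

      • Hi Ben, thanks for your help. I am testing your ruby answer, but the output file still contains empty fields...
      • May I know whether the field value is "nil" or "null"? If you want to remove fields whose value is "null", use "if v == 'null'" in the ruby code.
      • This does not work when your Logstash output is file or elasticsearch
      • But it works for me with stdout or file. Can you provide your log sample?
      • Hi Ben, and thanks for helping me. I updated my question and provided a log sample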
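
The thread above never settles whether the leftover values are `nil`, an empty string, or the literal text "null". One way to sidestep the question is a single predicate that treats all of them as empty. The sketch below is plain Ruby with a Hash standing in for the event; `NULL_STRINGS` and `empty_value?` are hypothetical names, and the list of strings treated as null is an assumption.

```ruby
# Hypothetical predicate: which values count as "empty" is an assumption.
NULL_STRINGS = %w[null nil].freeze

def empty_value?(value)
  return true if value.nil?
  return false unless value.is_a?(String)

  stripped = value.strip
  stripped.empty? || NULL_STRINGS.include?(stripped.downcase)
end

event = { 'num' => '2', 'alert' => '', 'rule' => nil, 'user' => 'null', 'count' => 0 }

# Iterate over a copy of the keys so the hash can be modified safely.
event.keys.each { |key| event.delete(key) if empty_value?(event[key]) }
p event
```

Inside a ruby filter the same check could replace the `v == nil` test from Solution 4, with `event.remove(k)` in place of `Hash#delete`.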
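      【Solution 5】:

      To do this dynamically, you need to use the ruby{} filter. There is some good sample code in this answer

      【Comments】:

      • Hi Alain, thanks for helping me. I am trying to use the ruby filter, but I am new to Ruby.... The output written to my file still has empty fields; I would be happy to get a code sample for my specific problem that I can test.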