【问题标题】:Logstash indexing JSON arraysLogstash 索引 JSON 数组
【发布时间】:2014-02-27 11:37:27
【问题描述】:

Logstash 很棒。我可以像这样发送 JSON(多行以方便阅读):

{
  "a": "one"
  "b": {
    "alpha":"awesome"
  }
}

然后使用搜索词b.alpha:awesome 在 kibana 中查询该行。不错。

但是我现在有一个这样的 JSON 日志行:

{
  "different":[
    {
      "this": "one",
      "that": "uno"
    },
    {
      "this": "two"
    }
  ]
}

我希望能够通过类似different.this:two(或different.this:one,或different.that:uno)的搜索找到这一行

如果我直接使用 Lucene,我会遍历 different 数组,并为其中的每个哈希生成一个新的搜索索引,但 Logstash 目前似乎像这样摄取该行:

不同:{this: one, that: uno}, {this: two}

这不会帮助我使用 different.thisdifferent.that 搜索日志行。

任何人对我可以进行的编解码器、过滤器或代码更改有什么想法吗?

【问题讨论】:

  • 索引数组后你想要的 JSON 格式是什么?

标签: json logstash


【解决方案1】:

您可以编写自己的filter(复制粘贴,重命名类名,config_name 并重写filter(event) 方法)或修改当前的JSON 过滤器(Github 上的source

您可以在名为json.rb 的以下路径logstash-1.x.x\lib\logstash\filters 中找到JSON 过滤器(Ruby 类)源代码。 JSON过滤器将内容解析为JSON如下

begin
  # TODO(sissel): Note, this will not successfully handle json lists
  # like your text is '[ 1,2,3 ]' JSON.parse gives you an array (correctly)
  # which won't merge into a hash. If someone needs this, we can fix it
  # later.
  dest.merge!(JSON.parse(source))

  # If no target, we target the root of the event object. This can allow
  # you to overwrite @timestamp. If so, let's parse it as a timestamp!
  if !@target && event[TIMESTAMP].is_a?(String)
    # This is a hack to help folks who are mucking with @timestamp during
    # their json filter. You aren't supposed to do anything with
    # "@timestamp" outside of the date filter, but nobody listens... ;)
    event[TIMESTAMP] = Time.parse(event[TIMESTAMP]).utc
  end

  filter_matched(event)
rescue => e
  event.tag("_jsonparsefailure")
  @logger.warn("Trouble parsing json", :source => @source,
               :raw => event[@source], :exception => e)
  return
end

你可以修改解析过程来修改原来的JSON

  json  = JSON.parse(source)
  if json.is_a?(Hash)
    json.each do |key, value| 
        if value.is_a?(Array)
            value.each_with_index do |object, index|
                #modify as you need
                object["index"]=index
            end
        end
    end
  end
  #save modified json
  ......
  dest.merge!(json)

然后您可以修改配置文件以使用/您的新/修改后的 JSON 过滤器并放置在\logstash-1.x.x\lib\logstash\config

这是我的 elastic_with_json.conf,带有修改后的 json.rb 过滤器

input{
    stdin{

    }
}filter{
    json{
        source => "message"
    }
}output{
    elasticsearch{
        host=>localhost
    }stdout{

    }
}

如果您想使用新过滤器,可以使用 config_name 对其进行配置

class LogStash::Filters::Json_index < LogStash::Filters::Base

  config_name "json_index"
  milestone 2
  ....
end

并配置它

input{
    stdin{

    }
}filter{
    json_index{
        source => "message"
    }
}output{
    elasticsearch{
        host=>localhost
    }stdout{

    }
}

希望这会有所帮助。

【讨论】:

    【解决方案2】:

    为了快速而肮脏的 hack,我使用了 Ruby 过滤器和下面的代码,不再需要使用开箱即用的“json”过滤器

    input {
      stdin{}
    }
    
    filter {
      grok {
        match => ["message","(?<json_raw>.*)"]
      }
      ruby {
        init => "
          def parse_json obj, pname=nil, event
             obj = JSON.parse(obj) unless obj.is_a? Hash
             obj = obj.to_hash unless obj.is_a? Hash
    
             obj.each {|k,v|
             p = pname.nil?? k : pname
             if v.is_a? Array
               v.each_with_index {|oo,ii|               
                 parse_json_array(oo,ii,p,event)
               }
               elsif v.is_a? Hash
                 parse_json(v,p,event)
               else
                 p = pname.nil?? k : [pname,k].join('.')
                 event[p] = v
               end
             }
            end
    
            def parse_json_array obj, i,pname, event
              obj = JSON.parse(obj) unless obj.is_a? Hash
              pname_ = pname
              if obj.is_a? Hash
                obj.each {|k,v|
                  p=[pname_,i,k].join('.')
                  if v.is_a? Array
                    v.each_with_index {|oo,ii|
                      parse_json_array(oo,ii,p,event)
                    }
                  elsif v.is_a? Hash
                    parse_json(v,p, event)
                  else
                    event[p] = v
                  end
                }
              else
                n = [pname_, i].join('.')
                event[n] = obj
              end
            end
          "
          code => "parse_json(event['json_raw'].to_s,nil,event) if event['json_raw'].to_s.include? ':'"
        }
    
    
      }
    
    output {
      stdout{codec => rubydebug}
    }
    

    测试json结构

    {"id":123, "members":[{"i":1, "arr":[{"ii":11},{"ii":22}]},{"i":2}], "im_json":{"id":234, "members":[{"i":3},{"i":4}]}}
    

    这是什么输出

          {
               "message" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
              "@version" => "1",
            "@timestamp" => "2014-07-25T00:06:00.814Z",
                  "host" => "Leis-MacBook-Pro.local",
              "json_raw" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
                    "id" => 123,
           "members.0.i" => 1,
    "members.0.arr.0.ii" => 11,
    "members.0.arr.1.ii" => 22,
           "members.1.i" => 2,
               "im_json" => 234,
           "im_json.0.i" => 3,
           "im_json.1.i" => 4
          }
    

    【讨论】:

    • 随着时间的推移,这个解决方案应该仍然有效,但以一种笨拙的方式 - 我会说。可预测的 json 结构最好使用预定义的映射,对于内部带有数组的不可预测的 json,您仍然可以做类似的事情,但在您自己的自定义过滤器中而不是 ruby​​ 过滤器中
    • 在最近的 elasticsearch 版本中,字段名称中不能有句点。如果您被这个解决方案所吸引,请使用另一个角色。
    【解决方案3】:

    我喜欢的解决方案是 ruby​​ 过滤器,因为它要求我们不要编写另一个过滤器。但是,该解决方案创建的字段位于 JSON 的“根”上,并且很难跟踪原始文档的外观。

    我想出了类似的东西,它更容易理解,并且是一种递归解决方案,因此更简洁。

    ruby {
        init => "
            def arrays_to_hash(h)
              h.each do |k,v|
                # If v is nil, an array is being iterated and the value is k.
                # If v is not nil, a hash is being iterated and the value is v.
                value = v || k
                if value.is_a?(Array)
                    # "value" is replaced with "value_hash" later.
                    value_hash = {}
                    value.each_with_index do |v, i|
                        value_hash[i.to_s] = v
                    end
                    h[k] = value_hash
                end
    
                if value.is_a?(Hash) || value.is_a?(Array)
                  arrays_to_hash(value)
                end
              end
            end
          "
          code => "arrays_to_hash(event.to_hash)"
    }
    

    它将数组转换为具有每个键作为索引号。更多详情:- http://blog.abhijeetr.com/2016/11/logstashelasticsearch-best-way-to.html

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-03
      • 1970-01-01
      • 2014-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多