【问题标题】:reparsing a logstash record? fix extracts?重新解析logstash记录?修复提取物?
【发布时间】:2018-10-09 08:39:58
【问题描述】:

我正在接收一条 JSON 消息(Cloudtrail,许多对象连接在一起),当我过滤完它时,Logstash 似乎没有正确解析消息。就好像散列被简单地转储到一个字符串中。

总之,这里是输入和过滤器。

input {
  s3 {
    bucket => "stanson-ops"
    delete => false
    #snipped unimportant bits
    type => "cloudtrail"
  }
}

filter {
  if [type] == "cloudtrail" {
    json { # http://logstash.net/docs/1.4.2/filters/json
      source => "message"
    }
    ruby {
      code => "event['RecordStr'] = event['Records'].join('~~~')"
    }
    split {
      field => "RecordStr"
      terminator => "~~~"
      remove_field => [ "message", "Records" ]
    }
  }
}

当我完成时,elasticsearch 条目包含一个带有以下数据的 RecordStr 键。它没有message 字段,也没有Records 字段。

{"eventVersion"=>"1.01", "userIdentity"=>{"type"=>"IAMUser", "principalId"=>"xxx"}}

请注意,这不是不是 JSON 样式,它已被解析。 (这对于 concat->split 的工作很重要)。

因此,RecordStr 键作为一个值看起来并不完全正确。此外,在 Kibana 中,可过滤字段包括 RecordStr(无子字段)。它包括一些不再存在的条目:Records.eventVersionRecords.userIdentity.type

这是为什么呢?如何获得正确的字段?

edit 1这是输入的一部分。

{"Records":[{"eventVersion":"1.01","userIdentity":{"type":"IAMUser",

这是未经修饰的 JSON。看起来文件的主体(上面)在message 字段中,json 提取它,我最终在Records 字段中得到一组记录。这就是我加入并拆分它的原因——然后我得到了单独的文档,每个文档都有一个 RecordStr 条目。但是,模板(?)似乎不理解新结构。

【问题讨论】:

  • 输入的数据是什么样的?
  • @MagnusBäck see this documentation,给出了一个可读的例子。
  • 该示例是否准确地代表了日志的样子,或者 JSON 是否打印得很漂亮?我在问,因为您的配置无法解析该示例。也许cloudtrail codec 会有用?
  • @MagnusBäck 我添加了前一百个左右的字符。让我知道是否需要更多,但它似乎排队 - 我也做了解释。 cloudtrail 编解码器doesn't exist on github- 不知道为什么会有文档。
  • 正如文档所说,它是 contrib 插件包的一部分,可以在它自己的 git 中找到。请参阅installation instructions

标签: elasticsearch logstash kibana amazon-cloudtrail


【解决方案1】:

我已经制定了一种方法,可以根据您的要求为相应的 CloudTrail 字段编制索引。以下是修改后的输入和过滤器配置:

input {
  s3 {
    backup_add_prefix => \"processed-logs/\"
    backup_to_bucket => \"test-bucket\"
    bucket => \"test-bucket\"
    delete => true
    interval => 30
    prefix => \"AWSLogs/<account-id>/CloudTrail/\"
    type => \"cloudtrail\"
  }
}

filter {
  if [type] == \"cloudtrail\" {
    json {
      source => \"message\"
    }
    ruby {
      code => \"event.set('RecordStr', event.get('Records').join('~~~'))\"
    }
    split {
      field => \"RecordStr\"
      terminator => \"~~~\"
      remove_field => [ \"message\", \"Records\" ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"=>\", \":\"
      ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"nil\", \"null\"
      ]
    }
    json {
      skip_on_invalid_json => true
      source => \"RecordStr\"
      target => \"cloudtrail\"
    }
    mutate {
      add_tag => [\"cloudtrail\"]
      remove_field=>[\"RecordStr\", \"@version\"]
    }
    date {
      match => [\"[cloudtrail][eventTime]\",\"ISO8601\"]
    }
  }
}

这里的关键观察是,一旦拆分完成,我们在事件中不再拥有有效的 json,因此需要执行变异替换('=>' 到 ':' 和 'nil' 到 'null') .此外,我发现从 CloudTrail eventTime 中获取时间戳并对不必要的字段进行一些清理很有用。

【讨论】:

    猜你喜欢
    • 2016-02-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-02
    • 1970-01-01
    • 1970-01-01
    • 2016-06-12
    • 1970-01-01
    相关资源
    最近更新 更多