【问题标题】:elasticsearch aggregates some values in a single fieldelasticsearch 在单个字段中聚合一些值
【发布时间】:2020-11-10 06:10:10
【问题描述】:

我有一些原始数据

{
  {
    "id":1,
    "message":"intercept_log,UDP,0.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":2,
    "message":"intercept_log,TCP,172.22.96.4,52085,239.255.255.250,3702,1:"
  },
  {
    "id":3,
    "message":"intercept_log,UDP,1.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":4,
    "message":"intercept_log,TCP,173.22.96.4,52085,239.255.255.250,3702,1:"
  }
}

需求

我想按消息的消息部分的值对这些数据进行分组。 这样的输出值

{
  {
    "GroupValue":"TCP",
    "DocCount":"2"
  },
  {
    "GroupValue":"UDP",
    "DocCount":"2"
  }
}

试试

  • 我已尝试使用这些代码但失败了

GET systemevent*/_search
{
  "size": 0, 
  "aggs": {
    "tags": {
      "terms": {
        "field": "message.keyword",
        "include": " intercept_log[,,](.*?)[,,].*?"
      }
    }
  },
  "track_total_hits": true
}
  • 现在我尝试使用管道来满足这一需求。
  • “aggs”似乎只对字段进行分组。
  • 有人有更好的主意吗?

链接

Terms aggregation

更新

我的场景有点特别。我从许多不同的服务器收集日志,然后将日志导入es。因此,消息字段之间存在很大差异。如果直接使用脚本语句进行分组统计,会导致分组失败或分组不准确。我尝试根据条件过滤掉一些数据,然后用脚本对操作代码进行分组(注释代码1),但是这段代码无法对正确的结果进行分组。

这是我要添加的场景:

我们团队使用es分析服务器日志,使用rsyslog将数据转发到服务器中心,然后使用logstash过滤提取数据到es。这时候ES中有一个叫message的字段,message的值就是详细的日志信息。这时,我们需要统计消息中包含一些值的数据。

注释代码1

POST systemevent*/_search
{
  "size": 0, 
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "message": {
              "query": "intercept_log"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A'",
        "size": 10
      }
    }
  },
  "track_total_hits": true
}

注释代码2

POST test2/_search
{
  "size": 0,
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /.*,.*/.matcher( doc['host.keyword'].value ); if( name.matches() ) {return values.group(1) } else { return 'N/A' }",
        "size": 10
      }
    }
  }
}

【问题讨论】:

    标签: database elasticsearch aggregates


    【解决方案1】:

    解决此问题的最简单方法是利用 terms 聚合中的脚本。该脚本将简单地以逗号分隔并取第二个值。

        POST systemevent*/_search
        {
          "size": 0,
          "aggs": {
            "protocol": {
              "terms": {
                "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A';",
                "size": 10
              }
            }
          }
        }
    

    使用正则表达式

    POST test2/_search
    {
      "size": 0,
      "aggs": {
        "protocol": {
          "terms": {
            "script": "def m = /.*proto='(.*?)'./.matcher(doc['message.keyword'].value ); if( m.matches() ) { return m.group(1) } else { return 'N/A' }"
          }
        }
      }
    }
    

    结果看起来像

      "buckets" : [
        {
          "key" : "TCP",
          "doc_count" : 2
        },
        {
          "key" : "UDP",
          "doc_count" : 2
        }
      ]
    

    更好、更有效的方法是使用摄取管道或 Logstash 将 message 字段拆分为新字段。

    【讨论】:

    • 这很酷!感谢您的回答。但如果我们的消息字段没有常规值示例“aabbb”,我们将收到运行时错误。我怎样才能避免它。
    • 既然您使用的是 Logstash,我想知道为什么您在将文档索引到 ES 之前不解析消息字段。仅转发异构日志而不对其进行限定并不是一个很好的方法。我在回答中建议的内容(基于脚本的方法)只应谨慎使用,但如果您有办法在数据进入 ES 之前更好地解析数据,那么您绝对应该这样做。
    • 我们不选择更改logstash配置文件,因为我们只有一个logstash部署机会,而且我们有无数个logstash要部署,部署后无法远程升级。因此,我们只能为logstash提供一个通用的配置文件。所以我们只能在 ES 端进行过滤。我想知道是否可以使用 Scipt 中的常规进行聚合。
    • 在 Logstash 中,您绝对可以拥有多个配置而无需多个实例。它被称为pipelines,每个应用程序都可以有自己的管道。如果您有大量不同的日志流入 Logstash/ES,那么让 ES 通过将异构原始数据流式传输到 ES 来完成所有工作是没有意义的。
    • 请介绍一下我是从事铁路行业的。我们需要收集站台设备的数据。这些电台分布在我国的各个角落。他们可能在沙漠和原始森林中。那里的工作人员没有电脑概念。他们无法提供该站的数据。让我们配置独占的logstash。开发者来回一个站需要很长时间,这会让我们很难完成我们的数据收集计划。
    猜你喜欢
    • 2016-11-23
    • 2015-06-05
    • 1970-01-01
    • 2020-11-14
    • 2018-06-13
    • 2020-02-18
    • 2018-06-01
    • 1970-01-01
    • 2022-01-14
    相关资源
    最近更新 更多