【问题标题】:Grouping consecutive documents with Elasticsearch使用 Elasticsearch 对连续文档进行分组
【发布时间】:2015-11-14 10:36:09
【问题描述】:

有没有办法让 Elasticsearch 在分组时考虑序列间隙?

假设以下数据被批量导入到 Elasticsearch:

{ "index": { "_index": "test", "_type": "groupingTest", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "5" } }
{ "sequence": 5, "type": "A" }

有没有办法以某种方式查询这些数据

  • 序列号为 1 和 2 的文档进入一个输出组,
  • 序列号为 3 的文档转到另一个文档,并且
  • 序列号为 4 和 5 的文档进入第三组?

...考虑到 A 类序列被 B 类项目(或任何其他非 A 类项目)打断的事实?

我希望结果桶看起来像这样(sequence_group 的名称和值可能不同 - 只是试图说明逻辑):

"buckets": [
    {
       "key": "a",
       "sequence_group": 1,
       "doc_count": 2
    },
    {
       "key": "b",
       "sequence_group": 3,
       "doc_count": 1
    },
    {
       "key": "a",
       "sequence_group": 4,
       "doc_count": 2
    }
]

https://www.simple-talk.com/sql/t-sql-programming/the-sql-of-gaps-and-islands-in-sequences/ 上有一个很好的问题描述和一些 SQL 解决方案方法。我想知道是否还有可用的弹性搜索解决方案。

【问题讨论】:

    标签: elasticsearch gaps-and-islands


    【解决方案1】:

    我们可以在这里使用Scripted Metric Aggregation,它以map-reduce 方式工作(参考link)。它有不同的部分,如 init、map、combine 和 reduce。而且,好消息是所有这些的结果也可以是一个列表或地图。

    我玩了一下这个。

    使用的 ElasticSearch 版本:7.1

    创建索引:

    PUT test
    {
      "mappings": {
        "properties": {
          "sequence": {
            "type": "long"
          },
          "type": {
            "type": "text",
            "fielddata": true
          }
        }
      }
    }
    

    批量索引:(请注意,我删除了映射类型“groupingTest”)

    POST _bulk
    { "index": { "_index": "test", "_id": "1" } }
    { "sequence": 1, "type": "A" }
    { "index": { "_index": "test", "_id": "2" } }
    { "sequence": 2, "type": "A" }
    { "index": { "_index": "test", "_id": "3" } }
    { "sequence": 3, "type": "B" }
    { "index": { "_index": "test", "_id": "4" } }
    { "sequence": 4, "type": "A" }
    { "index": { "_index": "test", "_id": "5" } }
    { "sequence": 5, "type": "A" }
    

    查询

    GET test/_doc/_search
    {
      "size": 0,
      "aggs": {
        "scripted_agg": {
          "scripted_metric": {
            "init_script": """ 
              state.seqTypeArr = [];
            """,
            "map_script": """ 
              def seqType = doc.sequence.value + '_' + doc['type'].value;
              state.seqTypeArr.add(seqType);
            """,
            "combine_script": """
              def list = [];
              for(seqType in state.seqTypeArr) {
                list.add(seqType);
              }
              return list;
            """,
            "reduce_script": """ 
              def fullList = [];
              for(agg_value in states) {
                for(x in agg_value) {
                  fullList.add(x);
                }
              }
              fullList.sort((a,b) -> a.compareTo(b));
              def result = [];
              def item = new HashMap();
              for(int i=0; i<fullList.size(); i++) {
                def str = fullList.get(i);
                def index = str.indexOf("_");
                def ch = str.substring(index+1);
                def val = str.substring(0, index);
                if(item["key"] == null) {
                  item["key"] = ch;
                  item["sequence_group"] = val;
                  item["doc_count"] = 1;
                } else if(item["key"] == ch) {
                  item["doc_count"] = item["doc_count"] + 1;
                } else {
                  result.add(item);
                  item = new HashMap();
                  item["key"] = ch;
                  item["sequence_group"] = val;
                  item["doc_count"] = 1;
                }
              }
              result.add(item);
              return result;
            """
          }
        }
      }
    }
    

    最后是输出:

    {
      "took" : 21,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [ ]
      },
      "aggregations" : {
        "scripted_agg" : {
          "value" : [
            {
              "doc_count" : 2,
              "sequence_group" : "1",
              "key" : "a"
            },
            {
              "doc_count" : 1,
              "sequence_group" : "3",
              "key" : "b"
            },
            {
              "doc_count" : 2,
              "sequence_group" : "4",
              "key" : "a"
            }
          ]
        }
      }
    }
    

    请注意,脚本聚合对查询性能有很大影响。因此,如果没有大量文档,您可能会注意到速度有些慢。

    【讨论】:

    • 终于,在 6 年后,有一些东西看起来正是我想要的。谢谢!
    【解决方案2】:

    您始终可以进行术语聚合,然后应用热门命中聚合来获得此信息。

    {
      "aggs": {
        "types": {
          "terms": {
            "field": "type"
          },
          "aggs": {
            "groups": {
              "top_hits": {
                "size": 10
              }
            }
          }
        }
      }
    }
    

    【讨论】:

    • top hits 聚合似乎并不能解决问题。使用您建议的聚合只检索两个存储桶 - 一个用于“A”类型,一个用于“B”类型。我看不出这如何解决考虑序列间隙的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-29
    • 2013-08-31
    • 2013-02-10
    • 1970-01-01
    • 2023-04-06
    • 1970-01-01
    相关资源
    最近更新 更多