【Question title】: Weighted average value over documents in Elasticsearch
【Posted】: 2021-11-22 12:05:43
【Question description】:

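I need to calculate a weighted average with Elasticsearch, and I cannot change the structure of the documents. Assume there are 2 indexed documents.
First document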

const doc1 = {
  "id": "1",
  "userId": "2",
  "scores" : [
    {
      "name": "score1",
      "value": 93.0
    },
    {
      "name": "score2",
      "value": 90.0
    },
    {
      "name": "score3",
      "value": 76.0
    }
  ],
  "metadata": {
    "weight": 130
  }
}

Second document

const doc2 = {
  "id": "2",
  "userId": "2",
  "scores" : [
    {
      "name": "score1",
      "value": 80.0
    },
    {
      "name": "score2",
      "value": 70.0
    },
    {
      "name": "score3",
      "value": 88.0
    }
  ],
  "metadata": {
    "weight": 50
  }
}

The averages should be calculated with the following formulas:

score1Avg = (doc1.scores['score1'].value * doc1.metadata.weight +
       doc2.scores['score1'].value * doc2.metadata.weight)/(doc1.metadata.weight+doc2.metadata.weight)

score2Avg = (doc1.scores['score2'].value * doc1.metadata.weight +
       doc2.scores['score2'].value * doc2.metadata.weight)/(doc1.metadata.weight+doc2.metadata.weight)

score3Avg = (doc1.scores['score3'].value * doc1.metadata.weight +
       doc2.scores['score3'].value * doc2.metadata.weight)/(doc1.metadata.weight+doc2.metadata.weight)
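
As a sanity check, the formulas can be evaluated by hand for the two sample documents. The sketch below is plain Python and does not touch Elasticsearch; the helper name `weighted_avg` and the simplified in-memory layout (scores as a name-to-value dict, weight as a top-level key) are illustrative assumptions, not part of the indexed documents.

```python
# Sample documents from the question, reduced to the fields the formula uses.
docs = [
    {"scores": {"score1": 93.0, "score2": 90.0, "score3": 76.0}, "weight": 130},
    {"scores": {"score1": 80.0, "score2": 70.0, "score3": 88.0}, "weight": 50},
]


def weighted_avg(docs, name):
    """Return sum(value * weight) / sum(weight) over documents containing `name`."""
    matching = [d for d in docs if name in d["scores"]]
    total = sum(d["scores"][name] * d["weight"] for d in matching)
    weight = sum(d["weight"] for d in matching)
    return total / weight  # raises ZeroDivisionError if no document has `name`


for name in ("score1", "score2", "score3"):
    print(name, round(weighted_avg(docs, name), 4))
# score1 89.3889
# score2 84.4444
# score3 79.3333
```

Any aggregation-based solution should reproduce these three values for the two sample documents.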

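I tried mapping scores as a nested type, but then I could not access the parent document field metadata.weight. How should this be handled? Should I use a nested type mapping, or can it be done some other way?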

Edit: I ended up storing the score elements as separate documents. Instead of doc1, I now have the following documents.

{
  "id": "1",
  "userId": "2",
  "score": {
      "name": "score1",
      "value": 93.0
    },
  "metadata": {
    "weight": 130
  }
}
{
  "id": "1",
  "userId": "2",
  "score": {
      "name": "score2",
      "value": 90.0
    },
  "metadata": {
    "weight": 130
  }
}
{
  "id": "1",
  "userId": "2",
  "score": {
      "name": "score3",
      "value": 76.0
    },
  "metadata": {
    "weight": 130
  }
}
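
The split described in the edit can be done on the client before indexing. The following is a minimal sketch, assuming the original document is available as a Python dict; the function name `split_scores` is hypothetical, and the actual indexing call is left out.

```python
def split_scores(doc):
    """Return one flat document per entry of doc["scores"].

    Each output document keeps the parent's id, userId and metadata, so
    metadata.weight ends up in the same document as score.value.
    """
    return [
        {
            "id": doc["id"],
            "userId": doc["userId"],
            "score": dict(score),
            "metadata": dict(doc["metadata"]),
        }
        for score in doc["scores"]
    ]


doc1 = {
    "id": "1",
    "userId": "2",
    "scores": [
        {"name": "score1", "value": 93.0},
        {"name": "score2", "value": 90.0},
        {"name": "score3", "value": 76.0},
    ],
    "metadata": {"weight": 130},
}

flat_docs = split_scores(doc1)  # three documents, one per score
```

Because all three resulting documents share the same "id" field, that field cannot serve as the Elasticsearch document `_id`; each flat document needs its own identifier when indexed.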

The query is:

GET /scores/_search
{
  "size": 0,
   "aggs": {
        "group_by_score_and_user": {
              "composite": {
                "sources": [
                  {
                    "scoreName": {
                      "terms": {
                        "field": "score.name.keyword"
                      }
                    }
                  },{
                    "userId": {
                      "terms": {
                        "field": "userId.keyword"
                      }
                    }
                  }
                ]
              },
              "aggs": {
                "avg": {
                  "weighted_avg": {
                    "value":{ "field": "score.value" },
                    "weight":{ "field": "metadata.weight" }
                  }
                }
              }
            }
  }
}
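
A composite aggregation returns its buckets in pages (10 buckets per page unless `size` is set) and reports an `after_key` that can be sent back as `after` to fetch the next page. Since the number of score names and users is open-ended, a client will likely need to page. The sketch below only builds the request bodies; the function name `build_request` and the page size of 100 are assumptions, and sending the request is left to whatever client is in use.

```python
import copy

# Sources copied from the query above.
BASE_COMPOSITE = {
    "sources": [
        {"scoreName": {"terms": {"field": "score.name.keyword"}}},
        {"userId": {"terms": {"field": "userId.keyword"}}},
    ]
}


def build_request(after_key=None, page_size=100):
    """Build the search body for one page of composite buckets."""
    composite = copy.deepcopy(BASE_COMPOSITE)
    composite["size"] = page_size
    if after_key is not None:
        # Resume after the last bucket of the previous page.
        composite["after"] = after_key
    return {
        "size": 0,
        "aggs": {
            "group_by_score_and_user": {
                "composite": composite,
                "aggs": {
                    "avg": {
                        "weighted_avg": {
                            "value": {"field": "score.value"},
                            "weight": {"field": "metadata.weight"},
                        }
                    }
                },
            }
        },
    }


first_page = build_request()
# The after_key value here is a made-up example of what a response might contain.
next_page = build_request(after_key={"scoreName": "score1", "userId": "2"})
```

Paging stops once a response contains no buckets.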

By the way, with the script approach a query over 5k documents took 120 ms on average, whereas this query takes about 35-40 ms over 100k documents.

【Question discussion】:

    Tags: elasticsearch aggregation opensearch


    【Solution 1】:

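    Edited to match the requirements from the comments. As I said before, this is not an optimal solution at all: the combination of a script, params._source and my subpar Java will make it very slow or unusable on large numbers of documents.

    I still learned a lot.

    Mapping: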

    {
      "mappings": {
        "properties": {
          "id": {
            "type": "keyword"
          },
          "userId": {
            "type": "keyword"
          },
          "scores": {
            "properties": {
              "name": {
                "type": "keyword"
              },
              "value": {
                "type": "float"
              }
            }
          },
          "metadata": {
            "properties": {
              "weight": {
                "type": "float"
              }
            }
          }
        }
      }
    }
    

    Documents:

    POST ron_test/_doc/1
    {
      "id": "1",
      "userId": "2",
      "scores" : [
        {
          "name": "score1",
          "value": 93.0
        },
        {
          "name": "score2",
          "value": 90.0
        },
        {
          "name": "score3",
          "value": 76.0
        }
      ],
      "metadata": {
        "weight": 130
      }
    }
    
    POST ron_test/_doc/2
    {
      "id": "2",
      "userId": "2",
      "scores" : [
        {
          "name": "score1",
          "value": 80.0
        },
        {
          "name": "score2",
          "value": 70.0
        },
        {
          "name": "score3",
          "value": 88.0
        }
      ],
      "metadata": {
        "weight": 50
      }
    }
    
    POST ron_test/_doc/3
    {
      "id": "2",
      "userId": "2",
      "scores" : [
        {
          "name": "score1",
          "value": 80.0
        },
        {
          "name": "score2",
          "value": 70.0
        },
        {
          "name": "score9",
          "value": 88.0
        }
      ],
      "metadata": {
        "weight": 12
      }
    }
    
    POST ron_test/_doc/4
    {
      "id": "2",
      "userId": "2",
      "scores" : [
        {
          "name": "score9",
          "value": 50.0
        }
      ],
      "metadata": {
        "weight": 17
      }
    }
    
    

    Query

    GET ron_test/_search
    {
      "size": 0,
      "aggs": {
        "weigthed_avg": {
          "scripted_metric": {
            "init_script": """
            state.name_to_sum = new HashMap();
            state.name_to_weight = new HashMap();
            """,
            "map_script": """
            for (score in params._source['scores']){
              def name = score['name'];
              def value = score['value'];
              def weight = doc['metadata.weight'].value;
              
              if (state.name_to_sum.containsKey(name)){
                state.name_to_sum[name] += value * weight;
              }
              else {
                state.name_to_sum[name] = value * weight;
              }
              
              if (state.name_to_weight.containsKey(name)){
                state.name_to_weight[name] += weight;
              }
              else {
                state.name_to_weight[name] = weight;
              }
              
            }
            """,
            "combine_script": "return [state.name_to_sum, state.name_to_weight]",
            "reduce_script": """
            def total_score_per_name = new HashMap();
            def total_weigth_per_name = new HashMap();
            
            for (state in states){
              total_score_per_name = Stream.concat(total_score_per_name.entrySet().stream(), state[0].entrySet().stream())
                 .collect(Collectors.groupingBy(Map.Entry::getKey,
                 Collectors.summingDouble(Map.Entry::getValue)));
                 
              total_weigth_per_name = Stream.concat(total_weigth_per_name.entrySet().stream(), state[1].entrySet().stream())
                 .collect(Collectors.groupingBy(Map.Entry::getKey,
                 Collectors.summingDouble(Map.Entry::getValue)));
            }
            
            def results = new HashMap();
            total_score_per_name.forEach((name, score) -> results[name] = score / total_weigth_per_name[name]);
            return results;
            """
          }
        }
      }
    }
    

    Result

    {
      "took" : 258,
      "timed_out" : false,
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [ ]
      },
      "aggregations" : {
        "weigthed_avg" : {
          "value" : {
            "score9" : 65.72413793103448,
            "score2" : 83.54166666666667,
            "score3" : 79.33333333333333,
            "score1" : 88.80208333333333
          }
        }
      }
    }
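
The four phases of the scripted metric can be mirrored in plain Python to check the reported values. This is a sketch, not the Painless code itself: the function names are hypothetical, and the assignment of the four documents to two shards is an assumption made only to exercise the merge step.

```python
from collections import defaultdict

# (scores, weight) for documents 1-4 above; the shard split is hypothetical.
shard_a = [
    ({"score1": 93.0, "score2": 90.0, "score3": 76.0}, 130.0),
    ({"score1": 80.0, "score2": 70.0, "score3": 88.0}, 50.0),
]
shard_b = [
    ({"score1": 80.0, "score2": 70.0, "score9": 88.0}, 12.0),
    ({"score9": 50.0}, 17.0),
]


def map_and_combine(shard_docs):
    """init_script + map_script + combine_script for a single shard."""
    name_to_sum = defaultdict(float)
    name_to_weight = defaultdict(float)
    for scores, weight in shard_docs:
        for name, value in scores.items():
            name_to_sum[name] += value * weight
            name_to_weight[name] += weight
    return name_to_sum, name_to_weight


def reduce_states(states):
    """reduce_script: merge the per-shard states, then divide sum by weight."""
    total_sum = defaultdict(float)
    total_weight = defaultdict(float)
    for name_to_sum, name_to_weight in states:
        for name, value in name_to_sum.items():
            total_sum[name] += value
        for name, weight in name_to_weight.items():
            total_weight[name] += weight
    return {name: total_sum[name] / total_weight[name] for name in total_sum}


results = reduce_states([map_and_combine(shard_a), map_and_combine(shard_b)])
```

For example, score1 comes out as (93 * 130 + 80 * 50 + 80 * 12) / (130 + 50 + 12) = 17050 / 192, about 88.80, matching the aggregation output.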
    

    More information on scripted metrics: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-scripted-metric-aggregation.html

    By the way, the simpler approach I would choose is to insert the metadata.weight value into each nested score.
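
Copying the weight into each score could be done on the client before indexing. The following is a minimal sketch, assuming the document is available as a Python dict; the function name `copy_weight_into_scores` and the target field name `weight` inside each score are hypothetical choices.

```python
import copy


def copy_weight_into_scores(doc):
    """Return a copy of `doc` whose score entries each carry the parent's weight."""
    enriched = copy.deepcopy(doc)  # leave the caller's document untouched
    weight = enriched["metadata"]["weight"]
    for score in enriched["scores"]:
        score["weight"] = weight
    return enriched


doc = {
    "id": "1",
    "userId": "2",
    "scores": [
        {"name": "score1", "value": 93.0},
        {"name": "score2", "value": 90.0},
        {"name": "score3", "value": 76.0},
    ],
    "metadata": {"weight": 130},
}

enriched = copy_weight_into_scores(doc)
```

With scores mapped as a nested type, each nested score would then hold both its value and its weight, so an aggregation running inside the nested context would no longer need to reach the parent's metadata.weight.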

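    【Discussion】:

    • I'm afraid this is not a satisfactory solution. score1, 2 and 3 are just examples; there can be any number of scores with different names, but the overall document structure is the same. It would be great if there were a way to group by score.name into buckets and then do the calculation. Is there a way to make the weight available to the nested documents? I managed to group them, but could not do the calculation because I have no reference to metadata.weight. Thanks.
    • @NemanjaStankovic I edited my answer.
    • Exactly what I was thinking, the last thing you mentioned: adding the weight to each score, since there won't be more than 10 scores. But about 48 million documents may be created per month, so the script approach would probably fail sooner or later. Thanks for your time and effort.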