【问题标题】:Elasticsearch Aggregation to pandas DataframeElasticsearch 聚合到 pandas 数据框
【发布时间】:2020-01-24 16:28:10
【问题描述】:

我正在处理一些 ElasticSearch 数据,我想从 Kibana 中的聚合生成表。下面是基于以下代码的聚合示例输出:

    s.aggs.bucket("name1", "terms", field="field1").bucket(
        "name2", "terms", field="innerField1"
    ).bucket("name3", "terms", field="InnerAgg1")
     response = s.execute()
   resp_dict = response.aggregations.name.buckets




{
    "key": "Locationx",
    "doc_count": 12,
    "name2": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "Sub-Loc1",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }, {
            "key": "Sub-Loc2",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }]
    }
}

在这种情况下,预期的输出是:

现在,我尝试了多种方法,并简要说明了问题所在:

Pandasticsearch = 即使只有 1 个字典也完全失败。没有创建字典,因为它正在与键作斗争,即使每个字典都是单独处理的:

for d in resp_dict :
    x= d.to_dict()
    pandas_df = Select.from_dict(x).to_pandas()
    print(pandas_df)

特别是,收到的错误与没有制作字典的事实有关,因此 ['took'] 不是键。

Pandas (pd.Dataframe.from_records()) = 只给了我第一个聚合,一列包含内部字典,并在其上使用 pd.apply(pd.Series) 给出了另一个结果字典表。

StackOverflow 发布 recursive function = 字典看起来与使用的示例完全不同,除非我彻底改变输入,否则修修补补让我无处可去。

【问题讨论】:

    标签: python pandas dataframe elasticsearch


    【解决方案1】:

    遇到同样的问题,我开始相信这是因为 response_dict 不是普通的字典,而是 elasticsearch_dsl.utils.AttrDictelasticsearch_dsl.utils.AttrList

    如果您有 AttrListAttrDicts,则可以这样做:

    resp_dict = response.aggregations.name.buckets
    new_response = [i._d_ for i in resp_dict]
    

    改为获取普通字典的列表。这可能会更好地与其他库一起使用。

    编辑:

    我写了一个递归函数,它至少可以处理一些情况,虽然还没有经过广泛的测试,也没有包装在一个好的模块或任何东西中。这只是一个脚本。 one_lvl 函数在名为 tmp 的字典中跟踪树中的所有兄弟姐妹和父母的兄弟姐妹,并在找到新的命名聚合时递归。它假设了很多关于数据结构的假设,我不确定在一般情况下是否有必要。

    我认为lvl 的内容是必要的,因为您可能有重复的名称,因此key 存在于多个聚合级别。

    #!/usr/bin/env python3
    
    from elasticsearch_dsl.query import QueryString
    from elasticsearch_dsl import Search, A
    from elasticsearch import Elasticsearch
    import pandas as pd
    
    PORT = 9250
    TIMEOUT = 10000
    USR = "someusr"
    PW = "somepw"
    HOST = "test.com"
    INDEX = "my_index"
    QUERY = "foobar"
    
    client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)
    
    qs = QueryString(query = QUERY)
    s = Search(using=client, index=INDEX).query(qs)
    
    s = s.params(size = 0)
    
    agg= {
        "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
        "region" : A("terms", field="region", size=10),
        "county" : A("terms", field="county", size = 10)
    }
    
    s.aggs.bucket("dates", agg["dates"]). \
           bucket("region", agg["region"]). \
           bucket("county", agg["county"])
    
    resp = s.execute()
    
    data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}
    rec_list = ["buckets"] + [*agg.keys()]
    
    def get_fields(i, lvl):
        return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}
    
    def one_lvl(data, tmp, lvl, rows, maxlvl):
        tmp = {**tmp, **get_fields(data, lvl)}
    
        if "buckets" not in data:
            rows.append(tmp)
    
        for d in data:
            if d in ["buckets"]:
                for v, b in enumerate(data[d]):
                    tmp = {**tmp, **get_fields(data[d][v], lvl)}
                    for k in b:
                        if k in agg.keys():
                            one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                        else:
                            if lvl == maxlvl:
                                tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]}
                                rows.append(tmp)
    
        return rows
    
    
    rows = one_lvl(data, {}, 1, [], len(agg))
    df = pd.DataFrame(rows)
    
    

    【讨论】:

    • 老实说,这是目前可以做的最好的事情。我很想找到一个更好的方法,但把它变成一堆 dicts 并使用它们就是我这样做的方式
    • 我尝试寻找递归解决方案,编辑了帖子。
    猜你喜欢
    • 2021-11-04
    • 2017-02-05
    • 1970-01-01
    • 2017-06-23
    • 2013-02-06
    • 2023-02-16
    • 1970-01-01
    • 2018-12-04
    • 1970-01-01
    相关资源
    最近更新 更多