【问题标题】:Optimising millions of Elasticsearch query export to csv with pandas使用 pandas 优化数百万个 Elasticsearch 查询导出到 csv
【发布时间】:2021-10-27 16:04:52
【问题描述】:

我正在尝试使用pandas 将大量elasticsearch query 结果导出到csv。我正在使用至少1 millionrecords,这需要是exported。执行花费了太多时间,在检查日志时我发现pandas 在附加数据时花费了太多时间。

为了实现这一点,我使用了 scan 辅助函数并将其拆分为 5000 个块,我可以使用该函数提取具有无限 scroll 的所有 elasticsearch 数据

我的代码如下:

for hit in scan(elastic_client, index=index, query=query, scroll='20h', clear_scroll=True, size=5000):
    scan_source_data = hit["_source"]
    scan_id = hit["_id"]
    scan_doc_data = pandas.Series(scan_source_data, name=scan_id)
    scan_docs = scan_docs.append(scan_doc_data)

scan_docs.to_csv("/tmp/scandocs.csv", ",")

当我检查日志时:

[2021-08-26 08:10:51,876: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/server-logs/_search?scroll=20h&size=5000 [status:200 request:0.814s]
[2021-08-26 08:13:39,213: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/_search/scroll [status:200 request:0.653s]
[2021-08-26 08:17:50,795: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/_search/scroll [status:200 request:0.813s]
[2021-08-26 08:23:15,433: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/_search/scroll [status:200 request:0.554s]
[2021-08-26 08:30:25,491: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/_search/scroll [status:200 request:0.585s]
[2021-08-26 08:38:42,866: INFO/ForkPoolWorker-1] POST https://search-52bt6xrhhsi.ap-south-1.es.amazonaws.com:443/_search/scroll [status:200 request:0.483s]

如您所见,前 5000 个块用时 3 分钟,下一个 5000 个块用时 4 分钟,下一个 6 分钟,接下来 8 分钟,随着append 的大小增加,这个时间也会增加。

我尝试一次导出大约 50,000 的记录,大约需要 one and half hour 才能获得 CSV。在尝试导出 200,000 记录时,我在日志中运行的两个 scroll 查询之间存在 1 小时的时间间隔,并且运行时间超过 24 小时。

我是python世界的新手,对它了解不多,但我相信一定有更好的方法。

我找不到解决此问题的优化方法。对此的任何信息表示赞赏。谢谢。

【问题讨论】:

    标签: python pandas elasticsearch


    【解决方案1】:

    编辑添加计时打印语句以查看scan 调用是否缓慢,或者for 循环是否缓慢。如果是scan,那你可能就倒霉了。

    import time
    
    with open('/tmp/scandocs.csv','w') as f_out:
        scan_start_time = time.time()
        hits = scan(elastic_client, index=index, query=query, scroll='20h', clear_scroll=True, size=5000)
        scan_duration = time.time()-scan_start_time
        print(scan_duration)
    
        loop_start_time = time.time()
        for hit in hits:
            scan_source_data = hit["_source"]
            scan_id = hit["_id"]
            output_line = '{},'.format(scan_id)
            output_line += ','.join(scan_source_data)
            f_out.write(output_line+'\n')
    
    loop_duration = time.time()-loop_start_time
    print(loop_duration)
    

    编辑前:

    在 pandas 中追加速度很慢 (Improve Row Append Performance On Pandas DataFrames)。您是否需要一次将所有scan_doc_data 存储在内存中?或者您可以在检索到它后立即将其写出来吗?如果您可以立即写出,我建议直接写入打开的文件句柄并自己逐行创建 CSV:

    with open('/tmp/scandocs.csv','w') as f_out:
        for hit in scan(elastic_client, index=index, query=query, scroll='20h', clear_scroll=True, size=5000):
            scan_source_data = hit["_source"]
            scan_id = hit["_id"]
            output_line = '{},'.format(scan_id)
            output_line += ','.join(scan_source_data)
            f_out.write(output_line+'\n')
    

    我没有测试这个,所以可能有错误

    【讨论】:

    • 嘿,谢谢@mitoRibo,抱歉回复晚了。我尝试了您的代码,并且花费了相同的时间。我在此开发过程中观察到,随着块大小的增长,执行会出现延迟。当我将块大小更改为 500-1000 时,导出速度很快。用这个在一个小时内导出了 200 万条记录。为您的帮助点赞
    • 我猜scan 函数调用是缓慢的根源。您可以通过添加一些计时打印语句来测试这一点,就像我在上面的编辑中所做的那样
    猜你喜欢
    • 2018-12-01
    • 2023-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-24
    • 1970-01-01
    • 1970-01-01
    • 2020-12-31
    相关资源
    最近更新 更多