【问题标题】:Why iterating over ndb Datastore query consumes too much memory?为什么迭代 ndb Datastore 查询会消耗太多内存?
【发布时间】:2020-01-14 18:58:38
【问题描述】:

我有一个类似的查询:

query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()

for ent in iter:
    # write log to file, nothing memory intensive

我在 for 循环中添加了日志,读取 10K 行会增加 200MB 的内存使用量,然后读取接下来的 10K 行会增加额外的 200MB,依此类推。读取 100K 需要 2GB,超过了 highmem 内存限制。

我尝试在 for 循环中清除内存缓存,在读取 10K 行后,添加:

                # clear ndb cache in order to reduce memory footprint
                context = ndb.get_context()
                context.clear_cache()

在 for 循环中,在每 10K 次迭代中,但导致查询超时,引发错误 BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results. ndb

我最初的期望是,通过使用query.iter() 而不是query.fetch(),我不会遇到任何内存问题并且内存几乎是恒定的,但事实并非如此。有没有办法在不超过时间和内存限制的情况下使用迭代器读取数据?通过清除上下文缓存,我发现内存消耗几乎是恒定的,但我遇到了检索所有行所需的时间。

顺便说一句,有很多行要检索,最多 150K。是否可以通过一些简单的调整来完成这项工作,或者我需要一个更复杂的解决方案,例如一个会使用一些并行化的?

【问题讨论】:

    标签: python python-2.7 performance google-cloud-datastore app-engine-ndb


    【解决方案1】:

    您是否在远程 api-shell 中运行它?否则我会想象应用引擎的最大请求超时将开始成为问题。

    您绝对应该在 google 数据流中运行它。它将为您并行化/运行得更快。

    https://beam.apache.org/documentation/programming-guide/ https://beam.apache.org/releases/pydoc/2.17.0/index.html https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py

    我想你的管道代码看起来像这样:

    def run(project, gcs_output_prefix, exec_id):
    
        def format_result(element):
            csv_cells = [
                datastore_helper.get_value(element.properties['exec_id']),
                # extract other properties here!!!!!
            ]
            return ','.join([str(cell) for cell in csv_cells])
    
        pipeline_options = PipelineOptions([])
        pipeline_options.view_as(SetupOptions).save_main_session = True
    
        p = beam.Pipeline(options=pipeline_options)
    
        query = query_pb2.Query()
        query.kind.add().name = 'HistoryLogs'
    
        datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)
    
        _ = (p 
             | 'read' >> ReadFromDatastore(project, query, None)
             | 'format' >> beam.Map(format_result)
             | 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
                                              file_name_suffix='.csv',
                                              num_shards=1) # limits output to a single file
        result = p.run()
        result.wait_until_finish()
    
    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run(project='YOUR-PROJECT-ID', 
            gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
            exec_id='1234567890')
    

    此代码从 Google Datastore 读取并以 csv 格式导出到 Google Cloud Storage。

    【讨论】:

      【解决方案2】:

      在对从数据存储查询获得的大量实体进行操作时,防止同时达到内存和请求处理时间限制的典型解决方案是使用 cursors 将工作负载拆分为多个可管理的块,并将它们分散到多个请求中(例如,使用推送队列任务),最终及时错开,以防止实例爆炸和访问输出媒体(如果有)的争用。

      通过这种方式,您可以处理几乎无限的工作负载,即使出于某种原因您不能/不会使用 Alex 建议的出色数据流解决方案。

      您可以在How to delete all the entries from google datastore? 中找到该技术的示例

      但请注意游标限制,请参阅Are GAE Datastore cursors permanent and durable?

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-09-30
        • 2018-10-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2010-10-06
        • 1970-01-01
        • 2020-04-16
        相关资源
        最近更新 更多