【问题标题】:Memory Leak while using elasticsearch parallel_bulk in python在 python 中使用 elasticsearch parallel_bulk 时出现内存泄漏
【发布时间】:2020-09-01 23:00:28
【问题描述】:

我有一些小的 jsonl 文件,这些文件在循环中读取并在 elasticsearch 上摄取。 python 进程似乎不断增加内存使用量。下面的代码在一个类中运行

    def load_files_to_es(self, files_to_load):
        count = 0
        for file in files_to_load:
            if file.endswith(".jsonl"):
                with open(os.path.join("../pdl_out", file)) as clean_data_file:
                    try:
                        clean_data = json.load(clean_data_file)
                        count += 1
                    except Exception as e:
                        logging.error(f"{e} error processing {file}")
                    else:
                        logging.info("loading data to ES")
                        les.load_pdl_to_es(clean_data=clean_data, filename=file)
                    finally:
                        print(f"Prev File: {file}")
                        if count % 10 == 0:
                            gc.collect()

在elasticsearch上上传的代码:

es = Elasticsearch(['localhost:9200'], http_auth=None, scheme="http", port=9200)
def insert_data(data_to_insert):
    data = pd.read_json(data_to_insert, orient='records', lines=True)
    for index, row in data.iterrows():
        data_as_json = row.to_json()
        yield {
            "_index": indexname,
            "_id": row['id'],
            "_source": data_as_json
        }


def load_pdl_to_es(clean_data=None, filename=''):
    try:
        for success, info in parallel_bulk(es, insert_data(data_to_insert=clean_data), request_timeout=30,
                                           queue_size=8, thread_count=8):
            if not success:
                logging.debug(info)
                logging.error(f"Insert records to elastic search failed for ${filename}")
    except ConnectionError:
        logging.error("Connection error")
    except TimeoutError:
        logging.error("Connection Timed out")
    except Exception as e:
        logging.error(e)

【问题讨论】:

    标签: python elasticsearch memory-leaks


    【解决方案1】:

    找出错误。 Pandas 数据帧会导致内存泄漏。该对象即使在不再被引用后仍保留在内存中。 https://github.com/pandas-dev/pandas/issues/2659

    解决方案: 完全移除pandas,直接使用json对象。

    其他选项是使用 gc.collect() 手动调用垃圾收集器以删除旧的未引用对象(特别是 pd dfs)

    【讨论】:

      猜你喜欢
      • 2013-11-09
      • 2013-11-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-02-27
      • 2011-04-24
      • 2016-06-11
      • 1970-01-01
      相关资源
      最近更新 更多