【发布时间】:2020-09-01 23:00:28
【问题描述】:
我有一些较小的 jsonl 文件,程序在循环中逐个读取它们并写入(摄取到)Elasticsearch。运行过程中,Python 进程的内存使用量似乎在不断增长。下面的代码在一个类中运行:
def load_files_to_es(self, files_to_load):
    """Parse each ``.jsonl`` file under ``../pdl_out`` and pass it to the ES loader.

    Args:
        files_to_load: Iterable of file names, joined onto ``../pdl_out``.
            Names that do not end in ``.jsonl`` are skipped.

    A file that fails to parse is logged and skipped; the loop then moves on
    to the next file. Errors raised by ``open()`` itself (for example a
    missing file) are not caught here and propagate to the caller.
    """
    count = 0
    for file in files_to_load:
        if not file.endswith(".jsonl"):
            continue
        # JSON text is UTF-8 by specification; do not depend on the platform's
        # default text encoding when decoding it.
        with open(os.path.join("../pdl_out", file), encoding="utf-8") as clean_data_file:
            try:
                # NOTE(review): json.load expects the whole file to be a single
                # JSON document. Confirm these files are not multi-record
                # JSON Lines, which would end up in the error branch below.
                clean_data = json.load(clean_data_file)
                count += 1
            except Exception as e:
                logging.error(f"{e} error processing {file}")
            else:
                logging.info("loading data to ES")
                les.load_pdl_to_es(clean_data=clean_data, filename=file)
                # Drop the reference so the parsed payload can be reclaimed
                # before the next file is parsed, instead of living until the
                # name is rebound on the next successful iteration.
                del clean_data
            finally:
                print(f"Prev File: {file}")
                # count only advances on a successful parse, so while it sits
                # on a multiple of 10 (0 included) every processed file
                # triggers a collection.
                if count % 10 == 0:
                    gc.collect()
将数据上传到 Elasticsearch 的代码:
# Module-level Elasticsearch client used by the bulk loader below: a single
# host at localhost:9200, plain HTTP, no HTTP authentication.
es = Elasticsearch(
    ['localhost:9200'],
    http_auth=None,
    scheme="http",
    port=9200,
)
def insert_data(data_to_insert):
    """Yield one bulk action dict per record found in ``data_to_insert``.

    The input is handed to ``pandas.read_json`` with ``orient='records'`` and
    ``lines=True``. Each resulting row is serialised back to a JSON string,
    which becomes the action's ``_source``; the row's ``id`` value becomes
    ``_id`` and the module-level ``indexname`` becomes ``_index``.
    """
    frame = pd.read_json(data_to_insert, orient='records', lines=True)
    for _, record in frame.iterrows():
        payload = record.to_json()
        action = {"_index": indexname, "_id": record['id'], "_source": payload}
        yield action
def load_pdl_to_es(clean_data=None, filename=''):
    """Bulk-index ``clean_data`` into Elasticsearch through ``parallel_bulk``.

    Args:
        clean_data: Payload forwarded to ``insert_data`` to produce the bulk
            actions. When ``None`` nothing is sent.
        filename: Name of the originating file; used only in log messages.

    Returns:
        None. Failures are reported through ``logging`` rather than raised:
        every ``Exception`` from the bulk call is caught and logged here.
    """
    if clean_data is None:
        # The default argument would otherwise only surface as an opaque
        # parser error swallowed by the generic handler below.
        logging.warning(f"No data supplied for {filename}; nothing sent to Elasticsearch")
        return
    try:
        actions = insert_data(data_to_insert=clean_data)
        for success, info in parallel_bulk(es, actions, request_timeout=30,
                                           queue_size=8, thread_count=8):
            if not success:
                logging.debug(info)
                # Plain {filename}: the former "${filename}" left a stray "$"
                # in front of the file name in the log output.
                logging.error(f"Insert records to elastic search failed for {filename}")
    # NOTE(review): unless ConnectionError / TimeoutError are imported from
    # the Elasticsearch client elsewhere in this file, these are the Python
    # builtins, and client transport errors would fall through to the
    # generic handler instead -- confirm.
    except ConnectionError:
        logging.error("Connection error")
    except TimeoutError:
        logging.error("Connection Timed out")
    except Exception as e:
        logging.error(e)
【问题讨论】:
标签: python elasticsearch memory-leaks