【发布时间】:2013-12-18 17:33:06
【问题描述】:
我的普通脚本在 20 秒内处理了大约 30,000 条记录。考虑到我必须处理的数据量(超过 5000 万条记录),我认为使用 python 的多处理是明智的。
在我的流程结束时,我使用 sqlalchemy 核心进行了数据库更新,其中我以 50,000 的批次更新处理过的记录。 SQLAlchemy Core requires that you pass it a list for it to do a bulk update or even insert。我会将此列表称为py_list
对于 Python 的多处理,我通过 multiprocessing.manager.list() 捕获进程的结果,我将其称为 mp_list。
在我将 mp_list 传递给 SQLAlchemy 批量更新语句之前,一切正常。这将失败并出现错误 AttributeError: 'list' object has no attribute 'keys'。谷歌搜索将我带到question on SO,它指出 multiprocessing.manager.list() 甚至 multiprocessing.manager.dict() 不是真正的 python 列表/字典。
那么问题是,如何将 multiprocessing.manager.list 转换为真正的 python 列表。
mp_list 填充如下:
import multiprocessing
manager = multiprocessing.Manager()
mp_list = manager.list()
def populate_mp_list(pid, is_processed):
'''Mark the record as having been processed'''
dict = {}
dict['b_id'] = pid
dict['is_processed'] = is_processed
mp_list.append(dict)
抛出错误的SQLALchemy代码如下:
CONN = Engine.connect()
trans = CONN.begin()
stmt = mytable.update().where(mytable.c.id == bindparam('b_id')).\
values(is_processed=bindparam('is_processed'))
CONN.execute(stmt, mp_list)
trans.commit(
我尝试将 mp_list 转换为真正的 python 列表。新列表创建了作品,但其创建的时间损失抵消了多处理中节省的所有时间。
如果我循环返回的mp_list 并创建一个新列表。
y = []
for x in mp_list:
y.append(x)
另外,如果我对mp_list 进行“复制”,每个复制都会增加 3 秒!平均罚款,这并不酷。
y = mp_list[0:len(mp_list)]
那么,将 multiprocessing.manager.list 转换为 SQLAlchemy Core 可用的列表的最快方法是什么?
【问题讨论】:
-
您的
mp_list中有什么内容?错误消息表明它包含lists,而它需要包含dicts(或者,可能是multiprocessing.manager.dicts)。纯 Pythonlist仍然没有属性keys -
@jonrsharpe 我已经编辑了问题,包括如何生成
mp_list以及抛出错误的 sqlalchemy 代码。
标签: python python-3.x sqlalchemy multiprocessing