【问题标题】:Why does my paralelization approach not scale?为什么我的并行化方法无法扩展?
【发布时间】:2019-09-02 18:27:04
【问题描述】:
  • 我必须遍历一个大型集合 (> 50GB)。
  • 我正在使用带有队列的游标和多处理池作为通信工具。
    • 速度很慢(大约 1500 个文档/秒)。

我能否以某种方式加快处理速度以换取更多内存使用量?

def dowork(args):

    uid = int(args.get('uid'))
    if map_userid_visits.get(uid):
        map_userid_visits[uid] += 1
    else:
        map_userid_visits[uid] = 1

def main():

    manager = Manager()
    map_userid_visits = manager.dict()
    start_time = time.time()
    print ('Start Time', start_time)
    cur = cursor.Cursor(mycollection)
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    iteration = 0
    for user_event in cur:
        pool.apply(dowork, (user_event, ))
    pool.close()
    pool.join()
    print map_userid_visits

【问题讨论】:

    标签: python python-multiprocessing


    【解决方案1】:

    您的方法无法扩展。主要问题是您一次只发送一行,以便您可以执行非常轻量级的操作。这意味着序列化开销比工作本身要大得多。

    同样来自multiprocessing documentation

    apply(func[, args[, kwds]])¶

    使用参数 args 和关键字参数 kwds 调用 func。 它会阻塞直到结果准备好。鉴于此块,apply_async() 更适合并行执行工作。此外,func 仅在池的其中一个工作人员中执行。

    【讨论】:

    • 感谢@Ente,它让我的处理速度翻了一番。
    猜你喜欢
    • 2021-12-08
    • 1970-01-01
    • 2011-12-09
    • 1970-01-01
    • 2014-01-17
    • 2010-09-29
    • 1970-01-01
    • 2012-01-10
    相关资源
    最近更新 更多