【问题标题】:Python - multiprocessing threads don't close when using QueuePython - 使用队列时多处理线程不会关闭
【发布时间】:2015-09-19 04:03:47
【问题描述】:

这适用于 Python 3.x

我正在从 CSV 文件中加载 300 条记录,然后生成工作线程以将它们提交到 REST API。我将 HTTP 响应保存在队列中,以便在处理整个 CSV 文件后计算跳过的记录数。但是,在我向工作人员添加队列后,线程似乎不再关闭。我想监控线程数有两个原因:(1)一旦完成,我可以计算并显示跳过计数和(2)我想增强我的脚本以产生不超过 20 个左右的线程,所以我不要内存不足。

我有两个问题:

  • 有人可以解释为什么使用q.put() 时线程保持活动状态吗?
  • 是否有不同的方法来管理线程数并监控是否所有线程都已完成?

这是我的代码(有些简化,因为我无法分享我正在调用的 API 的确切细节):

import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text) # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue() # this is a queue to put all HTTP responses in, so we count the skips
    jobs = []
    for leads in read_test_data(TEST_FILE): # This function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads,q,))
        jobs.append(p)
        p.start()
    time.sleep(20) # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children())) ## always returns the number of threads. If I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0:  # 
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty(): # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

【问题讨论】:

  • 只是想澄清一下:您在上述程序中使用了多个进程,而不是线程。两者之间的内存和消息传递开销存在很大差异,因此清楚自己使用的是哪一个非常重要。
  • 还值得注意的是,由于您的问题是 I/O 密集型的,您可能可以使用线程而不是进程,并且与同步方法相比仍然可以获得良好的性能。

标签: python multithreading python-3.x queue python-multiprocessing


【解决方案1】:

这很可能是因为multiprocessing.Queue 的这个记录在案的怪癖:

请记住,已将项目放入队列的进程将等待 在终止之前,直到所有缓冲的项目都由 “进料器”螺纹连接到底层管道。 (子进程可以调用 队列的cancel_join_thread() 方法来避免这种行为。)

这意味着每当您使用队列时,您需要确保 所有已放入队列的项目最终将被删除 在进程加入之前。否则你不能确定 将项目放入队列的进程将终止。 记住 非守护进程也会自动加入。

基本上,您需要确保get() 中的所有项目都来自Queue,以保证put 中的所有进程都能够退出Queue

我认为在这种情况下,您最好使用multiprocessing.Pool,并将您的所有工作提交给multiprocessing.Pool.map。这大大简化了事情,并让您完全控制正在运行的进程数量:

def worker(leads):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)  # cpu_count() * 2 processes running in the pool
    responses = pool.map(worker, read_test_data(TEST_FILE))

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

如果您担心将read_test_data(TEST_FILE) 转换为列表(使用Pool.map 需要)的内存成本,您可以改用Pool.imap

编辑:

正如我在上面的评论中提到的,这个用例看起来像是受 I/O 限制的,这意味着您可以通过使用 multiprocessing.dummy.Pool(它使用线程池而不是进程池)看到更好的性能。尝试一下,看看哪个更快。

【讨论】:

  • 效果很好,谢谢。我确实更改了这一行:responses = [pool.apply(worker, args=(leads,)) for Leads in read_test_data(TEST_FILE)] 这样,它会加载整个 CSV 文件。旧代码多次加载同一批次的 300 条记录,我加载了 200,000 条记录,并且内存使用情况很好。实际上,该进程是 I/O 绑定的,因为我调用的 API 可能需要一分钟才能响应。
  • responses = [pool.apply(worker, args=(leads,)) for leads in read_test_data(TEST_FILE)]
猜你喜欢
  • 2018-06-27
  • 1970-01-01
  • 2012-11-12
  • 1970-01-01
  • 1970-01-01
  • 2023-03-14
  • 1970-01-01
  • 1970-01-01
  • 2016-05-02
相关资源
最近更新 更多