【问题标题】:Using a shared queue that workers can add tasks to使用工作人员可以添加任务的共享队列
【发布时间】:2015-08-30 11:34:04
【问题描述】:

我对 python 很陌生(我主要用 Java 编写代码)。我有一个 python 脚本,它本质上是一个爬虫。它调用 phantomjs,它加载页面,返回其源代码,以及它在页面中找到的 url 列表。

我一直在尝试使用 Python 3 的 multiprocessing 模块来执行此操作,但我不知道如何使用工作人员也可以添加到的共享队列。我不断得到不可预测的结果。

我之前的方法使用了一个全局 URL 列表,我从中提取了一个块并使用 map_async 发送给工作人员。最后,我会收集所有返回的 URL 并将它们附加到全局列表中。问题是每个“块”花费的时间与最慢的工人一样长。我正在尝试对其进行修改,以便每当工作人员完成时,它都可以获取下一个 URL。但是,我认为我做的不对。到目前为止,这是我所拥有的:

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.append(<some-url>)

    pool = Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

如果有更好的方法,请告诉我。我正在抓取一个已知站点,最终终止条件是没有要处理的 URL。但现在看来,我将永远继续奔跑。我不确定我是否会使用queue.empty(),因为它确实说它不可靠。

【问题讨论】:

  • 查看相关:stackoverflow.com/questions/17241663/…我认为您的设计模式不太正确)我相信您希望 N 个工作人员协作访问共享队列。
  • @JamesMills 这个例子很有意义!加入worker_main内的队列可以吗?
  • 另外,我试过了,它看起来几乎立即退出,即使使用time.sleep(10)。 phantomjs 调用需要一些时间才能返回,但脚本会在此之前退出。
  • 好吧,我的意思是;我不知道如何回答你的问题了:) 哈哈!
  • 哈哈,不用担心。谢谢 :) 我会尝试更多并更新我的问题。

标签: python concurrency multiprocessing python-multiprocessing


【解决方案1】:

这是我可能会做的:

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

      for returned_url in returned_urls:
          urls.put(returned_url, block=True)

      # signal finished processing this url
      urls.put('no-url')

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    pool = Pool()
    urls = manager.Queue()

    # start first url before entering loop
    counter = 1
    pool.apply_async(worker, (<some-url>, urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            # a url has finished processing
            counter -= 1
        else:
            # a new url needs to be processed
            counter += 1
            pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

每当一个 url 从队列中弹出时,递增计数器。将其视为“当前正在处理的 url”计数器。当“无 url”从队列中弹出时,“当前正在处理的 url”已经完成,因此减少计数器。只要 counter 大于 0,就有 url 没有处理完,还返回 'no-url'。

编辑

正如我在评论中所说的(在这里为其他阅读它的人),当使用multiprocessing.Pool 时,不要将其视为单独的进程,最好将其视为执行您的功能的单个构造每次获取数据时(尽可能同时)。这对于数据驱动的问题最有用,因为您不跟踪或关心单个工作进程只处理正在处理的数据。

【讨论】:

  • 啊,我明白你在做什么了。所以我想我可以根据我的情况调整它。对我来说,我不是想知道 URL 何时完成处理,而是我想知道工作人员何时不返回 any URL。因此,终止条件是当 所有 个工作人员完成他们的任务并且 没有 个工作人员返回 URL。
  • 我首先想到了这一点,但这更具挑战性,因为有一个工人池,每个工人 process 一遍又一遍地执行worker,在执行之间闲置。因此,您可能会遇到这样一种情况,即每个工作人员 process 都完成了当前对 worker 的执行并且没有获得任何 url,但是队列中仍然有 url 备份等待传递给下一个空闲的工作人员。 确定没有更多工作要做的唯一方法是知道所有收到的 url 都已完成处理。
  • 将池不视为单个工作人员,而是将其视为反复运行功能的单个实体会有所帮助。池往往用于更多由数据驱动的问题,您不需要跟踪或关心工作进程的数量。
  • 没错,在我的特殊情况下,我确实需要知道当整个池正在运行时,worker 的每个实例返回什么。这就是为什么我认为我可以尝试修改工人本身的价值。我在家里运行它,我下班回来时会看看它,看看它是怎么做的。我喜欢你使用哨兵值的想法,所以我会做同样的事情。
【解决方案2】:

这就是我解决问题的方法。我最初使用this answer 中发布的设计,但bj0 提到它滥用了初始化函数。所以我决定使用apply_async 来做这件事,其方式类似于我在问题中发布的代码。

由于我的工作人员修改了他们正在从中读取 URL 的队列(他们添加到其中),我认为我可以像这样简单地运行我的循环:

while not urls.empty():
   pool.apply_async(worker, (urls.get(), urls))

我预计这会起作用,因为工作人员将添加到队列中,如果所有工作人员都忙,apply_async 将等待。 这没有像我预期的那样工作,循环提前终止。问题在于,如果所有工作人员都忙,apply_async 不会阻塞 并不清楚。相反,它会将提交的任务排队,这意味着urls 最终将变为空并且循环将终止。循环阻塞的唯一情况是当您尝试执行urls.get() 时队列为空。此时,它将等待队列中有更多可用的项目。但我仍然需要找出终止循环的方法。条件是当没有工人返回新的 URL 时,循环应该终止。为此,我使用共享字典,如果进程没有返回任何 URL,则将与进程名称关联的值设置为 0,否则设置为 1。我在循环的每次迭代中检查键的总和,如果它是 0,我就知道我已经完成了。

基本结构最终是这样的:

def worker(url, url_queue, proc_user_urls_queue, proc_empty_urls_queue):

    returned_urls = phantomjs(url) # calls phantomjs and waits for output
    if len(returned_urls) > 0:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 1]
        )
    else:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 0]
        )

    for returned_url in returned_urls:
        url_queue.put(returned_url)

def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
    while 1:
        # This may not be necessary. I don't know if this worker is run
        # by the same process every time. If not, it is possible that
        # the worker was assigned the task of fetching URLs, and returned
        # some. So let's make sure that we set its entry to zero anyway.
        # If this worker is run by the same process every time, then this
        # stuff is not necessary.
        id = multiprocessing.current_process().name
        proc_empty_urls_dict[id] = 0

        proc_empty_urls = proc_empty_urls_queue.get()
        if proc_empty_urls == "done": # poison pill
            break

        proc_id = proc_empty_urls[0]
        proc_empty_url = proc_empty_urls[1]
        proc_empty_urls_dict[proc_id] = proc_empty_url

manager = Manager()

urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()

pool = Pool(33)

pool.apply_async(writer, (proc_user_urls_queue,))
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))

# Run the first apply synchronously 
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
    pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))

proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()

【讨论】:

  • 这是一种有趣的方法,但我相信它容易受到竞争条件的影响。如果一个无 url 工作人员结束,而另一个工作(以前没有 url)正在处理 phantomjs,则 while 循环可能会在该工作人员完成之前结束(具有潜在的新 url)。另外,您使用的是共享字典,为什么需要使用第二个队列?您应该能够将 dict 传递给工作人员并让他们直接使用它。
  • 消除竞争条件(并摆脱字典)的最简单方法是计算 url。每次 urls.get() 给你一个 url,增加一个计数器。每次你得到一个“no-url”的标记值时,减少它。当计数器达到 0 时,您应该没有 url。这当然假设没有循环引用,这会导致问题......
  • @bj0 好点。我认为会有某种竞争条件。我使用共享 dict 是因为,老实说,我对如何在 python 中完成多处理还有些模糊,我不确定是否可以直接在 worker 上修改这样的 dict 属性。
  • @bj0 我喜欢计数器方法——我最初是沿着这条路线走的,但我不确定原子添加等。我可以使用来自ManagerValue 吗?我的很多困惑源于不完全熟悉 Python 中的多处理是如何工作的,而且一般来说只是 Python。我大部分时间都在 Java 中工作!感谢您提供帮助的 cmets。我意识到我的代码按照 Python 标准可能不是很干净,而且我可能违反了许多最佳实践。 :)
  • 来自管理器的对象(如 dict)是为了让工作人员可以直接访问它们(它们在内部处理进程之间的代理/锁定)。如果你在主进程中处理计数器(在while循环中,检查urls.get()的返回值然后调整计数器),你可以只使用一个普通的整数而不用担心并发访问。如果这没有意义,我可以抛出一个答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-10-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-24
  • 2022-10-18
  • 1970-01-01
相关资源
最近更新 更多