【问题标题】:Threading in python using queuepython中的线程使用队列
【发布时间】:2012-11-20 20:11:05
【问题描述】:

我想在 python 中使用线程来下载大量网页,并通过以下代码在其中一个网站中使用队列。

它放了一个无限的while循环。每个线程是否连续运行而没有结束,直到所有线程都完成?我是不是错过了什么。

#!/usr/bin/env python
import Queue
import threading
import urllib2
import time

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"]

queue = Queue.Queue()

class ThreadUrl(threading.Thread):
  """Threaded Url Grab"""
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue

  def run(self):
    while True:
      #grabs host from queue
      host = self.queue.get()

      #grabs urls of hosts and prints first 1024 bytes of page
      url = urllib2.urlopen(host)
      print url.read(1024)

      #signals to queue job is done
      self.queue.task_done()

start = time.time()
def main():

  #spawn a pool of threads, and pass them queue instance 
  for i in range(5):
    t = ThreadUrl(queue)
    t.setDaemon(True)
    t.start()

  #populate queue with data   
  for host in hosts:
    queue.put(host)

  #wait on the queue until everything has been processed     
  queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

【问题讨论】:

  • 请检查您的缩进。我试图纠正它,但queue.join 的方式放错了位置,它也可能在顶层。此外,将主机添加到队列的循环位于创建线程的循环内,因此您将每个主机添加五次。
  • 通过缩进校正,脚本对我来说也很好。
  • 这段代码看起来像是从here复制了一些修改

标签: python multithreading


【解决方案1】:

将线程设置为daemon 线程会导致它们在主线程完成时退出。但是,是的,您是正确的,只要queue 中有内容,您的线程就会连续运行,否则它将阻塞。

文档解释了这个细节Queue docs

python 线程文档也解释了daemon 部分。

当没有活着的非守护线程时,整个 Python 程序退出。

因此,当队列被清空并且queue.join 在解释器退出时恢复时,线程将终止。

编辑:更正Queue 的默认行为

【讨论】:

  • get 的默认行为是在队列为空时阻塞,而不是引发Empty 异常。
  • 如果我们让线程休眠 100 毫秒或这样的时间间隔,如果它在队列中找不到任何项目,是否应该纠正行为。我计划跨越 50 个线程以下载 5000 多个页面。我不认为我可以让所有 50 个线程争夺 cpu 资源。
  • 不,我的意思是他们将一直处于无限循环中。您不必在循环中添加睡眠以释放 CPU 时间。在queue.get 和 http 请求期间会有一个块。该块将用作下一个线程发生上下文切换的点。五十个线程不应该成为您的问题,因为您最终会受到下载速度和您将执行的任何磁盘 I/O 的限制。
  • 我担心所有 49 个线程都完成工作并等待第 50 个线程完成最后一个 http 请求的情况。到时候不是所有49个线程都占用cpu,不让其他事情发生吗?
  • 不,queue.get 的默认操作是阻止 @WarrenWeckesser 已更正。因此,其他 49 个线程将阻塞,因此不会占用 CPU。
【解决方案2】:

您的脚本对我来说很好,所以我假设您是在询问发生了什么,以便您更好地理解它。是的,您的子类将每个线程置于无限循环中,等待将某些内容放入队列中。当发现某物时,它会抓住它并做它的事情。然后,关键部分,它通过 queue.task_done 通知队列它已完成,并继续等待队列中的另一个项目。

虽然所有这些都在工作线程中进行,但主线程正在等待(加入)直到队列中的所有任务都完成,这将是线程发送 queue.task_done 标志相同次数的时间作为队列中的消息。此时主线程完成并退出。由于这些是恶魔线程,它们也会关闭。

这是很酷的东西,线程和队列。它是 Python 真正优秀的部分之一。你会听到各种关于 Python 中的线程如何被 GIL 等搞砸的东西。但是如果你知道在哪里使用它们(比如在这种情况下使用网络 I/O),它们真的会为你加快速度。一般规则是,如果您受 I/O 限制,请尝试和测试线程;如果您受 CPU 限制,线程可能不是一个好主意,不妨尝试使用进程。

祝你好运,

迈克

【讨论】:

    【解决方案3】:

    我认为Queue 在这种情况下是不必要的。仅使用Thread:

    import threading, urllib2, time
    
    hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
    "http://ibm.com", "http://apple.com"]
    
    class ThreadUrl(threading.Thread):
        """Threaded Url Grab"""
        def __init__(self, host):
            threading.Thread.__init__(self)
            self.host = host
    
        def run(self):
            #grabs urls of hosts and prints first 1024 bytes of page
            url = urllib2.urlopen(self.host)
            print url.read(1024)
    
    start = time.time()
    def main():
        #spawn a pool of threads
        for i in range(len(hosts)):
            t = ThreadUrl(hosts[i])
            t.start()
    
    main()
    print "Elapsed Time: %s" % (time.time() - start)
    

    【讨论】:

    • 如果未来版本要加载 10,000 个 URL,这不是一个很好的计划。
    • 是的,我不会创建 10000 个线程。相反,只会创建几个具有多个 URL 获取的线程。
    • 不仅如此,如果不同的线程正在写入标准输出,输出可能会以不希望的方式交错。将结果排队以供单个输出线程处理总是更简单。相关讨论见this qeustion
    猜你喜欢
    • 1970-01-01
    • 2013-10-10
    • 1970-01-01
    • 1970-01-01
    • 2015-07-30
    • 2014-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多