【问题标题】:queue and thread, while loop in thread队列和线程,线程中的while循环
【发布时间】:2016-04-11 20:01:33
【问题描述】:

我正在用 Python 3 编写一些东西来从站点获取代理并检查代理是否有效。 我使用队列和线程模块来加快检查过程。 但是,结果很奇怪。

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()  

这是来自队列文档的示例。我的代码就是基于这个例子。

所以,我的问题是: worker() 中的 while 循环何时结束?

当队列中的item数量超过200时,q保持block code,队列中的1个item无法处理,1个线程一直在做q.get(),而其他线程说q是空的。

请帮帮我。谢谢。 对我糟糕的英语感到抱歉。我还在努力。

----更新 ------------------------------ --------------------------

我尝试了 ThreadPoolExecutor,它像线程和队列一样工作。但是阻塞的情况并没有改变。

在玩了 20 分钟的游戏后,代码的一次试运行结束并打印了预期的输出。

我发现检查过程在 2 或 3 分钟后结束(对于 100 个代理),并且代码在结束前一直阻塞了大约 10 分钟。

还有第二个问题: 是什么原因造成的?

谢谢! :)

----更新------------------------------- ----------------------------------------

问题解决了!!

我以为是线程的东西导致了阻塞,结果发现是连接和传输时间造成的。

由于我使用pycurl进行代理检查,而pycurl的默认TIMEOUT是300。

我只将 CONNECTTIMEOUT 设置为 5 并忽略了限制整个传输时间的 TIMEOUT。

这是我用于代理检查的新代码:

c = pycurl.Curl()

c.setopt(c.URL, url)
c.setopt(c.HTTPHEADER, headers)
c.setopt(c.PROXY, proxy)
c.setopt(c.WRITEFUNCTION, lambda x: None)
c.setopt(c.CONNECTTIMEOUT, 5)
*c.setopt(c.TIMEOUT, 5)*

c.perform()
c.close()

但是,将 TIMEOUT 设置为 5 会显着减少有效代理的数量。我会继续努力争取最好的 TIMEOUT 值。

【问题讨论】:

  • 诡计循环不会结束,因为True 始终为真。使用例如。 while not q.empty()
  • @Finwood 感谢您的回复。由于while循环不会结束,那么我想线程也不会结束。那正确吗?或者当 q 为空时线程结束?还是继续做while循环?
  • 由于你的线程被配置为daemon,它们在主程序结束时结束。最后的q.join() 等待队列变空,因此您的所有线程也会在那个时候结束。不过,您可能想看看ThreadPoolExecutor
  • 线程在任何元素写入队列之前启动。如果您在队列为空时终止,它们可能会在您有机会填满队列之前立即终止。也就是说,请提供一个完整但最小的示例,以便每个人都可以重现您的问题。
  • 请避免同时提出两个问题。对于第二个问题,创建一个新问题并提供一个示例,正如 Ulrich 所说。另见mcve

标签: python multithreading python-3.x pycurl


【解决方案1】:

没有这样的while True 循环将永远不会结束,您的线程将永远不会退出。您必须明确告诉您的线程何时退出。

这样做的一种方法是使用哨兵,如下所示:

end_of_queue = object()

def worker():
    while True:
        item = q.get()
        if item is end_of_queue:
            q.task_done()
            break
        do_work(item)
        q.task_done()

q = Queue()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

for i in range(num_worker_threads):
    q.put(end_of_queue)

q.join()

我在这里所做的是将一些end_of_queue 元素添加到您的队列中,每个线程一个。当一个线程看到这个end_of_queue对象时,意味着它必须退出并且可以跳出循环。

如果您喜欢不同的方法,可以考虑使用Event object 在线程必须退出时通知它们,如下所示:

quit_event = Event()

def worker():
    while not q.empty() or not quit_event.is_set():
        try:
            item = q.get(timeout=.1)
        except Empty:
            continue
        do_work(item)
        q.task_done()

q = Queue()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

quit_event.set()
q.join()

这个解决方案的缺点是你必须get() 超时。

最后但同样重要的是,您的代码似乎可以从使用 thread pool 中受益,如下所示:

with ThreadPoolExecutor(max_workers=num_worker_threads) as executor:
    executor.map(do_work, source())

(作为参考,ThreadPoolExecutor uses the end_of_queue approach,唯一的两个区别是end_of_queueNone,每个线程负责通知其他线程。)

【讨论】:

  • 感谢您的回答。我尝试了线程池,它工作得很好,有一段时间了。请看更新。
【解决方案2】:

只是另一个使用类中的线程、队列和循环的示例

import threading
import Queue

q = Queue.Queue()

class listener(object):
    def __init__(self):
        thread = threading.Thread(target=self.loop)
        # thread.daemon = True
        thread.start()

    def loop(self):
        for i in xrange(0,13):
            q.put(i)

class ui(object):
    def __init__(self):
        listener()
        while True:
            item = q.get()
            print item
            if item == 10:
                break
ui()

【讨论】:

    猜你喜欢
    • 2015-06-14
    • 1970-01-01
    • 2014-05-27
    • 1970-01-01
    • 2013-08-05
    • 2010-09-28
    • 2013-01-12
    • 2012-10-28
    • 1970-01-01
    相关资源
    最近更新 更多