【问题标题】:Using the Queue class in Python 2.6在 Python 2.6 中使用 Queue 类
【发布时间】:2010-04-16 01:38:01
【问题描述】:

假设我被困在使用 Python 2.6 并且无法升级(即使这会有所帮助)。我编写了一个使用 Queue 类的程序。我的制作人是一个简单的目录列表。我的消费者线程从队列中拉出一个文件,并用它做一些事情。如果文件已经被处理,我跳过它。处理的列表是在所有线程启动之前生成的,所以它不是空的。

这是一些伪代码。

import Queue, sys, threading

processed = []

def consumer():
    while True:
        file = dirlist.get(block=True)
        if file in processed:
            print "Ignoring %s" % file
        else:
            # do stuff here
        dirlist.task_done()

dirlist = Queue.Queue()

for f in os.listdir("/some/dir"):
    dirlist.put(f)

max_threads = 8

for i in range(max_threads):
    thr = Thread(target=consumer)
    thr.start()

dirlist.join()

我得到的奇怪行为是,如果一个线程遇到一个已经被处理的文件,该线程就会停止并等待整个程序结束。我做了一些测试,前 7 个线程(假设 8 个是最大值)停止,而第 8 个线程继续处理,一次一个文件。但是,这样一来,我就失去了对应用程序进行线程化的全部理由。

我做错了什么,或者这是 Python 2.6 中队列/线程类的预期行为?

【问题讨论】:

  • 肯定有什么问题——为什么会预期依赖于与队列完全无关的测试?!但是我不认为它在这个代码中,尽管它有缺陷(线程不是守护进程,滥用内置名称file,...)——我认为这些不会使线程停止!相反,如何处理填充,并且它是否在“...”部分进行了更改(这可能是一个问题,因为它周围没有锁定)?你能用微不足道的处理人口(例如把一半的文件放在那里)和微不足道的“...”来重现这个问题吗?例如print file

标签: python multithreading queue


【解决方案1】:

我尝试运行您的代码,但没有看到您描述的行为。但是,程序永远不会退出。我建议更改.get() 调用如下:

    try:
        file = dirlist.get(True, 1)
    except Queue.Empty:
        return

如果想知道当前执行的是哪个线程,可以导入thread模块,打印thread.get_ident()

我在.get()之后添加了以下行:

    print file, thread.get_ident()

得到以下输出:

bin 7116328
cygdrive 7116328
 cygwin.bat 7149424
cygwin.ico 7116328
 dev etc7598568
7149424
 fix 7331000
 home 7116328lib
 7598568sbin
 7149424Thumbs.db
 7331000
tmp 7107008
 usr 7116328
var 7598568proc
 7441800

输出很混乱,因为线程同时写入标准输出。线程标识符的多样性进一步证实了所有线程都在运行。

也许实际代码或您的测试方法有问题,但您发布的代码没有问题?

【讨论】:

  • 啊,这就解释了为什么我不得不继续杀死程序。感谢您的提示。
  • @voipme 谢谢很好。向上投票更好。 :-)
【解决方案2】:

由于此问题仅在查找已处理的文件时才会出现,因此这似乎与processed 列表本身有关。你试过实现一个简单的锁吗?例如:

processed = []
processed_lock = threading.Lock()

def consumer():
    while True:
        with processed_lock.acquire():
            fileInList = file in processed
        if fileInList:
            # ... et cetera

线程往往会导致最奇怪的错误,即使它们看起来“不应该”发生。对共享变量使用锁是确保您最终不会遇到某种可能导致线程死锁的竞争条件的第一步。


当然,如果您在# do stuff here 下执行的操作是 CPU 密集型的,那么由于全局解释器锁,Python 一次只能从一个线程运行代码。在这种情况下,您可能需要切换到 multiprocessing 模块 - 它与 threading 非常相似,但您需要将共享变量替换为其他解决方案(有关详细信息,请参阅 here)。

【讨论】:

  • 我没想过在已处理列表上加锁。现在想来,这绝对是有道理的。感谢您推荐使用多处理模块。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多