【问题标题】:Python threading with queue: how to avoid to use join?带有队列的 Python 线程:如何避免使用连接?
【发布时间】:2015-02-09 16:43:52
【问题描述】:

我有一个有 2 个线程的场景:

  1. 线程等待来自套接字的消息(嵌入在 C 库中 - 阻塞调用是“Barra.ricevi”)然后将元素放入队列中

  2. 等待从队列中获取元素并做某事的线程

示例代码

import Barra
import Queue    
import threading

posQu = Queue.Queue(maxsize=0)

def threadCAN():
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print (canMsg)
        else:
            print ("Enqueued message"), canMsg
            posQu.put(canMsg)

thCan = threading.Thread(target = threadCAN)
thCan.daemon = True
thCan.start()

while True:
    posMsg = posQu.get()
    print ("Messagge from the queue"), posMsg

结果是每次来自套接字的新消息都会向队列中添加一个新元素,但是应该从队列中获取项目的主线程永远不会被唤醒。

输出如下:

排队的消息

排队的消息

排队的消息

排队的消息

我希望有:

排队的消息

队列中的消息

排队的消息

队列中的消息

解决这个问题的唯一方法是添加线缝:

posQu.join()

在等待来自socket的消息的线程结束时,一行:

posQu.task_done()

在主线程的末尾。

在这种情况下,从套接字接收到新消息后,线程将阻塞等待主线程处理入队的项目。

不幸的是,这不是我们想要的行为,因为我希望一个线程始终准备好从套接字获取消息,而不是等待另一个线程完成作业。

我做错了什么? 谢谢

安德鲁 (意大利)

【问题讨论】:

  • 你能给我们一个独立的例子,不需要Barra 库吗?因为当我用只给它一个随机值的代码替换它时,它就像你想要的那样工作。所以我怀疑其他一些代码有问题,而不是这段代码。
  • 由于 Pastebin 现在似乎已关闭,因此我的更改如下:将 import Barra 替换为 import random,将 canMsg = barra.cicevi("can0") 替换为 canMsg = ['ERR'] if random.random() < .25 else [0, 1, 2],然后运行您的代码,您将看到队列和消息交错(通常在同一行)。
  • 另外,CAN-bus真的和这个程序有关系吗? (也许你正在使用一个没有 pthread 的嵌入式 Linux 系统,所以它使用了虚拟线程?这可能可能导致这个问题......)
  • 由于您在评论中给出的更改,我无法重现该问题。我收到有关入队和出队的消息。但是,它可能与一个奇怪的并且在您的情况下出现故障的调度有关。也许写入线程永远不会放弃处理器,因此系统(以及糟糕的调度机制)永远不会激活读取线程。您是否尝试过使用Queue(1) 而不是Queue(0)?这可能会使写入线程在尝试将第二条消息放入队列时立即进入睡眠状态。

标签: python linux multithreading queue can-bus


【解决方案1】:

这可能是因为您的BarraBarra.ricevi 时没有释放全局解释器锁(GIL)。不过你可能想检查一下。

GIL 确保任何时候只能运行一个线程(限制了多处理器系统中线程的有用性)。 GIL 每 100 个“滴答”切换线程——一个松散地映射到字节码指令的滴答。详情请见here

在您的生产者线程中,在 C 库调用之外没有发生太多事情。这意味着生产者线程将在 GIL 切换到另一个线程之前多次调用Barra.ricevi

就复杂性增加而言,解决方案是:

  • 将项目添加到队列后调用time.sleep(0)。这会产生线程,以便另一个线程可以运行。
  • 使用sys.setcheckinterval() 来降低切换线程前执行的“滴答”数量。这是以使程序的计算成本更高为代价的。
  • 使用multiprocessing 而不是threading。这包括使用multiprocessing.Queue 而不是Queue.Queue
  • 修改Barra,使其在调用其函数时释放 GIL。

使用multiprocessing 的示例。请注意,使用多处理时,您的进程不再具有隐含的共享状态。您需要查看多处理以了解如何在进程之间传递信息。

import Barra  
import multiprocessing

def threadCAN(posQu):
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    posQu = multiprocessing.Queue(maxsize=0)
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True
    procCan.start()

    while True:
        posMsg = posQu.get()
        print("Messagge from the queue", posMsg)

【讨论】:

  • 太棒了!!非常感谢。是吉尔!我更改了我的 Barra 库,在 I/O 阻塞调用之前和之后添加了宏:Py_BEGIN_ALLOW_THREADS 和 Py_END_ALLOW_THREADS。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-09-24
  • 1970-01-01
  • 2023-03-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多