【问题标题】:Why does my multiprocess queue not appear to be thread safe?为什么我的多进程队列看起来不是线程安全的?
【发布时间】:2019-06-04 10:42:36
【问题描述】:

我正在构建一个运行另一个 Python 程序的看门狗计时器,如果它无法从任何线程中找到签入,则关闭整个程序。这样它最终将能够控制所需的通信端口。定时器代码如下:

from multiprocessing import Process, Queue
from time import sleep
from copy import deepcopy

PATH_TO_FILE = r'.\test_program.py'
WATCHDOG_TIMEOUT = 2

class Watchdog:

    def __init__(self, filepath, timeout):
        self.filepath = filepath
        self.timeout = timeout
        self.threadIdQ = Queue()
        self.knownThreads = {}

    def start(self):
        threadIdQ = self.threadIdQ

        process = Process(target = self._executeFile)
        process.start()
        try:
            while True:
                unaccountedThreads = deepcopy(self.knownThreads)

                # Empty queue since last wake. Add new thread IDs to knownThreads, and account for all known thread IDs
                # in queue
                while not threadIdQ.empty():
                    threadId = threadIdQ.get()
                    if threadId in self.knownThreads:
                        unaccountedThreads.pop(threadId, None)
                    else:
                        print('New threadId < {} > discovered'.format(threadId))
                        self.knownThreads[threadId] = False

                # If there is a known thread that is unaccounted for, then it has either hung or crashed.
                # Shut everything down.
                if len(unaccountedThreads) > 0:
                    print('The following threads are unaccounted for:\n')
                    for threadId in unaccountedThreads:
                        print(threadId)
                    print('\nShutting down!!!')
                    break
                else:
                    print('No unaccounted threads...')

                sleep(self.timeout)

        # Account for any exceptions thrown in the watchdog timer itself
        except:
            process.terminate()
            raise

        process.terminate()


    def _executeFile(self):
        with open(self.filepath, 'r') as f:
            exec(f.read(), {'wdQueue' : self.threadIdQ})

if __name__ == '__main__':
    wd = Watchdog(PATH_TO_FILE, WATCHDOG_TIMEOUT)
    wd.start()

我还有一个小程序可以测试看门狗功能

from time import sleep
from threading import Thread
from queue import SimpleQueue

Q_TO_Q_DELAY = 0.013

class QToQ:

    def __init__(self, processQueue, threadQueue):
        self.processQueue = processQueue
        self.threadQueue = threadQueue
        Thread(name='queueToQueue', target=self._run).start()

    def _run(self):
        pQ = self.processQueue
        tQ = self.threadQueue
        while True:
            while not tQ.empty():
                sleep(Q_TO_Q_DELAY)
                pQ.put(tQ.get())

def fastThread(q):
    while True:
        print('Fast thread, checking in!')
        q.put('fastID')
        sleep(0.5)

def slowThread(q):
    while True:
        print('Slow thread, checking in...')
        q.put('slowID')
        sleep(1.5)

def hangThread(q):
    print('Hanging thread, checked in')
    q.put('hangID')
    while True:
        pass

print('Hello! I am a program that spawns threads!\n\n')

threadQ = SimpleQueue()

Thread(name='fastThread', target=fastThread, args=(threadQ,)).start()
Thread(name='slowThread', target=slowThread, args=(threadQ,)).start()
Thread(name='hangThread', target=hangThread, args=(threadQ,)).start()

QToQ(wdQueue, threadQ)

如您所见,我需要将线程放入 queue.Queue,而一个单独的对象将 queue.Queue 的输出缓慢地馈送到多处理队列中。相反,如果我将线程直接放入多处理队列,或者在放入之间没有 QToQ 对象休眠,则多处理队列将锁定,并且在看门狗端看起来总是空的。

现在,由于多处理队列应该是线程和进程安全的,我只能假设我在实现中搞砸了。我的解决方案似乎奏效了,但也感觉很老套,我觉得我应该修复它。

如果重要的话,我正在使用 Python 3.7.2。

【问题讨论】:

  • 我使用的是 Python 3.6 (from queue import Queue),我不能让它失败。我可以直接通过wdQueue 或通过Queue 使用QToQ 发送ID,而Watchdog 总是接听hangID 并关闭程序。
  • 更新虽然没有hangThread它总是在某个时候关闭。
  • 您的except 想成为finally

标签: python multithreading multiprocessing message-queue python-multiprocessing


【解决方案1】:

我怀疑test_program.py 退出了。

我把最后几行改成这样:

tq = threadQ
# tq = wdQueue    # option to send messages direct to WD

t1 = Thread(name='fastThread', target=fastThread, args=(tq,))
t2 = Thread(name='slowThread', target=slowThread, args=(tq,))
t3 = Thread(name='hangThread', target=hangThread, args=(tq,))

t1.start()
t2.start()
t3.start()
QToQ(wdQueue, threadQ)

print('Joining with threads...')
t1.join()
t2.join()
t3.join()

print('test_program exit')

join() 的调用意味着测试程序永远不会自行退出,因为没有任何线程退出。

因此,t3 挂起,看门狗程序检测到这一点并检测到下落不明的线程并停止测试程序。

如果t3从上述程序中删除,那么其他两个线程表现良好,看门狗程序允许测试程序无限期地继续。

【讨论】:

  • 谢谢!加入线程似乎确实解决了这个问题,即使我没有创建 QToQ,而是线程直接写入多处理队列。我仍然对为什么使用 QToQ 似乎可以解决问题感到困惑,但我稍后会对此进行调查;这显然是一个更好的解决方案。
  • 有趣的旁注:我尝试在看门狗定时器循环中检查 process.is_alive(),但它始终返回 true,即使发生错误也是如此。然而,问题似乎仍然是进程退出,就像简单的事情一样,一段时间 True: pass 似乎也解决了它。
  • 你应该尽量不要有一个忙碌的等待,例如while True:pass,而是有一个合理的块,如:t1.join()。这样可以减少服务器上的 CPU 负载。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-20
  • 2013-10-11
  • 2020-10-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多