【发布时间】:2018-08-08 02:02:17
【问题描述】:
我正在使用多处理库中的队列在进程之间共享数据。
我有 2 个队列,都限制为 10 个对象,第一个队列有一个进程将对象“放入”其中,许多进程从中“获取”。
第二个队列有许多进程将对象“放入”其中,而只有一个进程从中“获取”。
系统在一段时间内完美运行,然后开始出现奇怪的行为:只有将对象“放入”第一个队列的进程继续运行,而 从第一个队列读取的进程显然不再运行/工作(即使进程还活着)。 这里似乎有一个死锁,但我不确定,这是我的代码:
更新
import multiprocessing
import logging
from multiprocessing import Process
logger = logging.get_logger(__name__)
# Processes 2, 3 ,4:
class Processes_234(Process):
def __init__(self, message_queue_1, message_queue_2):
Process.__init__(self)
self.message_queue_1 = message_queue_1
self.message_queue_2 = message_queue_2
def run(self):
while True:
try:
# get from queue
el1, el2, el3 = self.message_queue_1.get()
logger.debug('Processes234: get from queue')
except Exception as exp:
logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))
# do some stuff with el1, el2, el3...
try:
# put into second queue
self.message_queue_2.put_nowait((el1, el2, el3))
logger.debug('Processes234: put into queue')
except Exception as excpt:
logger.debug(excpt)
logger.debug("message_queue_2: queue is full")
# the queue is full so replace the old element with the new one
try:
self.message_queue_2.get_nowait()
self.message_queue_2.put_nowait((el1, el2, el3))
# in case other process already fill the queue - ignore
except:
pass
# process 5:
class Process5(Process):
def __init__(self, message_queue_2):
Process.__init__(self)
self.message_queue_2 = message_queue_2
def run(self):
while True:
try:
# get from queue
el1, el2, el = self.message_queue_2.get()
print('Process5: get from queue')
except Exception as exp:
print("message_queue_2: queue empty, Exception message: " + str(exp))
def start_process_1():
# init queues
message_queue_1 = multiprocessing.Queue(maxsize=10)
message_queue_2 = multiprocessing.Queue(maxsize=10)
processes_234 = [Processes_234(message_queue_1, message_queue_2)
for _ in range(3)]
for proc in processes_234:
proc.start()
process5 = Process5(message_queue_2)
process5.start()
counter = 1
while True:
el1 = counter + 1
el2 = counter + counter
el3 = "some string " * ((counter ** 2) % 60000)
counter += 1
# start passing data
try:
# put into queue
message_queue_1.put_nowait((el1, el2, el3))
logger.debug('Process1: put into queue')
except Exception as excpt:
logger.debug(excpt)
logger.debug("message_queue_1: queue is full")
# the queue is full so replace the old element with the new one
try:
message_queue_1.get_nowait()
message_queue_1.put_nowait((el1, el2, el3))
# in case other process already fill the queue - ignore
except:
pass
if __name__ == '__main__':
start_process_1()
有人知道我的问题是什么吗?
我正在使用 python 3.6.5
【问题讨论】:
-
看不到您发布的内容有任何直接问题,但话又说回来,很难尝试重现您的问题,因为我们必须对代码的执行方式做出假设(您的 @987654323 @objects 和你的
__main__看起来像)。请创建一个MCVE,并解释您是如何确定您的Process_234对象停止工作的。 -
谢谢,我解决了,并根据您的要求将代码更新给重现此案例的人。
标签: python python-3.x multiprocessing message-queue deadlock