【问题标题】:Remote queue consumer misses first message after restart远程队列消费者在重启后错过了第一条消息
【发布时间】:2017-02-05 12:49:30
【问题描述】:

我有如下代码:

server.py

import queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

q = queue.Queue()
QueueManager.register('queue', callable=lambda:q)
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
s = m.get_server()
s.serve_forever()

生产者.py

from multiprocessing.managers import BaseManager
import time

class QueueManager(BaseManager):
    pass

QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()

idx = 0
while True:
    time.sleep(2)
    queue.put(idx)
    idx += 1

consumer.py

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()

while True:
    message = queue.get()
    print(message)

如果我运行服务器和生产者,然后启动消费者,我会看到生产者放入队列的所有消息都出现在消费者身上。但是,如果我停止消费者并立即重新启动它,它总是会跳过一条消息。

为了说明我看到的 consumer.py 的输出:

0
1
2
3
<restart the consumer>
5
6
7
etc.

这正是 python 多处理队列应该如何工作的方式,是错误还是我做错了什么?

【问题讨论】:

  • 如何重启消费者?
  • 我只是用 Ctrl+C 杀死它并重新启动它
  • 当生产者将整数放入队列时,您正在杀死消费者,那么您希望它如何打印上一个项目?
  • 如果我在生产者将下一个整数放入队列之前杀死消费者,也会发生这种情况。我希望这个整数只保留在队列中,直到有消费者使用它,但是似乎杀死消费者会影响队列。
  • @Semi:我用你的代码又玩了大约 1 个小时,但无法让它工作,请参阅下面的更新答案,我会将其归档为错误

标签: python python-3.x queue multiprocessing


【解决方案1】:

我认为问题在于管道在 python 中的实现方式,或者它甚至可能是对操作系统的限制。这是完整的堆栈跟踪:

Traceback (most recent call last):
  File "consumer.py", line 12, in <module>
    message = queue.get()
  File "<string>", line 2, in get
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

queue.get() 调用中的值似乎丢失了,该调用没有被SIGINT 正确终止。 queue.get() 立即被取消,因此 python 不会完成 get() 调用然后失去价值。看起来更像是 python 没有正确取消管道上的recv

如果您将消费者更改为此:

while True:
    while queue.empty():
        sleep(0.1)
    message = queue.get()
    print(message)

它会起作用的。但当然这是一种解决方法,而不是真正的解决方案。

更新:

在更多地使用您的代码之后,我认为这是一个错误,因为:

  1. followed their coding example one by one
  2. 没有任何类型的队列可以解决问题(multiprocessing.Queuemultiprocessing.JoinableQueue
  3. 同时发送 task_done() 也无济于事

该错误同时发生在 python2 和 python3 上。我建议你report this as a bug。在最坏的情况下,如果它不是一个错误,你至少可以解释为什么 python 会这样。

【讨论】:

  • 感谢您抽出宝贵的时间,我也尝试了几件事,但没有成功。现在我将其报告为错误:bugs.python.org/issue29454
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-08
  • 1970-01-01
  • 1970-01-01
  • 2021-11-14
  • 2011-10-30
  • 1970-01-01
相关资源
最近更新 更多