【发布时间】:2014-10-17 20:06:49
【问题描述】:
我正在尝试编写一个服务器/客户端脚本,其中包含一个用于发泄任务的服务器以及多个执行它的工作人员。 问题是我的呼吸机有很多任务,它会在心跳中填满内存。 我尝试在绑定之前设置 HWM,但没有成功。只要工作人员连接,它就会继续发送消息,完全无视设置的 HWM。我还有一个水槽,用来记录已完成的任务。
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
sink.py
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
【问题讨论】:
-
我会尝试重现您的问题。您能告诉我您使用的是哪个版本的 PyZMQ 和 ZMQ 吗?请运行
zmq.zmq_version()和zmq.__version__ -
ZMQ 版本为 4.0.3 和 pyzmq 13.1.0
-
-Eh,这是一个令人讨厌的组合。您能否更新到 pyzmq 14.0.1 并进行测试(我不介意您使用什么 zmq 版本,请告诉我)。我在 Windows 上使用 pyzmq 13.1.0 和 zmq 3.x.x,在不更新到 pyzmq v14 的情况下更改 zmq 版本很痛苦,但我想确保在我尝试重现之前您仍然看到该版本的问题跨度>
-
用 v14 测试,同样的问题。