【问题标题】:ZeroMQ: HWM on PUSH does not workZeroMQ:PUSH 上的 HWM 不起作用
【发布时间】:2014-10-17 20:06:49
【问题描述】:

我正在尝试编写一个服务器/客户端脚本,其中包含一个用于发泄任务的服务器以及多个执行它的工作人员。 问题是我的呼吸机有很多任务,它会在心跳中填满内存。 我尝试在绑定之前设置 HWM,但没有成功。只要工作人员连接,它就会继续发送消息,完全无视设置的 HWM。我还有一个水槽,用来记录已完成的任务。

server.py

import zmq

def ventilate():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
    sender.bind("tcp://*:5557")


    # Socket with direct access to the sink: used to syncronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print "Sending tasks to workers…"

    # The first message is "0" and signals start of batch
    sink.send('0')
    print "Sent starting signal"

    while True:
        sender.send("Message")



if __name__=="__main__":
    ventilate()

worker.py

import zmq
from multiprocessing import Process

def work():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process t asks forever
    while True:
        msg = receiver.recv_msg()
        print "Doing sth with msg %s"%(msg)     
        sender.send("Message %s done"%(msg))

if __name__ == "__main__":
    for worker in range(10):        
        Process(target=work).start()

sink.py

import zmq

def sink():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()
    print "Received start signal"
    while True:
        msg = receiver.recv_msg()
        print msg


if __name__=="__main__":
    sink()

【问题讨论】:

  • 我会尝试重现您的问题。您能告诉我您使用的是哪个版本的 PyZMQ 和 ZMQ 吗?请运行zmq.zmq_version()zmq.__version__
  • ZMQ 版本为 4.0.3 和 pyzmq 13.1.0
  • -Eh,这是一个令人讨厌的组合。您能否更新到 pyzmq 14.0.1 并进行测试(我不介意您使用什么 zmq 版本,请告诉我)。我在 Windows 上使用 pyzmq 13.1.0 和 zmq 3.x.x,在不更新到 pyzmq v14 的情况下更改 zmq 版本很痛苦,但我想确保在我尝试重现之前您仍然看到该版本的问题跨度>
  • 用 v14 测试,同样的问题。

标签: python zeromq pyzmq


【解决方案1】:

好的,我玩了一下,我认为问题不在于 PUSH HWM,而是您不能为 PULL 设置 HWM。如果您查看 this documentation,您会看到那里显示 N/A for action on HWM。

每个 PULL 套接字似乎都接收数百条消息(我确实尝试设置一个 HWM,以防它在 PULL 套接字上执行任何操作。它没有。)。我通过更改呼吸机以发送带有递增整数的消息来证明这一点,并将池中的每个工作人员更改为在调用 recv() 之间等待 2 秒。工作人员打印出他们正在处理具有很大不同整数的消息。例如,一个工作人员将处理消息 10,而下一个工作人员正在处理消息 400。随着时间的推移,您会看到正在处理消息 10 的工作人员现在正在处理消息 11、12、13 等。其他是处理401、402等

这表明 ZMQ_PULL 套接字正在某处缓冲消息。因此,虽然 ZMQ_PUSH 套接字确实有一个 HWM,但 PULL 套接字正在快速请求消息,尽管它们实际上并没有被调用 recv() 访问。因此,如果连接了 PULL 套接字,则会有效地忽略 PUSH HWM。据我所知,您无法控制 PULL 套接字缓冲区的长度(我希望 RCVHWM 套接字选项可以控制它,但它似乎没有)。

这种行为当然引出了一个问题,即 ZMQ_PULL HWM 选项的意义何在,只有在您还可以控制接收套接字 HWM 时才有意义。

此时,我会开始询问 0MQ people 您是否遗漏了一些明显的东西,或者这是否被视为错误。

很抱歉,我帮不上忙!

【讨论】:

  • 非常感谢您迄今为止所做的努力。我确实发现设置 setsockopt(zmq.RCVBUF, 2) 实际上会减慢速度。默认情况下,它设置为 0,这意味着它采用操作系统的默认缓冲区大小。不知道它是什么。它仍然不能完全满足我的要求,但它更接近了。
【解决方案2】:

ZeroMQ 在套接字的发送端和接收端都有缓冲区,因此您需要在代码中的 PUSH 和 PULL 套接字上设置高水位线(实际上是在 bind()connect() 之前)。

在 Python 绑定中,现在可以通过 socket.hwm = 1 方便地完成此操作,这将一次性设置 ZMQ_SNDHWMZMQ_RCVHWM

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-22
    • 2020-09-18
    • 1970-01-01
    • 2011-11-11
    • 2014-06-25
    • 2015-07-18
    • 2013-06-13
    • 2015-02-21
    相关资源
    最近更新 更多