【问题标题】:Pyzmq high-water mark not working on pub socketPyzmq 高水位标记在 pub 套接字上不起作用
【发布时间】:2019-04-20 17:45:08
【问题描述】:

根据 ZeroMQ 文档,一旦排队的消息数量达到高水位线,发布套接字就应该丢弃消息。

这在以下示例中似乎不起作用(是的,我确实在绑定/连接之前设置了 hwm):

import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.set_hwm(2)
    pub.bind('tcp://*:5555')

    i = 0
    while True:
        # Send message every 100ms
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.subscribe("test")
    sub.connect('tcp://localhost:5555')
    while True:
        # Receive messages only every second
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

while True:
    pass

我在 pub 上发送消息的速度比在子套接字上读取消息快 10 倍,hwm 设置为 2。我预计每 10 条消息只会收到一次。相反,我看到以下输出:

Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...

所以我看到所有消息都到达了,因此它们被保存在某个队列中,直到我阅读它们。在连接之前在子套接字上添加 hwm=2 时也是如此。

我做错了什么还是我误解了hwm

我使用 pyzmq 版本 17.1.2

【问题讨论】:

  • 试试thisthis 发帖。
  • 我更新了我的答案。希望能帮到你。

标签: python zeromq pyzmq


【解决方案1】:

借用issue which I opened in Github 的答案,我将答案更新如下:


消息保存在操作系统的网络缓冲区中。我已经发现 因此,HWM 没有那么有用。这是修改后的代码 订阅者错过消息的地方:

import time
import pickle
import zmq
from threading import Thread
import os

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)
    pub.setsockopt(zmq.SNDBUF, 2*1024)  # See: http://api.zeromq.org/4-2:zmq-setsockopt
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        time.sleep(0.001)
        pub.send_string(str(i), zmq.SNDMORE)
        pub.send(os.urandom(1024))
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.setsockopt(zmq.RCVHWM, 2)
    sub.setsockopt(zmq.RCVBUF, 2*1024)
    sub.connect('tcp://localhost:5555')
    while True:
        time.sleep(0.1)
        msg, _ = sub.recv_multipart()
        print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()

输出看起来像这样:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270

消息丢失,因为所有队列/缓冲区已满且发布者 开始丢弃消息(参见 ZMQ_PUB 的文档: http://api.zeromq.org/4-2:zmq-socket)。


[注意]:

  • 您应该在侦听器/订阅者和广告商/发布者中使用高水位标记选项。
  • 这些帖子也很相关 (Post1 - Post2)
  • sock.setsockopt(zmq.CONFLATE, 1) 是另一种仅获取在订阅者端定义的最后一条消息的选项。

【讨论】:

  • 感谢您的建议。但是,set_hwm 在这种情况下等同于setsockopt,它在内部调用setsockopt(zmq.SNDHWM, x)setsockopt(zmq.RCVHWM, x),具体取决于套接字类型。您的代码产生与我的完全相同的输出,在接收方添加 hwm 不会改变行为,如我最初的帖子中所述。根据 ZeroMQ 文档的合并设置“不支持多部分消息”,因此它不是一个选项。
  • @Schwingkopf 我仔细检查了一下。
  • 您的意思是“我已经仔细检查过,它正在工作”还是“我会仔细检查它仍然无法正常工作,正如您所说的那样”?
  • @Schwingkopf 不,在你的情况下不起作用,我有the same problem,但我无法使用hwm 解决它,然后我的问题使用conflate 选项解决。
  • 仅供参考,Linux 上 SND 的最小缓冲区为 2048,RCV 为 256。因此,您不能只将缓冲区设置得非常小以强制它使用高水位标记值(我认为这可能是可能的)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-10-18
  • 1970-01-01
  • 1970-01-01
  • 2014-10-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-22
相关资源
最近更新 更多