【问题标题】:How could I set hwm in the push/pull pattern of zmq?如何在 zmq 的推/拉模式中设置 hwm?
【发布时间】:2014-05-02 01:34:27
【问题描述】:

我发现了一个类似的问题,ZeroMQ: HWM on PUSH does not work,但它无法解决我的问题。

我想控制推送套接字排队的消息数量,但它不起作用,仍然排队 1000 条消息。
所以我想知道如何设置推送套接字的hwm。提前致谢。

我的环境是:libzmq 4.0.4、pyzmq 14.1.0、python 3.3

这是我的代码:

server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random

import zmq


class TestPush(object):

    def __init__(self):
        self.ctx = zmq.Context()

        random.seed()

    def run(self):
        task_snd = self.ctx.socket(zmq.PUSH)
        task_snd.setsockopt(zmq.SNDHWM, 10)
        task_snd.bind('tcp://*:53000')        

        while True:
            workload = str(random.randint(1, 100))
            task_snd.send(workload.encode('utf-8'))
            print('Send {0}'.format(workload))


if __name__ == '__main__':
    test_push = TestPush()
    test_push.run()

client.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import random

import zmq


class TestPull(object):

    def __init__(self):
        self.ctx = zmq.Context()

    def run(self):
        task_rcv = self.ctx.socket(zmq.PULL)
        task_rcv.setsockopt(zmq.RCVHWM, 1)
        task_rcv.connect('tcp://localhost:53000')

        while True:
            msg = task_rcv.recv()
            print('Receive msg: {0}'.format(msg))

            time.sleep(random.randint(2, 3))


if __name__ == '__main__':
    test_pull = TestPull()
    test_pull.run()

【问题讨论】:

  • 你能在设置后用“getsocktopt”函数读取HWM值吗?因此您可以验证是否设置了值。
  • @AhmetKakıcı 是的,getsockopt() 返回的值表明对应的 hwm 值已更改。

标签: python zeromq pyzmq


【解决方案1】:

当我尝试在推拉式套接字上设置 HWM(高水位线)时,我遇到了与 ZeroMQ 类似的问题。甚至,它也不适用于 pub 和 sub 套接字。我想解释一下发生了什么以及它是如何解决的。

我制作了 2 个脚本,第一个是带有推式套接字的发送器,另一个是带有拉式套接字的接收器。在两个套接字上将 HWM 设置为 10。在接收器脚本中,我在收到每条消息后延迟了 5 秒。然后我用 100 条消息循环运行发送者脚本(在保持接收器运行接收之后)。

我的预期:

接收方队列和发送方队列将到达 hwm。之后,发件人将停止发送更多消息。

但是发生了什么:

发件人发送所有 100 条消息并退出。但是接收者一直在处理一个又一个的消息,直到它接收到所有的消息。

经过研究发现原因:

有一个叫做内核套接字缓冲区的东西,它位于发送者套接字和接收者套接字之间。每当一个进程打开一个套接字时,内核都会为 tcp 套接字分配内存空间,默认为 128KB。内核套接字缓冲区适用于发送方和接收方套接字(因此总缓冲区将为 128KB + 128KB)。我的消息大小以字节为单位(带有一些字符的 int)。因此,总的消息缓冲如下:

总缓冲区消息 = 发送方套接字 hwm + 发送方套接字的内核套接字缓冲区 (128KB) + 接收器插座 hwm + 发送方套接字的内核套接字缓冲区 (128KB)

解决方案:

现在,我将消息长度更改为 1KB 多一点。然后再次执行测试,发现发送了大约 260 条消息(如预期的那样),此后发送方停止间隔,直到接收方收到一些消息并重新开始。


附加信息

为了让push socket在接收方无法接收的情况下也能继续发送消息,我们可以在发送例程中使用NOBLOCK选项,但是接收方丢失的消息数量会增加很多。因此,更好的选择是使用轮询或超时,然后使用 NOBLOCK 选项调用发送例程。

请注意,您可以使用 zeromq 的 SNDBUFF/RCVBUFF,但操作系统可能不遵守它(在我的情况下它不起作用)。

【讨论】:

    猜你喜欢
    • 2012-09-12
    • 1970-01-01
    • 2021-11-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-02
    • 1970-01-01
    • 2023-03-28
    相关资源
    最近更新 更多