【问题标题】:ZMQ-python: PUSH/PULL with one-to-manyZMQ-python:一对多的推/拉
【发布时间】:2020-03-25 08:20:38
【问题描述】:

我正在使用以下代码尝试 zmq,但订阅者正在一个接一个地获取对象。

以下是我的 PUSH 脚本:

# zmq server -- run it once

import zmq
import time
# server
# print(zmq.Context)
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind('ipc:///tmp/zmqtest')
i=0
while True:
    i+=1
    time.sleep(0.5)
    sock.send_pyobj((i))

以下是 PULL 脚本:

# zmq client -- run it 2,3 times in parallel

import zmq
ctx = zmq.Context() # create a new context to kick the wheels
sock = ctx.socket(zmq.PULL)
sock.connect('ipc:///tmp/zmqtest')

i=0
while True:
    i+=1
    o = sock.recv_pyobj()
    print('received python object:', o,i)
    if o == 'quit':
        print('exiting.')
        break

我从 PULL 脚本之一得到以下输出:

received python object: 1 1
received python object: 3 2
received python object: 5 3
received python object: 7 4

如何将对象并行推送到两个脚本? 我尝试了 PUB/SUB,但它不是这样工作的。 (可以检查将PUSH/PULL替换为PUB/SUB

【问题讨论】:

  • “我怎样才能将对象同时推送到两个脚本?” - 这是否意味着您坚持有机会收到当前( .bind() + .connect() )-ed、并发操作的所有客户端中的相同对象?
  • 是的。我需要将每个对象都发送到两个(或多个)脚本而不会丢失

标签: one-to-many zeromq publish-subscribe pyzmq


【解决方案1】:

PUB-side :

# zmq PUB-server -- run it once

import zmq
import time

IPC  = 'ipc:///tmp/zmqtest'
ctx  = zmq.Context()
PUB  = ctx.socket( zmq.PUB )
PUB.bind( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
PUB.setsockopt( zmq.LINGER, 0 )
PUB.setsockopt( zmq...        )
#------------------------------------------------------------------------------
i = 0
while True:
    i += 1
    time.sleep( 0.5 )
    sock.send_pyobj( ( i ) )
#------------------------------------------------------------------------------

SUB-side(s):

# zmq SUB-client -- run x-times concurrently ( or distributed, if other TransportClasses permit )

import zmq

IPC = 'ipc:///tmp/zmqtest'        # <TransportClass>://<address>, TCP,TIPC,...may follow
ctx = zmq.Context()               # create a new context to kick the wheels
SUB = ctx.socket( zmq.SUB )
SUB.connect( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
SUB.setsockopt( zmq.LINGER,     0 )
SUB.setsockopt( zmq.SUBSCRIBE, "" )
SUB.setsockopt( zmq...            )
#------------------------------------------------------------------------------
i    = 0
aClk = zmq.Stopwatch()
MASK = '(i:{1:_>9d}): After{2:_>+12d} [us] did .recv() a python object:[{0:}]'
while True:
    i += 1
    aClk.start()
    o = sock.recv_pyobj()
    _ = aClk.stop()
    print( MASK.format( repr( o ), i, _ ) )

    if o == 'quit':
        print( 'Will exit.' )
        #--------------------------------------- BE NICE & FAIR TO RESOURCES
        SUB.setsockopt( zmq.UNSUBSCRIBE, "" )
        SUB.disconnect( IPC )
        SUB.close()
        ctx.term()
        #-------------------------------------------------------------------
        break

“是的。我需要将每个对象发送到两个(或多个)脚本无损

请注意,对此有零保修。可以构建自己的应用级协议来实现这一点。

【讨论】:

    猜你喜欢
    • 2012-09-12
    • 2014-05-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-22
    相关资源
    最近更新 更多