订阅如何运作?
直到 v3.1,订阅机制(又名 TOPIC 过滤器)是在 SUB 端处理的,因此这部分进程分布在所有SUB-s(以所涉及的所有传输类的统一广泛的数据流量为代价)并且没有任何惩罚,除了在 @ 上采购此类与数据流相关的工作负载(参考下文) 987654324@-side.
由于 v3.1,TOPIC 过滤器在 PUB 端处理,代价是这样的处理开销,但节省了之前的所有浪费的传输容量,只是为了稍后在SUB 端意识到消息与 TOPIC 过滤器不匹配并将被丢弃。
“显着较慢”确实意味着体内的定量指标
根据问题中的假设,比较应该与:
Scenario A:PUB-进程没有SUB-consumer 连接/订阅任何主题过滤器Scenario B: 一个PUB-进程有一个SUB-consumer 连接/订阅了一个主题过滤器
ZeroMQ 有一个状态完整的内部 FSA,它可以节省编程架构和资源利用率。这就是说,Scenario A 产生零工作负载,即对PUB-处理没有影响,因为在第一个真正的SUB-connects 之前实际上不会发生此类处理。
如果您的 Scenario B 确实代表了用例,那么与仅服务一个 SUB-consumer 相关的额外处理开销很容易衡量:
from zmq import Stopwatch as StopWATCH
aStopWATCH = StopWATCH()
# -----------------------------------------------------------------<TEST_SECTION>-start
aStopWATCH.start();s_PUB_send.send( "This is a MESSAGE measured for 0 SUB-s", zmq.NOBLOCK );t0 = aStopWATCH.stop()
# -----------------------------------------------------------------<TEST_SECTION>-end
# .connect the first SUB-process and let it .setsockopt() for v3.1+ accordingly
# -----------------------------------------------------------------<TEST_SECTION>-start
aStopWATCH.start();s_PUB_send.send( "This is a MESSAGE measured for 1 SUB-s", zmq.NOBLOCK );t1 = aStopWATCH.stop()
# -----------------------------------------------------------------<TEST_SECTION>-end
print "\nZeroMQ has consumed {0:} [us] for PUB-side processing on [Scenario A]\nZeroMQ has consumed {1:} [us] for PUB-side processing on [Scenario B]".format( t0, t1 )
如果.connect()-ed(FSA-知道实时交易对手),但未订阅任何内容(.setsockopt( "" ))SUB-消费者处理是经过验证,不管实际使用的{ pre-v3.1 | v3.1+ }-API(请注意在分布式系统中处理不同版本的 API,其中无法为远程节点强制执行统一的 API 版本,这超出了自己的控制范围的配置管理)。
如果性能已经下降?
可以进一步微调性能已经受限的项目的性能属性。
对于选定的处理任务,性能(人们可能会先验地猜测)在这里并不是那么困难,人们可以通过将每个工作流的处理映射到多个创建的 I/O 线程的分离子集上来隔离工作负载流的处理:
地图s_REQ_sock.setsockopt( ZMQ_AFFINITY, 0 );
和s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
分别s_SUB_recv.setsockopt( ZMQ_AFFINITY, ... );
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );