【问题标题】:ZeroMQ: I want Publish–Subscribe to drop older messages in favor of newer onesZeroMQ:我希望 Publish-Subscribe 删除旧消息以支持新消息
【发布时间】:2014-09-19 19:19:48
【问题描述】:

我正在使用 ZeroMQ 发布-订阅套接字连接两个进程。发布过程是一个传感器,刷新率比订阅过程快得多。我希望订阅过程只使用队列中的最新消息,而完全忽略旧消息。

我尝试在订阅者上设置高水位标记,但这似乎会丢弃较新的消息而不是较旧的消息。

是否有人可以为此目的指导我采用发布-订阅模式?

【问题讨论】:

标签: zeromq publish-subscribe


【解决方案1】:

documentation on zeromq 了解合并功能(这是一种新功能),我认为这正是您想要的。

来自文档:

ZMQ_CONFLATE:只保留最后一条消息 如果设置,套接字应仅保留 入站/出站队列中有一条消息,此消息是最后一条 收到的消息/要发送的最后一条消息。忽略“ZMQ_RCVHWM”和 'ZMQ_SNDHWM' 选项。不支持多部分消息,在 特别是,只有一部分保留在套接字内部队列中。

【讨论】:

  • 太好了,谢谢。现在我们可以删除 cmets 以保持整洁。
【解决方案2】:

好的,我找到了一个解决方案,但我不知道它是否是最好的 - 所以我暂时不会将其标记为正确。

zmq::message_t message;
int events = 0;
size_t events_size = sizeof(int);

// Priming read
subscriber.recv(&message);

// See if there are more messages to read
subscriber.getsockopt(ZMQ_EVENTS, static_cast<void*>(&events), &events_size);
while (events & ZMQ_POLLIN) {

  // Receive the new (and perhaps most recent) message
  subscriber.recv(&message);

  // Poll again for additional messages
  subscriber.getsockopt(ZMQ_EVENTS, static_cast<void*>(&events), &events_size);
}

// Now, message points to the most recent received data.

此策略的另一个优点是队列不应该填满。缺点是我的发布者发送的速度可能比订阅者中的这个循环运行的速度更快,然后它会无限循环。

这似乎不太可能,但我想让它变得不可能。我还不太清楚如何实现这个目标。

【讨论】:

  • 您处于一个不可能的场景中,因为订阅者可能永远知道它正在接收的消息将在发送新消息之前被处理,并且如果它不够快跟上,然后你就卡住了。处理这种情况的唯一方法是使用多个订阅者,这些订阅者共同拥有足够的带宽来处理发布者的最大输出。不要使用 pub-sub,而是使用 Dealer-router,这样您就可以在所有客户端之间轮询消息,并且只需处理确定应该在后端取代哪个消息。乱。合并(如果可用)会更好。
  • 有关合并的详细信息,请参见其他答案。
猜你喜欢
  • 1970-01-01
  • 2014-11-14
  • 1970-01-01
  • 2012-07-20
  • 1970-01-01
  • 1970-01-01
  • 2015-06-20
  • 2016-10-22
  • 2013-09-30
相关资源
最近更新 更多