【问题标题】:UDP handler with Reactor in SpringSpring 中带有 Reactor 的 UDP 处理程序
【发布时间】:2013-11-13 23:44:16
【问题描述】:

我希望我的应用对从数千个不同客户端发送的 UDP 事件做出反应。每个客户端每 5-10 秒发送 1-10 个 UDP 数据包。每个数据包都将并且应该非常快速地处理(主要是在内存和小型计算中,在 redis 的帮助下,只有偶尔的 DB 调用)。不会向调用者返回数据。

我在 Spring 中实现了 Reactor,就像他们在 wiki 中描述的那样。 然后我实现了 UDP 入站通道,就像他们的 Spring Integration 文档中描述的那样。这是配置:

<int-ip:udp-inbound-channel-adapter id="receiverChannel"
                                    channel="stringConvert"
                                    port="9000"
                                    multicast="false"
                                    check-length="false"
                                    pool-size="10"
                                    lookup-host="false"
        />

<int:transformer id="convertChannel"
                 input-channel="stringConvert"
                 output-channel="toProcess"
                 ref="transformer"
                 method="transform"

        />

<int:service-activator input-channel="toProcess"
                       ref="accumulator"
                       method="accumulate"/>

<bean id="accumulator" class="hello.UDPAccumulator" />
<bean id="transformer" class="hello.UDPTransformer" />

然后在 UDPAccumulator 中我将该消息发布到反应器:

@Service
public class UDPAccumulator {

@Autowired
ReactorProducer producer;

public void accumulate(String quote) {
    producer.fireEvent(quote);

}

}

关于我想要高通量输出,这是“正确”的方法吗? int-ip:udp-inbound-channel-adapter 的内部工作原理是什么?在将消息传递到反应器之前,它会成为瓶颈吗?我看到反应器有一些与 TCP 相关的类和支持,但没有 UDP。任何关于如何以最佳方式做到这一点的建议表示赞赏!

奖金问题。如果消息到达的速度比发送到反应器的速度快怎么办? redis message store(文章底部)会有帮助吗?如果我在反应器中处理这些数据包的方法很慢怎么办?

【问题讨论】:

    标签: spring spring-integration reactor


    【解决方案1】:

    由于我们在 Reactor 中还没有直接的 UDP 支持,因此您将事件发布到 Reactor 的抽象是非常明智的。但是您确实在“额外问题”中注意到,发布者/消费者吞吐量存在问题,必须以某种特定于域的方式进行管理;那里没有灵丹妙药。

    在您的用例中,我实际上很想说Processor [1] 可能更合适。它为数据处理提供了更高的整体吞吐量,因为它绕过了在普通Reactor 中发生的基于动态选择器的调度。除非您根据某些主题标准将传入事件分派给不同的处理程序,否则我建议您改为查看它。有了更高的吞吐量,你就不必担心消费者跟不上(除非你的Consumer 正在做一些非常慢的事情,没有什么可以自动加速)。

    但如果你真的,真的需要管理积压,我建议通过Queue 将你的生产者和消费者解耦。 Reactor 有一个 PersistentQueue [2] 抽象,您可以使用 JavaChronicle [3] 将对象发布到磁盘并持久保存到磁盘,然后可以使用 Poller 将其导出到 Consumer(javadoc 即将推出 Poller这周的某个时候,当我们为 1.0 做准备时……它以前被称为 Pipe[4])。

    【讨论】:

      【解决方案2】:

      我无法与 Reactor 通话,但 UDP 适配器有一个专用线程来读取原始数据包并将它们交给TaskExecutor。它会尽快执行此操作,以便重新读取下一个数据包。

      默认TaskExecutor是固定线程池。

      Reactor 有一个DispatcherTaskExecutor 可以注入到适配器中。

      相同的任务执行器用于主读取线程和切换。

      【讨论】:

        猜你喜欢
        • 2021-07-30
        • 2015-07-17
        • 2020-08-21
        • 2017-01-10
        • 1970-01-01
        • 2015-06-07
        • 2017-07-29
        • 2015-05-13
        • 1970-01-01
        相关资源
        最近更新 更多