【问题标题】:Aggregate Messages in Spring Cloud Stream在 Spring Cloud Stream 中聚合消息
【发布时间】:2016-11-24 05:58:37
【问题描述】:

我是 Spring Cloud 的新手,希望将我们的单声道结构更改为微服务,这首先说明了我现在想要做的事情如下

  1. 接收请求以调用来自不同来源的 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
  2. 外部系统支持批量发送,所以最好能聚合消息并批量发送。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
  3. 另外,如果我收到一个错误,我想以指数方式回退

我的第一个想法是在我的接收器之前创建一个处理器来执行上述聚合。

这是云计算中正确的思维方式还是他们的另一条路要走?​​p>


工作解决方案

@EnableBinding(Processor.class)
class Configuration {

    @Autowired
    Processor processor;


    @ServiceActivator(inputChannel = Processor.INPUT)
    @Bean
    public MessageHandler aggregator() {

        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));

        //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }
}

【问题讨论】:

    标签: spring-cloud spring-cloud-stream spring-cloud-dataflow


    【解决方案1】:

    您可以编写一个aggregator 处理器应用程序,将多条消息组合成一条消息。有关 Spring Integration 聚合器的更多信息,请参阅 here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-06-20
      • 2020-09-28
      • 2021-06-12
      • 1970-01-01
      • 1970-01-01
      • 2021-04-27
      • 2019-06-09
      • 2018-08-29
      相关资源
      最近更新 更多