【发布时间】:2016-11-24 05:58:37
【问题描述】:
我是 Spring Cloud 的新手,希望将我们的单声道结构更改为微服务,这首先说明了我现在想要做的事情如下
- 接收请求以调用来自不同来源的 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
- 外部系统支持批量发送,所以最好能聚合消息并批量发送。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
- 另外,如果我收到一个错误,我想以指数方式回退
我的第一个想法是在我的接收器之前创建一个处理器来执行上述聚合。
这是云计算中正确的思维方式还是他们的另一条路要走?p>
工作解决方案
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
【问题讨论】:
标签: spring-cloud spring-cloud-stream spring-cloud-dataflow