【发布时间】:2017-12-12 15:33:56
【问题描述】:
我需要能够从 rabbitmq 接收消息,进行一些转换(从 1 条输入消息,我创建 1000 条消息),然后按以下方式处理这 1000 条消息:我以 10 条为一组推送消息,然后休眠 5 秒.
您可以看到下面的代码,我需要的帮助是最后一步 - 如何以这种方式进行消息批处理?
@Bean
public IntegrationFlow refreshFlow() {
return IntegrationFlows
//get messages from rabbitmq
.from(refreshInboundAdapter())
//convert to POJO
.transform(new JsonToObjectTransformer(RefreshRequest.class))
//make 1 -> 1000 messages (but release in batches of 10, not all)
.<RefreshRequest, List<ElasticMatch>>transform(m -> componentConfig.matchRefreshService().processRequest(m))
//HERE WAIT 5 seconds and forward to rabbit in batches of 10
.handle(refreshOutboundEndpoint())
.get();
}
【问题讨论】:
标签: java spring spring-integration spring-amqp