【发布时间】:2020-08-15 13:24:59
【问题描述】:
我想聚合 3 个主题,但只是为了聚合策略的 PoC,我正在尝试聚合 2 个主题。
所以,我配置了以下骆驼路线:
@Override
public void configure() throws Exception {
CamelContext contexto = new DefaultCamelContext();
try{
onException(SQLException.class, Exception.class)
.handled(true)
.maximumRedeliveries(1)
.process(new CustomExceptionProcess());
errorHandler(deadLetterChannel("file:data/error") // There is no file created
.maximumRedeliveries(3)
.redeliveryDelay(5000));
from("kafka:topic1")
.process(new ProcessTopic1()) // inside this process
.to("direct:agregador");
from("kafka:topic2")
.process(new ProcessTopic2())
.to("direct:agregador");
from("direct:agregador")
.aggregate(header("idAgregador"), new EstrategiaAgregador()) // implements AggregationStrategy
.completionPredicate(predicadoTamanhoMinimo()) // This just to check if is there at least 2 objects before realease the aggregation.
.eagerCheckCompletion()
.to("direct:insereBanco")
;
from("direct:insereBanco")
.process(new ProcessoInsere());
}catch (Exception e){
LOG.error("Error in the route: ", e);
}
而topic1和topic2的流程是这样的:
// It's the same process for ProcessTopic1 and ProcessTopic2
@Override
public void process(Exchange exchange) throws Exception {
if(!(exchange.getIn().getBody() instanceof Object1)){
String someData = (String) exchange.getIn().getBody();
JSONParser parser = new JSONParser();
Object objeto = parser.parse(someData);
JSONObject json = (JSONObject) objeto;
ObjectMapper mapeadorObj = new ObjectMapper();
Object1 someEntity = mapeadorObj.readValue(json.toString(), Object1.class);
ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate();
producerTemplate.sendBodyAndHeader("direct:agregador", someEntity, "idAgregador", someEntity.getId());
}
}
但是,当我通过聚合获得第一条消息时,消息离开(因为必须等待第二条消息),但很快这条消息退出,我得到了这个异常:
org.apache.camel.CamelExchangeException: Invalid correlation key. Exchange[ID-XXXX-1597437627634-0-1]
at org.apache.camel.processor.aggregate.AggregateProcessor.doProcess(AggregateProcessor.java:302)
at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:271)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:76)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:76)
>> at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:76)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>> at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
一件事是我也在使用 Kafka 主题来发送消息,并且由于这出现在堆栈跟踪中,我不确定我是否还需要为 Kafka 设置一些内容。
【问题讨论】: