【问题标题】:AggregationStrategy with org.apache.camel.CamelExchangeException: Invalid correlation keyAggregationStrategy with org.apache.camel.CamelExchangeException:相关键无效
【发布时间】:2020-08-15 13:24:59
【问题描述】:

我想聚合 3 个主题,但只是为了聚合策略的 PoC,我正在尝试聚合 2 个主题。

所以,我配置了以下骆驼路线:

@Override
public void configure() throws Exception {
CamelContext contexto = new DefaultCamelContext();
try{
    onException(SQLException.class, Exception.class)
            .handled(true)
            .maximumRedeliveries(1)
            .process(new CustomExceptionProcess());

    errorHandler(deadLetterChannel("file:data/error") // There is no file created 
            .maximumRedeliveries(3)
            .redeliveryDelay(5000));

    from("kafka:topic1") 
            .process(new ProcessTopic1()) // inside this process 
            .to("direct:agregador");

    from("kafka:topic2") 
            .process(new ProcessTopic2())
            .to("direct:agregador");

    from("direct:agregador")
            .aggregate(header("idAgregador"), new EstrategiaAgregador()) // implements AggregationStrategy
            .completionPredicate(predicadoTamanhoMinimo()) // This just to check if is there at least 2 objects before realease the aggregation.
            .eagerCheckCompletion()
            .to("direct:insereBanco")
            ;

    from("direct:insereBanco")
            .process(new ProcessoInsere());

}catch (Exception e){
    LOG.error("Error in the route: ", e);
}

而topic1和topic2的流程是这样的:

// It's the same process for ProcessTopic1 and ProcessTopic2
@Override
public void process(Exchange exchange) throws Exception {

    if(!(exchange.getIn().getBody() instanceof Object1)){

        String someData = (String) exchange.getIn().getBody();

        JSONParser parser = new JSONParser();
        Object objeto = parser.parse(someData);
        JSONObject json = (JSONObject) objeto;

        ObjectMapper mapeadorObj = new ObjectMapper();
        Object1 someEntity = mapeadorObj.readValue(json.toString(), Object1.class);

        ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate();
        producerTemplate.sendBodyAndHeader("direct:agregador", someEntity, "idAgregador", someEntity.getId());
    }
}

但是,当我通过聚合获得第一条消息时,消息离开(因为必须等待第二条消息),但很快这条消息退出,我得到了这个异常:

org.apache.camel.CamelExchangeException: Invalid correlation key. Exchange[ID-XXXX-1597437627634-0-1]
    at org.apache.camel.processor.aggregate.AggregateProcessor.doProcess(AggregateProcessor.java:302)
    at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:271)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:76)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:76)
>>  at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:76)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>>  at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)

一件事是我也在使用 Kafka 主题来发送消息,并且由于这出现在堆栈跟踪中,我不确定我是否还需要为 Kafka 设置一些内容。

【问题讨论】:

    标签: apache-kafka apache-camel


    【解决方案1】:

    在您的路线中,您已经在执行.to("direct:agregador")。为什么要使用生产者模板在处理器中发送?

    似乎正在发生的事情是处理器在交换中成功地将someEntity 发送到聚合器,但是当控制权返回到路由时,由于您没有将交换的主体设置为someEntity,所以交换仍然保留没有idAgregador 标头的原始消息。

    您可以在您的处理器中做的是删除生产者模板代码并添加:

    exchange.getIn().setHeader("idAgregador", someEntity.getId());
    exchange.getIn().setBody(someEntity);
    

    【讨论】:

    • 我明白你的意思 - 我会研究一下。如果这解决了我的问题,我会标记你的答案。无论如何,谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-25
    • 1970-01-01
    • 2019-09-03
    • 1970-01-01
    • 2012-04-18
    • 2017-11-21
    • 2015-07-15
    相关资源
    最近更新 更多