【问题标题】:How to make the event-driven consumer in Apache Camel delete the consumed messages?如何让 Apache Camel 中的事件驱动消费者删除已消费的消息?
【发布时间】:2014-08-08 10:32:24
【问题描述】:

我有一个将消息发送到我的队列的休息服务,这些消息被路由到文件:

from("test-jms:queue:test.queue").to("file://test");

另外,我在端点上有一个事件驱动的消费者。目前,这仅在使用消息时才写入日志:

final Consumer consumer = endpoint.createConsumer(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            String message = exchange.getIn().getBody(String.class);

            LOG.info("Message processed: " + message);
        }
    });

这一切都很好。在/test 文件夹中,我会为收到的每条消息获取一个新文件,此外,消费者还会创建一个附加了.camelLock 的标记文件。使用readLock=none 选项可防止消费者按预期制作这些标记文件。

但是,消息文件和标记文件在消费后都不会被删除。我可能在消费者实现中遗漏了什么?

【问题讨论】:

    标签: java apache-camel activemq


    【解决方案1】:

    当您使用内联处理器手动创建这样的消费者时,您需要在完成触发删除/移动文件等工作时手动完成 Exchange 的 UoW。

    exchange.getUnitOfWork().done(exchange);
    

    您也可以尝试使用应该为您完成 UnitOfWork 的 UnitOfWorkProducer 包装您的处理器。

    【讨论】:

    • 您的回答为我指明了正确的方向 - 手动完成 UoW。但是,我无法获得您建议的工作方法。相反,我从骆驼上下文创建了一个消费者模板,并在此基础上手动完成了 UoW。 consumerTemplate.doneUoW(exchange);
    【解决方案2】:

    正如克劳斯·易卜生所指出的,这里的关键是完成 UnitOfWork (UoW)。现在我的事件驱动消费者看起来像这样:

    final Consumer consumer = endpoint.createConsumer(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            String message = exchange.getIn().getBody(String.class);
    
            LOG.info("Message processed: " + message);
    
            ConsumerTemplate consumerTemplate = camelContext.createConsumerTemplate();
            consumerTemplate.doneUoW(exchange);
        }
    });
    

    另外,创建端点时必须使用delete=true 选项:

    Endpoint endpoint = camelContext.getEndpoint("file://test?delete=true");
    

    【讨论】:

      猜你喜欢
      • 2014-04-08
      • 1970-01-01
      • 1970-01-01
      • 2020-08-11
      • 1970-01-01
      • 1970-01-01
      • 2019-12-27
      • 2019-03-28
      • 2011-01-08
      相关资源
      最近更新 更多