【问题标题】:Spring Cloud Stream: @StreamListener processing messages twiceSpring Cloud Stream:@StreamListener 处理消息两次
【发布时间】:2019-06-09 23:07:43
【问题描述】:

我正在使用 Spring Cloud Stream (Edgware.SR5) 和 Spring Boot (1.5.10.RELEASE)。我的@StreamListener 每收到一条消息都会处理两次。

该示例的想法是在队列中发布消息并对其进行处理。

服务:

@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {

    @Publisher(channel = ExampleBindings.OUTPUT)
    public String queue(String message){
        return message;
    }

    @StreamListener(ExampleBindings.INPUT)
    public void dequeue(String message){
        System.out.println("New message: " + message);
    }
}

绑定:

public interface ExampleBindings {

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(ExampleBindings.INPUT)
    SubscribableChannel input();

    @Output(ExampleBindings.OUTPUT)
    MessageChannel output();
}

application.properties:

spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

配置(用于在测试中注入代理服务):

@Configuration
public class ExampleConfig {

    @Bean
    public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
         PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
            new PublisherAnnotationBeanPostProcessor();
        publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
        return publisherAnnotationBeanPostProcessor;
    }
}

测试:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {

    @Autowired
    private ExampleService exampleService;

    @Test
    public void testQueue() throws InterruptedException {
        exampleService.queue("Hello!");
        Thread.sleep(1000);//Wait for message processing
        System.out.println("Ready!");
    }
}

我有以下输出:

17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!

我不知道我的配置有什么问题,或者如果是一些错误,有什么建议吗?

谢谢!

已编辑:

我上传了一个(非)工作示例here

您可以使用以下方法创建 RabbitMQ 实例:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

【问题讨论】:

  • 您为input1/output1 显示Processor,但我在日志中看到input2。它对你说了什么吗?
  • 我已经手动编辑了日志,因为我不想暴露业务逻辑,我会编辑它。
  • 现在已编辑。
  • 好的。我们可以在 GitHub 上的某个地方使用一些简单的示例吗?谢谢
  • 那些消息不一样。内容可能相同,但idamqp_consumerTag等都是不同的。因此,确实在某处发布了可重现的示例(您可以排除业务逻辑)

标签: spring-boot spring-integration spring-cloud spring-cloud-stream


【解决方案1】:

我认为,从配置来看,您正试图再次将相同的消息发布到相同的目的地dest_1

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

从日志中可以清楚地看出,第二条消息具有不同的 ID

id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888

【讨论】:

  • 我认为 input1 是 dest_1.group1 队列,输出是 dest_1 交换。
  • 是的,确实它们是不同的 id,因为它们被发布了两次,谢谢!
【解决方案2】:

由于 ExampleConfig 中的配置,我检测到 @Publisher 发布了两次。这个新配置(借自here)似乎工作正常:

@Bean
public static BeanFactoryPostProcessor bfpp() {
    return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
        PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}

【讨论】:

  • 你不需要setProxyTargetClass(true)
  • 如果我删除了那个 Bean 配置,我有这个堆栈跟踪: 原因:java.lang.IllegalStateException:在 bean 目标类 'ExampleService' 上找到 @StreamListener 方法 'dequeue',但在任何接口中都找不到( s) 用于 bean JDK 代理。通过将 proxy-target-class/proxyTargetClass 属性设置为“true”,将方法拉到接口或切换到子类 (CGLIB) 代理
  • 这个问题的另一个答案是为您的PublisherAnnotationBeanPostProcessor 指定一个明确的 bean 名称,它就是:IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME。框架仍然创建一个默认的问题。所以,你的班级真的被代理了两次。
【解决方案3】:

我在调试模式 (intellij) 下运行我的应用程序,因此偏移量没有得到更新。尝试在运行模式下运行,它解决了我的问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-04-27
    • 2019-01-23
    • 1970-01-01
    • 2019-04-25
    • 1970-01-01
    • 2020-09-28
    • 2021-06-12
    相关资源
    最近更新 更多