【发布时间】:2019-01-15 00:16:47
【问题描述】:
我的用例是这个的变体:
Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow
在示例中,1 个源向rabbitmq 发出一个项目,两个 处理器都得到它。
我想要相反。我希望源向 rabbitmq 发出项目,但每个项目只有 1 个处理器处理。
假设我有:
1 来源命名来源 2个处理器,分别命名为processor1和processor2
所以源发出:A,B,C到rabbitmq
RabbitMQ 会发出 A
首先获得 A 的处理器将处理它 - 假设处理器 1 是幸运的处理器并处理 A。
那么 RabbitMQ 会发出 B
由于处理器 1 忙于处理 A 而处理器 2 空闲,处理器 2 处理 B
RabbitMQ 将发出 C
处理器 1 完成了 A 并且处于空闲状态,因此处理器 1 处理 C
我想出的Spring Cloud数据流图是:
处理器A是最上面的,处理器B是下面的
当我部署并运行它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 都接收 A、B 和 C
如果我想要的行为是我可以在 Spring Cloud Data Flow 中发生的事情,或者如果有一个 RabbitMQ 设置,如消息删除的答案所暗示的那样,我会感到困惑
“当您设置自动确认标志时发生的情况。这样,消息在被消费后立即被确认 - 所以从队列中消失了。”
如果是这样,我可以在我的 Spring Cloud Data Flow 源中设置它还是它是一个 RabbitMQ 设置还是完全是别的东西
更新:
我已经添加了
spring.cloud.stream.bindings.input.group=consumerGroup
到我的处理器的 application.properties 文件。
很遗憾,两个处理器接收的数据完全相同。
我是否需要在我的源代码的 application.properties 中添加类似的条目?
我需要更改处理器上的注释吗?目前是:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
我是否需要以任何方式修改源上的注释?目前是:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
包含@Poller 是否会以任何方式改变这一点?
更新:
属性是否命名为 spring.cloud.stream.instanceCount?
【问题讨论】:
标签: rabbitmq spring-cloud-stream spring-cloud-dataflow