【问题标题】:one SCDF source, 2 processors but only 1 processes each item一个 SCDF 源,2 个处理器,但每个项目只有 1 个进程
【发布时间】:2019-01-15 00:16:47
【问题描述】:

我的用例是这个的变体:

Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow

在示例中,1 个源向rabbitmq 发出一个项目,两个 处理器都得到它。

我想要相反。我希望源向 rabbitmq 发出项目,但每个项目只有 1 个处理器处理。

假设我有:

1 来源命名来源 2个处理器,分别命名为processor1和processor2

所以源发出:A,B,C到rabbitmq

RabbitMQ 会发出 A

首先获得 A 的处理器将处理它 - 假设处理器 1 是幸运的处理器并处理 A。

那么 RabbitMQ 会发出 B

由于处理器 1 忙于处理 A 而处理器 2 空闲,处理器 2 处理 B

RabbitMQ 将发出 C

处理器 1 完成了 A 并且处于空闲状态,因此处理器 1 处理 C

我想出的Spring Cloud数据流图是:

处理器A是最上面的,处理器B是下面的

当我部署并运行它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 都接收 A、B 和 C

如果我想要的行为是我可以在 Spring Cloud Data Flow 中发生的事情,或者如果有一个 RabbitMQ 设置,如消息删除的答案所暗示的那样,我会感到困惑

“当您设置自动确认标志时发生的情况。这样,消息在被消费后立即被确认 - 所以从队列中消失了。”

如果是这样,我可以在我的 Spring Cloud Data Flow 源中设置它还是它是一个 RabbitMQ 设置还是完全是别的东西

更新:

我已经添加了

spring.cloud.stream.bindings.input.group=consumerGroup

到我的处理器的 application.properties 文件。

很遗憾,两个处理器接收的数据完全相同。

我是否需要在我的源代码的 application.properties 中添加类似的条目?

我需要更改处理器上的注释吗?目前是:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

我是否需要以任何方式修改源上的注释?目前是:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = 
     @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

包含@Poller 是否会以任何方式改变这一点?

更新:

属性是否命名为 spring.cloud.stream.instanceCount?

【问题讨论】:

标签: rabbitmq spring-cloud-stream spring-cloud-dataflow


【解决方案1】:

对于流式应用,您需要设置 ...consumer.group 属性,以便它们都在同一个组中并竞争消息。

但这应该使用 SCDF 自动发生。

【讨论】:

  • 它是否适用于任何 SCDF 源应用程序或仅适用于名为“兔子”的应用程序。
  • 它发生在处理器消费者身上,而不是来源。
  • 明白。您能否确认来源不一定是兔子源应用程序?
  • 来源无关。看看绑定到它们的交换和队列。
  • 知道了。早上会做第一件事。非常感谢。
猜你喜欢
  • 2021-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-14
  • 1970-01-01
  • 2012-12-16
相关资源
最近更新 更多