【问题标题】:timestampExtractorBeanName setting in the Spring Cloud Stream application doesn't override default valueSpring Cloud Stream 应用程序中的 timestampExtractorBeanName 设置不会覆盖默认值
【发布时间】:2020-05-27 09:23:17
【问题描述】:

我的 Spring Cloud Stream 应用程序使用 Kafka Streams Binder 具有以下属性:

spring.cloud.stream.bindings:
  windowStream-in-0:
    destination: input
  windowStream-out-0:
    destination: window1
  hint1Stream-in-0:
    destination: window1
  hint1Stream-out-0:
    destination: hints
  realityStream-in-0:
    destination: input
  realityStream-in-1:
    destination: window1
    consumer:
      timestampExtractorBeanName: anotherTimestampExtractor
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints

spring.cloud.stream.kafka.streams:
  default:
    consumer:
      timestampExtractorBeanName: timestampExtractor
  binder:
    functions:
      windowStream:
        applicationId: mock-stream-window1
      hint1Stream:
        applicationId: mock-stream-hints
      realityStream:
        applicationId: mock-stream-reality
      countStream:
        applicationId: mock-stream-count
    stateStoreRetry:
      maxAttempts: 3
      backOffInterval: 1000
    configuration:
      schema.registry.url: mock://mock-stream-registry
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      commit.interval.ms: 100

我要做的是对所有流使用“timestampExtractor”,除了一个,称为“realityStream”。

为此,我将spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName 设置为timestampExtractor,然后尝试通过设置spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName 为“realityStream”“覆盖”它

不幸的是,我的覆盖似乎不起作用,因为只有“timestampExtractor”被调用,正如我在调试器(和我的测试结果)中看到的那样。

我是否应用了错误的配置,或者有错误的期望?

这是我的单个 Spring Cloud Streams 应用程序中流的图片:

(橙色圆圈是我要应用非默认时间戳提取器的地方)

【问题讨论】:

    标签: apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    覆盖在错误的位置;它需要在....kafka.streams.bindings.realityStream-in-1.... 属性下。

    这是 kafka 特有的属性;您可以在通用绑定属性中使用它(所有绑定器通用)。

    【讨论】:

    • 这就是问题所在,谢谢! Spring Cloud Stream 及其对 Kafka Streams 的支持,尤其是在 3.0.x 版本中非常简洁方便,赞!
    猜你喜欢
    • 2017-01-26
    • 2016-05-07
    • 2016-06-22
    • 2015-06-30
    • 2018-07-09
    • 2021-10-24
    • 1970-01-01
    • 1970-01-01
    • 2015-09-12
    相关资源
    最近更新 更多