【发布时间】:2020-05-27 09:23:17
【问题描述】:
我的 Spring Cloud Stream 应用程序使用 Kafka Streams Binder 具有以下属性:
spring.cloud.stream.bindings:
windowStream-in-0:
destination: input
windowStream-out-0:
destination: window1
hint1Stream-in-0:
destination: window1
hint1Stream-out-0:
destination: hints
realityStream-in-0:
destination: input
realityStream-in-1:
destination: window1
consumer:
timestampExtractorBeanName: anotherTimestampExtractor
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
spring.cloud.stream.kafka.streams:
default:
consumer:
timestampExtractorBeanName: timestampExtractor
binder:
functions:
windowStream:
applicationId: mock-stream-window1
hint1Stream:
applicationId: mock-stream-hints
realityStream:
applicationId: mock-stream-reality
countStream:
applicationId: mock-stream-count
stateStoreRetry:
maxAttempts: 3
backOffInterval: 1000
configuration:
schema.registry.url: mock://mock-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
我要做的是对所有流使用“timestampExtractor”,除了一个,称为“realityStream”。
为此,我将spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName 设置为timestampExtractor,然后尝试通过设置spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName 为“realityStream”“覆盖”它
不幸的是,我的覆盖似乎不起作用,因为只有“timestampExtractor”被调用,正如我在调试器(和我的测试结果)中看到的那样。
我是否应用了错误的配置,或者有错误的期望?
这是我的单个 Spring Cloud Streams 应用程序中流的图片:
(橙色圆圈是我要应用非默认时间戳提取器的地方)
【问题讨论】:
标签: apache-kafka-streams spring-cloud-stream