【发布时间】:2020-11-03 03:26:21
【问题描述】:
请参阅下面的更新以显示潜在的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们看到一些消息(少于 50%)。有 10 个,我们几乎看不到任何(不到 10%)。
因为我们要加入,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。似乎消息卡在从 Ktable 的外键连接创建的“中间”主题中,但没有错误消息。
任何帮助将不胜感激!
Service.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项
application.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
测试场景:
举个具体的例子,如果我将以下 3 条消息发布到主题 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题只会收到2条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
另外两个怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
通过将主题 1 用作 KStream 而不是 KTable 并在继续执行 KTable-KTable 连接之前调用 toTable(),我能够正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望这种解决方法可以对实际问题有所了解。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
【问题讨论】:
-
奇怪的是,使用
KSteam#toTable()会改变任何东西。你可以分享两个程序的拓扑描述来比较它们吗?可以提供一些启示。 -
@MatthiasJ.Sax 事实证明,
KStream#map()和KStream#toTable()的组合是使用多个分区时的诀窍。重申一下,这在 1 个分区上可以正常工作,但是当我们尝试多个分区时,它只有在我们作为 KStream 消费然后通过映射键/值强制它重新分区时才有效。
标签: java spring-boot apache-kafka apache-kafka-streams spring-cloud-stream