【发布时间】:2019-09-21 19:27:43
【问题描述】:
我正在编写一个 Spring Boot (2.1.4) 应用程序,尝试将 Spring Cloud Streams 用于 Kafka。
我想要做的是维护一个主题的传感器列表(“传感器”)。 OTOH,我有关于另一个主题(“数据”)的传入数据。我想要实现的是,当我获取我还没有的传感器的数据时,我想将它添加到传感器列表中。
为此,我从传感器主题创建了一个KTable<String, Sensor>,将温度主题映射到纯传感器的数据(在本例中为它的名称),并使用ValueJoiner 进行外部连接,如果出现则保留传感器存在,否则使用读数的传感器。然后,我将结果写回传感器主题。
KTable<String, Sensor> sensorTable = ...;
KStream<String, SensorData> sensorDataStream = ...;
// get sensors providing measurements
KTable<String, Sensor> sensorsFromData =
sensorDataStream.groupByKey()
.aggregate(
Sensor::new,
(k, v, s) -> {
s.setName(k);
return s;
},
Materialized.with(Serdes.String(), SensorSerde.SERDE));
// join both sensor tables, preferring the existing ones
KTable<String, Sensor> joinedSensorTable =
sensorTable.outerJoin(
sensorsFromData,
// only use sensors from measurements if sensor not already present
(ex, ft) -> (ex != null) ? ex : ft,
Materialized.<String, Sensor, KeyValueStore<Bytes, byte[]>>as(SENSORS_TABLE)
.withKeySerde(Serdes.String()).withValueSerde(SensorSerde.SERDE));
// write to new topic for downstream services
joinedSensorTable.toStream();
如果我使用 StreamBuilder 创建它,这很好用 - 即如果 sensorTable 和 sensorDataStream 来自 builder.table("sensors", Consumed.with(Serdes.String(), SensorSerde.SERDE)) 之类的东西。
但是,我正在尝试为此使用 Spring Stream Binding,即上面的代码包含在
@Configuration
@EnableBinding(SensorTableBinding.class)
class StreamConfiguration {
static final String SENSORS_TABLE = "sensors-table";
@StreamListener
@SendTo("sensorsOut")
private KStream<String, Sensor> getDataFromData
(@Input("sensors") KTable<String, Sensor> sensorTable,
@Input("data") KStream<String, SensorData> sensorDataStream) {
// ...
return joinedSensorTable.toStream();
}
}
有一个
interface SensorTableBinding {
@Input("sensors")
KTable<String, Sensor> sensorStream();
@Output("sensorsOut")
KStream<String, Sensor> sensorOutput();
@Input("data")
KStream<String, SensorData> sensorDataStream();
}
这里是application.properties的spring stream部分:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.brokers: ${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset: latest
spring.cloud.stream.kafka.binder.bindings.sensors.group: sensor-service
spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors
spring.cloud.stream.kafka.binder.data.group: sensor-service
spring.cloud.stream.kafka.binder.data.destination: data
流初始化良好,并执行连接(键值存储正确填充),但是,生成的流永远不会写入“传感器”主题。
为什么?我错过了什么吗?
另外:我确信有一种更好的方法可以使用现有的 Serde 将我的对象从/序列化到 JSON,而不是必须声明我自己的类来添加到处理中 (SensorSerde/SensorDataSerde是ObjectMapper) 的瘦委托包装器吗?
【问题讨论】:
-
不确定是什么问题。如果您可以将它们放在一个可以独立运行的小样本中,我们可以进一步了解。看起来像是配置相关的东西。
-
@sobychacko 是的,我想通了;如果您有兴趣,请查看我给出的答案。
标签: spring spring-boot apache-kafka spring-cloud-stream