【问题标题】:Spring Kafka Stream doesn't get writtenSpring Kafka Stream 没有被写入
【发布时间】:2019-09-21 19:27:43
【问题描述】:

我正在编写一个 Spring Boot (2.1.4) 应用程序,尝试将 Spring Cloud Streams 用于 Kafka。

我想要做的是维护一个主题的传感器列表(“传感器”)。 OTOH,我有关于另一个主题(“数据”)的传入数据。我想要实现的是,当我获取我还没有的传感器的数据时,我想将它添加到传感器列表中。

为此,我从传感器主题创建了一个KTable<String, Sensor>,将温度主题映射到纯传感器的数据(在本例中为它的名称),并使用ValueJoiner 进行外部连接,如果出现则保留传感器存在,否则使用读数的传感器。然后,我将结果写回传感器主题。

KTable<String, Sensor> sensorTable = ...;
KStream<String, SensorData> sensorDataStream = ...;

// get sensors providing measurements
KTable<String, Sensor> sensorsFromData =
        sensorDataStream.groupByKey()
                .aggregate(
                        Sensor::new,
                        (k, v, s) -> {
                            s.setName(k);
                            return s;
                        },
                        Materialized.with(Serdes.String(), SensorSerde.SERDE));

// join both sensor tables, preferring the existing ones
KTable<String, Sensor> joinedSensorTable =
        sensorTable.outerJoin(
                sensorsFromData,
                // only use sensors from measurements if sensor not already present
                (ex, ft) -> (ex != null) ? ex : ft,
                Materialized.<String, Sensor, KeyValueStore<Bytes, byte[]>>as(SENSORS_TABLE)
                        .withKeySerde(Serdes.String()).withValueSerde(SensorSerde.SERDE));

// write to new topic for downstream services
joinedSensorTable.toStream();

如果我使用 StreamBuilder 创建它,这很好用 - 即如果 sensorTablesensorDataStream 来自 builder.table("sensors", Consumed.with(Serdes.String(), SensorSerde.SERDE)) 之类的东西。

但是,我正在尝试为此使用 Spring Stream Binding,即上面的代码包含在

@Configuration
@EnableBinding(SensorTableBinding.class)
class StreamConfiguration {
    static final String SENSORS_TABLE = "sensors-table";

    @StreamListener
    @SendTo("sensorsOut")
    private KStream<String, Sensor> getDataFromData
            (@Input("sensors") KTable<String, Sensor> sensorTable,
                    @Input("data") KStream<String, SensorData> sensorDataStream) {
        // ...
        return joinedSensorTable.toStream();
    }
}

有一个

interface SensorTableBinding {
    @Input("sensors")
    KTable<String, Sensor> sensorStream();

    @Output("sensorsOut")
    KStream<String, Sensor> sensorOutput();

    @Input("data")
    KStream<String, SensorData> sensorDataStream();
}

这里是application.properties的spring stream部分:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

spring.cloud.stream.kafka.binder.brokers: ${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset: latest

spring.cloud.stream.kafka.binder.bindings.sensors.group: sensor-service
spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors

spring.cloud.stream.kafka.binder.data.group: sensor-service
spring.cloud.stream.kafka.binder.data.destination: data

流初始化良好,并执行连接(键值存储正确填充),但是,生成的流永远不会写入“传感器”主题。

为什么?我错过了什么吗?

另外:我确信有一种更好的方法可以使用现有的 Serde 将我的对象从/序列化到 JSON,而不是必须声明我自己的类来添加到处理中 (SensorSerde/SensorDataSerdeObjectMapper) 的瘦委托包装器吗?

【问题讨论】:

  • 不确定是什么问题。如果您可以将它们放在一个可以独立运行的小样本中,我们可以进一步了解。看起来像是配置相关的东西。
  • @sobychacko 是的,我想通了;如果您有兴趣,请查看我给出的答案。

标签: spring spring-boot apache-kafka spring-cloud-stream


【解决方案1】:

原来数据毕竟是写的,但是写错了主题,即sensorOut

原因是配置。而不是

spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors

主题配置如下:

spring.cloud.stream.bindings.sensors.destination: sensors
spring.cloud.stream.bindings.sensorsOut.destination: sensors

对于传感器和数据主题,这无关紧要,因为绑定的名称与主题相同;但由于 Spring 找不到合适的输出目的地,它使用绑定的名称 sensorOut 并将数据写入那里。

请注意,围绕这些的整个配置设置非常混乱。各个项目都记录在案,但很难区分它们属于哪个配置前缀。查看源代码也无济于事,因为在那个级别传递的是Maps,在运行时去掉了前缀的键,所以很难判断数据来自哪里以及它将包含什么.

IMO 传递类似@ConfigurationProperties 的数据类真的很有帮助,这将使它更容易理解。

【讨论】:

    猜你喜欢
    • 2019-09-07
    • 1970-01-01
    • 2018-04-28
    • 1970-01-01
    • 2018-09-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多