【问题标题】:Spring Cloud StreamListener @Output KStream Serdes dont seem to workSpring Cloud StreamListener @Output KStream Serdes 似乎不起作用
【发布时间】:2019-02-25 11:28:44
【问题描述】:

我有一个流监听器

@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
    // Predicate<UUID, Event> isAccount = (key, value) ->
    // value.getEntity().getClass().equals(Account.class);

    // @formatter:off
    return events
            //.filter(isAccount)
            .peek((key, value) -> {
                log.debug("Processing {} {}", key, value);
            });
            /*
            .filter(isAccount)
            .map((key, value) -> process(value))

            .peek((key, value) -> {
                log.debug("Processed {} {}", key, value);
            });
            */
    // @formatter:on

}

@Input("requesti") 配置如下;

spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw

@output("responseo") 配置如下

spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde

我的处理器收到请求,也可以发送输出,但输出如下

[Producer clientId=repo-event-consumer-49827b40-2357-4af0-8103-228343faa59e-StreamThread-1-producer] 发送记录ProducerRecord(topic=response, partition=null, headers= RecordHeaders(headers = [RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116 , 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116 , 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), key=[B@6a5e4294, value=[B@5a0852e1 , timestamp=1551093349173) 回调 org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1@336dbba5 到主题响应分区 2

我对 Producer Record id 感到困惑的几件事不是“repo-response-producer”,其次没有使用 key-serde/value-serde,在我看来应该是

发送记录ProducerRecord(topic=request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = Key_TypeId, value = [106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 85, 85, 73, 68]), RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), key=6f0f50e2-3add-4d22-a370-cac66d016af0, value=Account() with callback org.springframework.kafka.core.KafkaTemplate$$Lambda$582/533392019@85ab964 to topic request partition 2

默认的serdeConfig是

    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

Repo

【问题讨论】:

  • 您好,您不需要在输出中设置应用程序 ID。您在输入上设置的那个适用于整个处理器(输入和输出)。至于您的第二个Serde 问题,我不确定通过查看输出会发生什么。您是否在控制台上看到消息Native encoding is enabled for responseo...?你能给我们一个小样本,我们可以在其中重现问题吗?
  • spring.cloud.stream.bindings.reponseo.use-native-encoding=true 已启用,我现在正在摸索如何添加类型信息 @OutPut 及其 KStream 以允许 ResponseHandler KStream 成为能够识别各种 Json Payload 到 Java 类型。
  • 有没有机会提供一个小型可重复的独立应用程序?
  • 有没有一种方法可以将分区关联到处理器让我们说基于类型名称的 hashCode 将整理一个分区中的所有类似消息,如果我可以指示 Kstream 仅从该分区读取并响应响应主题的分区号相同。
  • 你有空聊天吗,我很绝望

标签: spring-cloud-stream spring-kafka


【解决方案1】:

这是一个示例,演示了 JsonSerde 使用 Kafka Streams 绑定器在出站上的工作:https://github.com/schacko-samples/json-serde-example。 运行示例并确保其正常工作。 查看application.yml 了解配置详情。我在提供的README 中添加了一些细节。

【讨论】:

  • 我可以将流uuid,模型转换为ktable而不是减少或分组
  • 我早上试试
  • 应该是可以的。请使用我提供的示例来查看它是否有效。这样,我们就很容易调试。
  • 我尝试了绑定:requesti.destination: data-in responseo: destination: data-out producer: use-native-encoding: true partition-count: 10 partition-key-extractor-name: customExtractor
  • 我的假设是使用自定义 keyExtractor 响应将被发送到一个唯一的分区,同样我希望流处理器只从请求主题的一个分区中读取
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-05-16
  • 1970-01-01
  • 1970-01-01
  • 2021-06-17
  • 2014-12-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多