【发布时间】:2019-02-25 11:28:44
【问题描述】:
我有一个流监听器
@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
// Predicate<UUID, Event> isAccount = (key, value) ->
// value.getEntity().getClass().equals(Account.class);
// @formatter:off
return events
//.filter(isAccount)
.peek((key, value) -> {
log.debug("Processing {} {}", key, value);
});
/*
.filter(isAccount)
.map((key, value) -> process(value))
.peek((key, value) -> {
log.debug("Processed {} {}", key, value);
});
*/
// @formatter:on
}
@Input("requesti") 配置如下;
spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw
@output("responseo") 配置如下
spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde
我的处理器收到请求,也可以发送输出,但输出如下
[Producer clientId=repo-event-consumer-49827b40-2357-4af0-8103-228343faa59e-StreamThread-1-producer] 发送记录ProducerRecord(topic=response, partition=null, headers= RecordHeaders(headers = [RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116 , 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116 , 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), key=[B@6a5e4294, value=[B@5a0852e1 , timestamp=1551093349173) 回调 org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1@336dbba5 到主题响应分区 2
我对 Producer Record id 感到困惑的几件事不是“repo-response-producer”,其次没有使用 key-serde/value-serde,在我看来应该是
发送记录ProducerRecord(topic=request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = Key_TypeId, value = [106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 85, 85, 73, 68]), RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), key=6f0f50e2-3add-4d22-a370-cac66d016af0, value=Account() with callback org.springframework.kafka.core.KafkaTemplate$$Lambda$582/533392019@85ab964 to topic request partition 2
默认的serdeConfig是
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
【问题讨论】:
-
您好,您不需要在输出中设置应用程序 ID。您在输入上设置的那个适用于整个处理器(输入和输出)。至于您的第二个
Serde问题,我不确定通过查看输出会发生什么。您是否在控制台上看到消息Native encoding is enabled for responseo...?你能给我们一个小样本,我们可以在其中重现问题吗? -
spring.cloud.stream.bindings.reponseo.use-native-encoding=true 已启用,我现在正在摸索如何添加类型信息 @OutPut 及其 KStream 以允许 ResponseHandler KStream 成为能够识别各种 Json Payload 到 Java 类型。
-
有没有机会提供一个小型可重复的独立应用程序?
-
有没有一种方法可以将分区关联到处理器让我们说基于类型名称的 hashCode 将整理一个分区中的所有类似消息,如果我可以指示 Kstream 仅从该分区读取并响应响应主题的分区号相同。
-
你有空聊天吗,我很绝望
标签: spring-cloud-stream spring-kafka