【问题标题】:How to set ValueSerde per Binding in Spring Cloud Kafka Streams(如何在 Spring Cloud Kafka Streams 中为每个绑定设置 ValueSerde)
【发布时间】:2021-03-15 18:14:29
【问题描述】:

我正在尝试为每个绑定分别设置 valueSerde,但实际生效的只有默认的 valueSerde。

AppSerdes 类

/**
 * Holder for the Protobuf-backed value Serdes, one per message type.
 *
 * <p>Each nested Serde exposes a public no-argument constructor and passes its
 * concrete message class to the {@code ProtobufDeserializer}, so it can be
 * referenced by class name (for example {@code AppSerdes$EmployeeSerde}) from
 * configuration.
 */
public final class AppSerdes {

    /** Not instantiable: this class only groups the nested Serde types. */
    private AppSerdes() {
    }

    /** Serde for {@code Department} values. */
    public static final class DepartmentSerde extends WrapperSerde<Department> {
        public DepartmentSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Department.class));
        }
    }

    /** Serde for {@code Employee} values. */
    public static final class EmployeeSerde extends WrapperSerde<Employee> {
        public EmployeeSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Employee.class));
        }
    }

    /** Serde for {@code DepartmentData} values. */
    public static final class DepartmentDataSerde extends WrapperSerde<DepartmentData> {
        public DepartmentDataSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(DepartmentData.class));
        }
    }
}

StreamsConfiguration.java

@Configuration
@Slf4j
public class StreamsConfiguration {

    @Bean
    public BiFunction<KStream<String, Employee>, KStream<String, Department>, KStream<String, DepartmentData>> process() {
        return (Employee, Department) -> Employee.leftJoin(Department, (v1, v2) -> {
            if (v2 == null) {
                log.info("No Department is present");
                return null;
            } else {
                var data = DepartmentData.newBuilder();
                data.setId(v2.getId());
                data.setName(v2.getName());
                data.addEmployees(v1);
                return data.build();
            }
        }, JoinWindows.of(Duration.ofMinutes(1))).peek((k, v) -> {
            log.info("Key->{}, value->{}", k, v);
        });
    }
}

和application.yml

# Application configuration for the kafka-join-example stream processor.
spring:
  application.name: kafka-join-example
spring.kafka.bootstrap-servers: 192.168.56.101:19092
# Per-binding value Serdes, written as flat property keys (camelCase `valueSerde`).
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
spring.cloud.stream:
  # Binding name -> topic mapping for the two inputs and the single output of `process`.
  bindings:
    process-in-0:
      destination: kp.sch
      consumer:
        use-native-decoding: true
    process-in-1:
      destination: kp.lm
      consumer:
        use-native-decoding: true
    process-out-0:
      destination: kp.lm.sch
      producer:
        use-native-encoding: true
  # NOTE(review): this nested section repeats the three Serde settings above, but
  # under the key `value.serde` instead of `valueSerde`. Presumably only one of
  # the two spellings is honoured by the binder - confirm and drop the other.
  kafka.streams.bindings:
    process-in-0:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
    process-in-1:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
    process-out-0:
      producer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
  # Binder-level settings: broker list, partition count and Kafka Streams properties.
  kafka.streams.binder:
    brokers:
    - 192.168.56.101:19092
#    replication-factor: 3
#    required-acks: 2
    min-partition-count: 5
    configuration:
      commit.interval.ms: 100
      # Default key/value Serdes.
      # NOTE(review): the default value Serde is the generic ProtobufSerde; the
      # "No target type provided" error in the stack trace suggests it carries no
      # target class - confirm.
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde

非常感谢您对此的任何见解。


编辑

完整的示例代码在这里:github.com/kprasad99/kafka-join-example

堆栈跟踪2020-12-06 21:55:39.929 ERROR 141897 --- [-StreamThread-1]

o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Failed to process stream task 1_4 due to the following error:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]

2020-12-06 21:55:39.930 ERROR 141897 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]

【问题讨论】:

    标签: spring-boot spring-cloud spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    我稍微清理了您的配置,这应该可以工作。如果没有,请创建一个小示例应用程序并分享,然后我们可以进一步查看。

    # Cleaned-up configuration: Serdes are set once per binding via `valueSerde`.
    spring:
      application.name: kafka-join-example
    spring.kafka.bootstrap-servers: 192.168.56.101:19092
    # Per-binding value Serdes for the two inputs and the output of `process`.
    spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
    spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
    spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
    spring.cloud.stream:
      # Binding name -> topic mapping.
      bindings:
        process-in-0:
          destination: kp.sch
        process-in-1:
          destination: kp.lm
        process-out-0:
          destination: kp.lm.sch 
      # Binder-level settings: broker list, partition count and Kafka Streams properties.
      kafka.streams.binder:
        brokers:
        - 192.168.56.101:19092
    #    replication-factor: 3
    #    required-acks: 2
        min-partition-count: 5
        configuration:
          commit.interval.ms: 100
          # Default key/value Serdes.
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde
    

    您还可以在您的应用程序中定义这些 bean,如下所示。

    /**
     * Exposes the value Serde for {@code Department} as a bean.
     *
     * @return a newly created {@code DepartmentSerde}
     */
    @Bean
    public Serde<Department> departmentSerde() {
      final Serde<Department> serde = new DepartmentSerde();
      return serde;
    }
    
    // add the other two Serde beans.
    

    如果您在应用程序中定义了这些Serde bean,那么您不需要配置中对应的3 个valueSerde 属性,因为这些bean 具有优先权。

    【讨论】:

    • 已按建议更新,第一个参数(process-in-0)可以正常工作,但第二个参数(process-in-1)仍然使用默认的 serde;由于默认 serde 中没有传入类型信息,所以我收到了错误。示例代码在这里:github.com/kprasad99/kafka-join-example
    • 我认为您遇到的问题与此 SO 线程中的问题相同:stackoverflow.com/questions/65003575/…
    • 在那里查看我的答案。此问题已在此处解决:github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/…。如果您可以更新到这个最新的快照,它应该可以正确推断出 Serde。
    • 我仍然收到上述错误:我已将代码更新到最新的快照版本,并确认该类中包含了修复,但仍然得到相同的错误……第二个参数仍然使用默认的 serde。
    • 我得先看看,稍后再回复您。
    猜你喜欢
    • 1970-01-01
    • 2019-06-24
    • 1970-01-01
    • 2017-03-25
    • 1970-01-01
    • 1970-01-01
    • 2019-08-27
    • 1970-01-01
    • 2021-02-24
    相关资源
    最近更新 更多