【Question title】: Serde error in Kafka Streams process with Spring Cloud Stream
【Posted】: 2020-07-11 05:01:29
【Question】:

I'm trying to process some Kafka records with Spring Cloud Stream 3.0.3.RELEASE, but my Serdes configuration fails as soon as a record enters the stream pipeline.

Here is the stack trace:

30-03-2020 19:28:33 ERROR org.apache.kafka.streams.KafkaStreams              [application-local,,,]: stream-client [joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a] All stream threads have died. The instance will be in error state and should be closed.
Exception in thread "joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=moaii.security.pe.incidence.queue, partition=0, offset=18108, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    ... 5 more
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

Here is my function:

@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, ?>> joinPriorityData() {
    return incidenceStream -> incidenceStream
            .toStream()
            .filter((key, value) -> filterZoneTypeAndPriority(value))
            .selectKey((key, value) -> value.getsInc())
            .mapValues((readOnlyKey, value) -> stateMapper.toIncidendeState(value, null));
}

Here is my application.yml:

  spring.json.value.default.type: RawAccounting
  spring.cloud.stream:
    function.definition: joinPriorityData
    bindings:
      joinPriorityData-in-0:
        destination: moaii.security.pe.incidence.queue
        consumer.valueSerde: IncidenceItemSerde
      joinPriorityData-out-0:
        destination: moaii.security.pe.incidence.state
        producer.valueSerde: IncidenceItemSerde
    kafka:
      binder:
        configuration:
          auto.commit.interval.ms: 100
          auto.offset.reset: latest
      streams:
        binder:
          applicationId: moaii-cep
          content-type: application/json
          configuration:
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value.serde:  org.springframework.kafka.support.serializer.JsonSerde
              useNativeDecoding: true

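I have tried many different configurations from many sources, but none of them seems to work. As you can see in the stack trace, the default value Serde is ignored and ByteArray is used instead.

I also created some custom Serdes, declared them as beans and referenced them in the configuration file as shown, with the same result.

In any case, I have the feeling that other configuration is being ignored as well. For example, I can see the KTable skipping some old records with null keys and then failing when it reads the first non-null record, even though I have auto.offset.reset: latest.

I can't tell whether this is a Kafka problem or a Spring problem, but I can't solve it.

EDIT: In this log you can see that the binder picks up the correct Serde for the inbound but not for the outbound: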

31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Key Serde used for joinPriorityData-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Value Serde used for joinPriorityData-in-0: com.vcp.moaii.cep.broker.serde.MoaiiSerdes$IncidenceItemSerde

31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Key Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$StringSerde    
31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Value Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$ByteArraySerde

As you can see, the binding "joinPriorityData-out-0" is not even mentioned for the outbound.
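
The ClassCastException at the bottom of the stack trace can be reproduced without Kafka, which may help in reading it. The sketch below is only an illustration under assumptions: `Serializer`, `ByteArraySerializer` and `IncidenceState` are simplified stand-ins written for this example, not the real Kafka or application classes. It shows that a serializer declared for `byte[]` fails with a ClassCastException when a generic pipeline hands it any other object, which is consistent with the "cannot be cast to class [B" message above (`[B` is the JVM name for `byte[]`).

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Simplified stand-in for a generic serializer interface (assumption, not Kafka's class).
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Passes bytes through unchanged, so it only accepts byte[] values.
    public static class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Hypothetical stand-in for the DTO named in the stack trace.
    public static class IncidenceState {
        public final String id;

        public IncidenceState(String id) {
            this.id = id;
        }
    }

    // A sink that only sees the erased serializer type, as a generic pipeline does.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static byte[] send(Serializer serializer, String topic, Object value) {
        // The compiler-generated bridge method casts value to byte[] at this call.
        return serializer.serialize(topic, value);
    }

    // Returns true when sending the value through ByteArraySerializer throws ClassCastException.
    public static boolean failsWithClassCast(Object value) {
        try {
            send(new ByteArraySerializer(), "demo-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A real byte[] is accepted.
        System.out.println(failsWithClassCast("payload".getBytes(StandardCharsets.UTF_8))); // false
        // A POJO is rejected, mirroring the failure in the stack trace.
        System.out.println(failsWithClassCast(new IncidenceState("INC-1"))); // true
    }
}
```

Under these assumptions the failure comes from the mismatch between the value type and the serializer chosen for the outbound topic, so the useful question is why ByteArraySerde was selected there.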

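【Comments】:

    Tags: java apache-kafka apache-kafka-streams spring-cloud-stream confluent-platform


    【Answer 1】:

    I'm not sure whether the problem is related to your configuration or to your function method. Try the function and configuration below and see whether it makes any difference.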

    @Bean
    public Function<KTable<String, IncidenceItem>, KStream<String, IncidenceState>> joinPriorityData() {
        // same body as in your original code
    }
    

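    Notice that I added the parameterized type for the outbound KStream. This is necessary for the binder to correctly infer the Serde type to use. Assuming that both IncidenceItem and IncidenceState are JSON-friendly objects, you do not need to provide custom Serdes for them. However, if your internal logic needs to rely on those Serdes, you still need to provide them. Below is the modified configuration. I removed unnecessary properties or rearranged them.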
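
To illustrate why a wildcard in the output type leaves nothing to infer from, here is a minimal sketch using plain `java.lang.reflect`. It does not reproduce the binder's actual implementation, which the answer does not show. `List<...>` stands in for `KStream<String, ...>` so that the example needs no Kafka dependency, and `IncidenceState`, `wildcardOutput`, `typedOutput` and `inferValueType` are hypothetical names introduced for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class OutboundTypeInferenceDemo {

    // Hypothetical stand-in for the output DTO.
    public static class IncidenceState { }

    // Output declared with a wildcard, like KStream<String, ?> in the question.
    public static Function<String, List<?>> wildcardOutput() {
        return input -> Collections.emptyList();
    }

    // Output declared with a concrete type, like KStream<String, IncidenceState>.
    public static Function<String, List<IncidenceState>> typedOutput() {
        return input -> Collections.emptyList();
    }

    // Reads the declared output value type of the named method via reflection.
    // Returns the class's simple name, or "unknown" when it is not a concrete class.
    public static String inferValueType(String methodName) {
        try {
            Method method = OutboundTypeInferenceDemo.class.getMethod(methodName);
            // Function<In, Out>: take the second type argument (the output).
            ParameterizedType function = (ParameterizedType) method.getGenericReturnType();
            ParameterizedType output = (ParameterizedType) function.getActualTypeArguments()[1];
            // List<V>: take V, the value type a framework would need to choose a serializer.
            Type valueType = output.getActualTypeArguments()[0];
            if (valueType instanceof Class<?>) {
                return ((Class<?>) valueType).getSimpleName();
            }
            return "unknown";
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(inferValueType("wildcardOutput")); // unknown
        System.out.println(inferValueType("typedOutput"));    // IncidenceState
    }
}
```

With the wildcard, the declared signature carries no concrete value class, so a framework that inspects it has to fall back on some default. With the concrete type, the class is available at runtime.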

      spring.json.value.default.type: RawAccounting
      spring.cloud.stream:
        function.definition: joinPriorityData
        bindings:
          joinPriorityData-in-0:
            destination: moaii.security.pe.incidence.queue
          joinPriorityData-out-0:
            destination: moaii.security.pe.incidence.state
        kafka:
          streams:
            binder:
              applicationId: moaii-cep
              configuration:
                auto.commit.interval.ms: 100
                auto.offset.reset: latest
                default:
                  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                  value.serde:  org.springframework.kafka.support.serializer.JsonSerde
    
    

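    I'm not sure whether you need both the Kafka and the Kafka Streams binders. From the setup you shared, that does not seem to be the case, so move all configuration under the Kafka Streams binder configuration.

    See whether these changes make any difference.

    【Comments】:

    • I had already noticed that I had to remove the wildcard from the output parameter, but I forgot to post it here, so that was not the problem. The configuration, however, worked like a charm! It seems that the Kafka and Kafka Streams binders override some of each other's configuration. I really only use Streams, but I thought configuring both was necessary. Thanks a lot, @sobychacko!
    • I will need the wildcard anyway for later stream processing, because I branch one stream into two different output classes, so my output KStream is KStream<String, ?>[]. Will I run into the same problem?
    • If you cannot specify that output type, make sure you explicitly specify the Serdes in the configuration.
    • I changed the wildcard to "Object" in that process and it works fine. Thanks!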
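    【Answer 2】:

    IncidenceState has been loaded twice by two different class loaders. It was loaded once by app (the system class loader) and once by the bootstrap class loader.

    This is a Kafka issue. Check this: https://discuss.kotlinlang.org/t/classloading-error-with-kafka-streams/4547 and also this: https://youtrack.jetbrains.com/issue/KT-24966

    There is a workaround for fixing it.

    【Comments】:

    • Thanks for your help! The problem was my configuration. I guess that configuring both the Kafka and the Kafka Streams binders produced a conflict and triggered this class loader issue.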