【问题标题】:Getting Class Cast exception in Kafka Stream API在 Kafka Stream API 中获取 Class Cast 异常
【发布时间】:2018-05-14 03:22:19
【问题描述】:

我将输入数据生成为 json 字符串。

主题 - 我的输入

{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass"
     :"myClass","id":112553,"Scope":"198S"}

我的班级是这样的:

public class App {
    static public class CountryMessage {

        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name; 
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");


        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        Map < String, Object > serdeProps = new HashMap < > ();
        final Serializer < CountryMessage > countryMessageSerializer = new JsonPOJOSerializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer < CountryMessage > countryMessageDeserializer = new JsonPOJODeserializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde < CountryMessage > countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer,countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream = kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream = countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();

        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}

我收到以下类强制转换异常:

线程异常 “国家-流媒体分析-app-f7f95119-4401-4a6e-8060-5a138ffaddb2-StreamThread-1” org.apache.kafka.streams.errors.StreamsException:异常捕获 过程。 taskId=0_0,处理器=KSTREAM-SOURCE-0000000000, 主题=vanitopic,分区=0,偏移量=2036 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203) 在 org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 引起:org.apache.kafka.streams.errors.StreamsException:A 序列化程序(键: org.apache.kafka.common.serialization.ByteArraySerializer / 值: org.apache.kafka.common.serialization.ByteArraySerializer) 不是 兼容实际的键或值类型(键类型:java.lang.String / 值类型:com.cisco.streams.countries.App$CountryMessage)。改变 StreamConfig 中的默认 Serdes 或通过提供正确的 Serdes 方法参数。在 org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) 在 org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) 在 org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47) 在 org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) 在 org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) 在 org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47) 在 org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) 在 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189) ... 3 更多原因:java.lang.ClassCastException: java.lang.String 不能转换为 [B at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21) 在 org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88) 在 org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) 在 org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) ... 16 更多

我需要帮助来了解如何以及在何处应用我创建的自定义 Serdes

【问题讨论】:

  • 您是否尝试过在 getProperties 方法中使用 stringSerde 和 countryMessageSerde(如 StreamsConfig.KEY_SERDE_CLASS_CONFIG 和 StreamsConfig.VALUE_SERDE_CLASS_CONFIG)?另外你使用什么版本的 Kafka Streams?
  • 您需要将自定义 Serdes 应用于与 StreamsConfig 中的默认 Serdes 不匹配的每个运算符。查看文档:docs.confluent.io/3.3.1/streams/developer-guide/dsl-api.html
  • 是的,我正在做这个 settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, CountryMessage.class);但仍然出现错误。我正在使用 0.11.0.0。
  • 请帮我解决这个问题。我几乎尝试了所有方法,但遇到了同样的错误。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

将序列化程序添加到 groupByKey

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));

【讨论】:

  • 该死的已经 6 小时了
【解决方案2】:

在您的代码中,

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

groupByKey() 需要同时设置两个序列化器,因为这将触发数据重新分区。或者您将默认 Serded 设置为 StringCountryMessage 类型。

正如我在评论中提到的,每个操作员如果不使用来自StreamsConfig 的默认 Serdes,则需要设置正确的 Serdes。

因此,count() 操作也需要指定对应的StringLong Serdes:

countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

所有可能需要Serdes 的运算符都有适当的重载。只需检查您正在使用的所有运算符的所有重载。

查看文档了解更多详情:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-09-25
    • 2014-10-23
    • 1970-01-01
    • 2019-02-15
    • 1970-01-01
    • 2018-08-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多