【Question title】: Avro serialization object not serializable issue
【Posted】: 2018-04-08 14:37:45
【Question】:

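I am trying to produce and consume Avro messages with Kafka through the Spark Streaming API, but Avro throws an "object not serializable" exception. I tried wrapping the data in an AvroKey wrapper, but it still does not work.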

Producer code:

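import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroProducer {

    public static final String schema = "{"
        + "\"fields\": ["
        + " { \"name\": \"str1\", \"type\": \"string\" },"
        + " { \"name\": \"str2\", \"type\": \"string\" },"
        + " { \"name\": \"int1\", \"type\": \"int\" }"
        + "],"
        + "\"name\": \"myrecord\","
        + "\"type\": \"record\""
        + "}";

    public static void startAvroProducer() throws InterruptedException, IOException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroProducer.schema);

        // Build one record and wrap it in an AvroKey
        AvroKey<GenericRecord> k = new AvroKey<GenericRecord>();
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("str1", "phani");
        datum.put("str2", "kumar");
        datum.put("int1", 1);
        k.datum(datum);

        // Encode the record as raw Avro binary (no schema header) for the Kafka value
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        Encoder e = EncoderFactory.get().binaryEncoder(os, null);
        writer.write(k.datum(), e);
        e.flush();
        byte[] bytedata = os.toByteArray();

        // Send the bytes to topic "jason" without a key
        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRec = new ProducerRecord<String, byte[]>("jason", bytedata);
        producer.send(producerRec);
        producer.close();
    }
}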
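For reference, the `bytedata` array above is plain Avro binary encoding with no schema attached, which is why the consumer has to parse the same schema to read it back. As an illustration only, the JDK-only sketch below hand-encodes and decodes this record following the Avro specification's rules (zig-zag varints for int/long, length-prefixed UTF-8 for strings, record fields concatenated in schema order). The class and method names are made up; real code should keep using `GenericDatumWriter`/`GenericDatumReader`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: hand-written Avro binary encoding for the "myrecord" schema
// (str1: string, str2: string, int1: int). Real code should use GenericDatumWriter/Reader.
final class AvroBinarySketch {

    // int and long are written as zig-zag varints: 7 bits per byte, high bit = more bytes follow.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);            // zig-zag: 0->0, -1->1, 1->2, -2->3, ...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    static long readLong(ByteBuffer in) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);              // undo zig-zag
    }

    // A string is its UTF-8 byte count (as a long) followed by the bytes themselves.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static String readString(ByteBuffer in) {
        byte[] utf8 = new byte[(int) readLong(in)];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // A record has no header and no field names: just its fields in schema order.
    static byte[] encodeRecord(String str1, String str2, int int1) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, str1);
        writeString(out, str2);
        writeLong(out, int1);
        return out.toByteArray();
    }

    // Mirrors the consumer's map function: read the three fields and join them.
    static String decodeRecord(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        String str1 = readString(in);
        String str2 = readString(in);
        int int1 = (int) readLong(in);
        return str1 + " " + str2 + " " + int1;
    }

    public static void main(String[] args) {
        byte[] bytes = encodeRecord("phani", "kumar", 1);
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(bytes.length + " bytes: " + hex.toString().trim());
        System.out.println(decodeRecord(bytes));
    }
}
```

For the producer's record (`phani`, `kumar`, `1`) this gives 13 bytes, `0a 70 68 61 6e 69 0a 6b 75 6d 61 72 02`: each `0a` is the zig-zag-encoded string length 5, and the final `02` is the zig-zag-encoded `1`.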

Consumer code:

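import java.io.ByteArrayInputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroConsumer {

    private static SparkConf sc = null;
    private static JavaSparkContext jsc = null;
    private static JavaStreamingContext jssc = null;

    public static void startAvroConsumer() throws InterruptedException {
        sc = new SparkConf().setAppName("Spark Avro Streaming Consumer")
                .setMaster("local[*]");

        jsc = new JavaSparkContext(sc);
        jssc = new JavaStreamingContext(jsc, new Duration(200));   // 200 ms batch interval

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroProducer.schema);

        Set<String> topics = Collections.singleton("jason");
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        JavaPairInputDStream<String, byte[]> inputDstream = KafkaUtils
                .createDirectStream(jssc, String.class, byte[].class,
                        StringDecoder.class, DefaultDecoder.class, kafkaParams,
                        topics);

        // Created on the driver; the map lambda below refers to it
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(
                schema);

        // Decode each Avro-encoded value and format its three fields
        inputDstream.map(message -> {
            ByteArrayInputStream bis = new ByteArrayInputStream(message._2);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bis, null);
            GenericRecord record = reader.read(null, decoder);
            String str1 = getValue(record, "str1", String.class);
            String str2 = getValue(record, "str2", String.class);
            int int1 = getValue(record, "int1", Integer.class);

            return str1 + " " + str2 + " " + int1;
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }

    // Returns a field value, converting Avro's Utf8 strings to java.lang.String
    @SuppressWarnings("unchecked")
    public static <T> T getValue(GenericRecord genericRecord, String name,
            Class<T> clazz) {
        Object obj = genericRecord.get(name);
        if (obj == null)
            return null;
        if (obj.getClass() == Utf8.class) {
            return (T) obj.toString();
        }
        if (obj.getClass() == Integer.class) {
            return (T) obj;
        }
        return null;
    }
}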

Exception:

Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader
Serialization stack:
    - object not serializable (class: org.apache.avro.generic.GenericDatumReader, value: org.apache.avro.generic.GenericDatumReader@7da8db47)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.applications.streaming.consumers.AvroConsumer, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/applications/streaming/consumers/AvroConsumer.lambda$0:(Lorg/apache/avro/generic/GenericDatumReader;Lscala/Tuple2;)Ljava/lang/String;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class com.applications.streaming.consumers.AvroConsumer$$Lambda$13/1805404637, com.applications.streaming.consumers.AvroConsumer$$Lambda$13/1805404637@aa31e58)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 15 more

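While reading various blogs, I learned that Avro objects do not implement the Serializable interface. However, according to the JIRA issue below

https://issues.apache.org/jira/browse/AVRO-1502

the issue has been resolved, yet I am still running into it.

Is there any way to solve this?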

【Question comments】:

  • It is resolved, and the fix is in a release. Are you using that release? It's unclear what you are asking.

Tags: java serialization apache-kafka spark-streaming avro


【Answer 1】:

From the consumer code:

kafkaParams.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");

As you can see, you have put serializer classes under the deserializer keys.

The deserializers to use are StringDeserializer for the key and ByteArrayDeserializer for the value.
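A minimal sketch of the consumer-side entries with deserializers under the `*.deserializer` keys, keeping the question's `Map<String, String>` and `metadata.broker.list` setting; the helper class and method names are hypothetical. As far as I know, with the `createDirectStream` overload used in the question the records are actually decoded by the `StringDecoder`/`DefaultDecoder` classes passed as arguments, so these two entries matter mainly for the newer consumer-based Kafka integration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the question's kafkaParams map,
// with deserializer (not serializer) classes under the *.deserializer keys.
final class ConsumerParamsSketch {

    static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        // The producer wrote keys with StringSerializer, so read them with StringDeserializer.
        kafkaParams.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The producer wrote values with ByteArraySerializer, so read them with ByteArrayDeserializer.
        kafkaParams.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return kafkaParams;
    }

    public static void main(String[] args) {
        kafkaParams("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```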

General note: Avro data on Kafka should go through some kind of schema registry service, because Avro schemas evolve over time.

http://bytepadding.com/big-data/spark/avro/avro-serialization-de-serialization-using-confluent-schema-registry/
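When a schema registry is used with Confluent's `KafkaAvroSerializer`/`KafkaAvroDeserializer`, each message carries a small header instead of the full schema: a magic byte `0`, then a 4-byte big-endian schema ID, then the plain Avro binary record. The JDK-only sketch below, assuming that Confluent wire format, shows the framing by hand; the class name, the schema ID `42`, and the payload bytes are made up, and in practice the Confluent serializers do this for you.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch of the Confluent Schema Registry wire format (assumed here):
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
final class ConfluentFramingSketch {

    static final byte MAGIC_BYTE = 0x0;

    // Prepend the 5-byte header to an Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)          // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Read the schema ID so the consumer can look up the writer's schema in the registry.
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    // Everything after the 5-byte header is the plain Avro binary record.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] avro = {0x0a, 0x02};            // made-up payload bytes
        byte[] message = frame(42, avro);      // 42 is a made-up schema ID
        System.out.println("schema id: " + schemaId(message));
        System.out.println("payload:   " + Arrays.toString(payload(message)));
    }
}
```

With the registry in place, the consumer decodes using the writer schema fetched by ID (plus its own reader schema), which is what lets the schema evolve without breaking old messages.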

【Comments】:

  • That was a typo. Changing the key and value deserializer settings did not fix the problem.
【Answer 2】:

Your problem is that you reference the following object from the lambda function:

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(
        schema);

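GenericDatumReader is not serializable. You have two options: move the instantiation of the object into the map function (not a good option, since a new reader would be created for every message), or make this object a static member of the class. A static field is not captured by the lambda, so it is not serialized with it; instead, only one object is created per executor (one per JVM) when the class is loaded there. Since you are using a fixed, precompiled schema, you can easily create the instance in a static initializer, like this: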

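static GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(new Schema.Parser().parse(AvroProducer.schema));

or, if the schema comes from an Avro-generated specific class (MyRecord below is a hypothetical name), use that class's static SCHEMA$ field:

static GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(MyRecord.SCHEMA$);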
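The serialization stack in the question shows this directly: the `SerializedLambda` has one captured argument (`capturedArgs`, size 1), and it is the `GenericDatumReader`. The same mechanism can be reproduced without Spark or Avro, because Spark's Java `Function` interface extends `Serializable` and `ClosureCleaner.ensureSerializable` Java-serializes the function before shipping it. In the sketch below, the hypothetical `FakeReader` stands in for `GenericDatumReader` and `SerializableFunction` for Spark's `Function`; it compares a lambda that captures a local reader, one that uses a static field, and one that creates the reader inside the call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class ClosureCaptureDemo {

    // Stand-in for Spark's org.apache.spark.api.java.function.Function, which extends Serializable.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Stand-in for GenericDatumReader: an ordinary class that does NOT implement Serializable.
    static final class FakeReader {
        String read(String raw) {
            return raw.toUpperCase();
        }
    }

    // Option 2: a static field is not captured by the lambda, so it is never serialized.
    // Each JVM initializes it once when the class is loaded.
    static final FakeReader STATIC_READER = new FakeReader();

    // Roughly what Spark's ClosureCleaner.ensureSerializable does: Java-serialize the function.
    static boolean isSerializable(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Like the question: a local reader captured by the lambda (ends up in capturedArgs).
    static SerializableFunction<String, String> capturingLocal() {
        FakeReader reader = new FakeReader();
        return msg -> reader.read(msg);
    }

    // Option 2: the lambda only refers to the static field, so nothing is captured.
    static SerializableFunction<String, String> usingStatic() {
        return msg -> STATIC_READER.read(msg);
    }

    // Option 1: a new reader per call; serializable, but allocates on every message.
    static SerializableFunction<String, String> creatingInside() {
        return msg -> new FakeReader().read(msg);
    }

    public static void main(String[] args) {
        System.out.println("captures local reader: " + isSerializable(capturingLocal()));
        System.out.println("uses static reader:    " + isSerializable(usingStatic()));
        System.out.println("creates reader inside: " + isSerializable(creatingInside()));
    }
}
```

Running `main` reports `false` for the capturing lambda and `true` for the other two. Both alternatives avoid the exception, but the static field keeps a single reader per JVM instead of allocating one per message.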
