【Question title】: Flink Avro Serialization shows "not serializable" error when working with GenericRecords
【Posted】: 2020-05-15 20:50:26
【Question】:

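I am having a really hard time getting Flink to communicate properly with a running Kafka instance that uses Avro schemas from the Confluent Schema Registry (for both key and value).

After some thinking and restructuring of my program, I was able to push my implementation this far:

Producer method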

    public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {  
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
        properties.put("schema.registry.url", "http://--.-.-.---:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter


        return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
                new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }

GenericSerializer.java

package com.reeeliance.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

    private String topic;   
    private Schema schemaKey;
    private Schema schemaValue;
    private String registryUrl;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        super();
        this.topic = topic;
        this.schemaKey = schemaK;
        this.schemaValue = schemaV;
        this.registryUrl = url;
    }

    public GenericSerializer() {
        super();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
        byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);

        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }

}

However, when I execute the job, it fails during the preparation phase, before the job actually runs, with the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
    at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
    at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
    - custom writeObject data (class "java.util.ArrayList")
    - root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 8 more

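I know that all classes have to implement the Serializable interface or be declared transient, but I am not using classes of my own, and the error does not point to a non-serializable function (as it usually does with threading issues) but to a record or field. The field comes from the key schema, a schema that contains only this one field. I assume my mistake lies in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord used for this kind of serialization all the time, so that does not make sense to me.

The ConfluentRegistryAvroSerializationSchema class is taken from GitHub, because it is not yet included in the Flink version we are currently using (1.9.1). I included the necessary classes and changed the class names, and I do not think this is the cause of my problem. (Issue solved)

Can anyone help me debug this? I would also appreciate it if you could show me a different way to achieve the same goal; so far the incompatibility between Flink Avro and the Confluent Schema Registry has been driving me mad.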

【Comments】:

    Tags: java apache-kafka apache-flink avro confluent-schema-registry


    【Answer 1】:

    The problem is the org.apache.avro.Schema$Field class. It is not serializable, which causes this exception. The solution is mentioned in a note in the Flink documentation:

    Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.

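    So the schema cannot be parsed once in the constructor and kept in a regular field: that is the same as passing the Schema to the constructor, because the field would still have to be Java-serialized. The parsing has to happen after the object has been shipped, for example inside serialize for every message (or once, lazily, into a transient field).

    The solution can look like the snippet below. The constructor accepts the Avro schema as a String, and the Avro Schema is created in the serialize method.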

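    import java.lang
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    // Package of the official flink-avro-confluent-registry module; adjust it if you use a copied class.
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
    import org.apache.kafka.clients.producer.ProducerRecord

    // Only Strings are constructor fields, so the instance itself stays Java-serializable.
    class AvroMessageSerializationSchema(topic: String, schemaString: String, schemaRegistry: String) extends KafkaSerializationSchema[GenericRecord] {

      private def getSchema(schema: String): Schema = {
        new Schema.Parser().parse(schema)
      }

      override def serialize(element: GenericRecord, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        // The Schema is rebuilt from its String form here, on the worker, for every record.
        val schema = getSchema(schemaString)
        val value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistry).serialize(element)
        new ProducerRecord[Array[Byte], Array[Byte]](topic, value)
      }
    }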
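If re-parsing the schema for every record is a concern, one common pattern is to ship only the String and rebuild the non-serializable object lazily in a transient field. The following is a minimal sketch of that pattern using only the JDK. `LazySchemaHolder`, `ParsedSchema` and `parseCalls` are hypothetical names for illustration; `ParsedSchema` stands in for `org.apache.avro.Schema`, and with Avro the marked line would call `new Schema.Parser().parse(schemaJson)` instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical stand-in for a parsed org.apache.avro.Schema; deliberately NOT Serializable.
    static class ParsedSchema {
        final String json;
        ParsedSchema(String json) { this.json = json; }
    }

    // Demo-only counter of how often the "parsing" step runs.
    static int parseCalls = 0;

    private final String schemaJson;        // shipped with the object
    private transient ParsedSchema parsed;  // skipped by Java serialization, rebuilt on first use

    LazySchemaHolder(String schemaJson) { this.schemaJson = schemaJson; }

    ParsedSchema schema() {
        if (parsed == null) {
            parseCalls++;
            parsed = new ParsedSchema(schemaJson); // with Avro: new Schema.Parser().parse(schemaJson)
        }
        return parsed;
    }

    // Serializes and deserializes the holder, similar to what happens when a job is shipped to workers.
    static LazySchemaHolder roundTrip(LazySchemaHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchemaHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazySchemaHolder copy = roundTrip(new LazySchemaHolder("{\"type\":\"string\"}"));
        for (int i = 0; i < 3; i++) {
            copy.schema(); // three "records" reuse one parsed instance
        }
        System.out.println("parse calls: " + parseCalls); // parse calls: 1
    }
}
```

The transient field arrives as null after deserialization, so the first call on the worker side pays for the parse and later calls reuse the result.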

    One more thing to keep in mind is to provide the typeInformation that Flink needs to serialize Avro; otherwise it will fall back to Kryo for serialization.

    implicit val typeInformation: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
    

    avro serialization with flink

    【Comments】:

      【Answer 2】:

      I get the same error when deploying a Flink job (Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field). Here are my serializer and deserializer.

          public static class AVROGeneratorSchema implements SerializationSchema<GenericRecord> {

              @Override
              public byte[] serialize(GenericRecord genericData) {
                  StringBuffer sb = new StringBuffer();
                  sb.append((String) genericData.get("field1"));
                  sb.append("::");
                  sb.append((String) genericData.get("field2"));
                  sb.append("::");
                  sb.append((String) genericData.get("field3"));
                  return (sb.toString()).getBytes();
              }
          }
      

      And the deserializer:

          public static class AVROGeneratorDeSchema implements DeserializationSchema<GenericRecord> {

              @Override
              public GenericRecord deserialize(byte[] bytes) throws IOException {
                  GenericRecord responseData = new GenericData.Record(new Schema.Parser().parse(schemaStr));
                  String[] tokens = new String(bytes).split("::");
                  responseData.put("field1", tokens[0]);
                  responseData.put("field2", tokens[1]);
                  responseData.put("field3", tokens[2]);
                  return responseData;
              }

              @Override
              public TypeInformation<GenericRecord> getProducedType() {
                  return TypeExtractor.getForClass(GenericRecord.class);
              }

              @Override
              public boolean isEndOfStream(GenericRecord genericData) {
                  return false;
              }
          }
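Independently of the NotSerializableException, the "::"-delimited wire format used by this serializer and deserializer can be exercised without Flink or Avro. The sketch below is a JDK-only illustration; `DelimitedCodec`, `encode` and `decode` are hypothetical names. It pins the charset (the code above relies on the platform default) and passes a negative limit to `split`, because a plain `split("::")` drops trailing empty fields, so `tokens[2]` would be out of bounds when the last field is empty. It also shows that a value containing "::" cannot be told apart from a delimiter.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

class DelimitedCodec {
    private static final String DELIMITER = "::";

    // Joins the fields with the delimiter and encodes them with a fixed charset.
    static byte[] encode(String... fields) {
        return String.join(DELIMITER, fields).getBytes(StandardCharsets.UTF_8);
    }

    // Splits on the literal delimiter; the -1 limit keeps trailing empty fields.
    static String[] decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        return text.split(Pattern.quote(DELIMITER), -1);
    }

    public static void main(String[] args) {
        byte[] withEmptyTail = encode("a", "b", "");
        // Plain split drops the trailing empty field.
        System.out.println(new String(withEmptyTail, StandardCharsets.UTF_8).split("::").length); // 2
        System.out.println(decode(withEmptyTail).length);                                        // 3

        // A value that contains the delimiter yields an extra token.
        System.out.println(decode(encode("a::b", "c", "d")).length);                              // 4
    }
}
```

If field values may contain "::", a length-prefixed or escaped format (or Avro's own binary encoding) avoids the ambiguity.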
      

      Here is the error stack:

      Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [field1 type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
          at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
          at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
      Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
          - custom writeObject data (class "java.util.ArrayList")
          - root object (class "org.apache.avro.Schema$LockableArrayList", [field1 type:STRING pos:0])
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
          at java.util.ArrayList.writeObject(ArrayList.java:766)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
          at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
      

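      I do not want to use the Confluent Schema Registry. Any clues?

      【Comments】:

        【Answer 3】:

        Is there any conclusion on the problem of Flink falling back to Kryo for Avro generic records?

        I am using Scala and added the type information like this: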

        implicit val typeInformation: TypeInformation[GenericRecord] = TypeInformation.of( new TypeHint[GenericRecord] {
              new GenericRecordAvroTypeInfo(EventMessage.SCHEMA$)
            })
        

        The stream is set up like this:

        DataStream[GenericRecord]
        

        But the Flink runtime still falls back to Kryo, because it does not recognize the Avro GenericRecord and treats it as a generic type.

        【Comments】:

        • As far as I remember, Flink has trouble typing GenericRecord but not typing a Tuple, so switching the type to Tuple1<GenericRecord> should work. In your case, replacing TypeInformation.of() with new TupleTypeInfo<Tuple1<GenericRecord>>(new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaString))) should make this work. I could not get GenericRecord to work without wrapping it, but it should still be possible.
        • I changed it to implicit val typeInformation: TypeInformation[Tuple1[GenericRecord]] = TypeInformation.of( new TupleTypeInfo[Tuple1[GenericRecord]](new GenericRecordAvroTypeInfo( EventMessage.SCHEMA$ )) ) but the compiler complains about it and underlines the .of() in the TypeInformation.of() call, saying it cannot resolve the overload...
        • In Java, TupleTypeInfo is a subclass of TypeInformation, so it can be used directly without the TypeInformation.of() part. I think it should be similar in Scala.
        【Answer 4】:

        The exception message tells you which class is not serializable.

        Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
        

        The problem is the Schema class, which you store in fields of your GenericSerializer.
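To illustrate why a field alone is enough to trigger the error, here is a minimal JDK-only sketch. `FakeSchema`, `HoldsSchema` and `HoldsString` are hypothetical stand-ins, not Avro or Flink classes; `isJavaSerializable` performs the same kind of `ObjectOutputStream.writeObject` check that appears in the stack trace under `InstantiationUtil.serializeObject`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializableCheck {

    // Hypothetical stand-in for org.apache.avro.Schema: it does NOT implement Serializable.
    static class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
    }

    // Same shape as the question's GenericSerializer: Serializable, but with a non-serializable field.
    static class HoldsSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;
        final FakeSchema schema;
        HoldsSchema(String topic, FakeSchema schema) { this.topic = topic; this.schema = schema; }
    }

    // Variant that keeps only the String form of the schema as a field.
    static class HoldsString implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;
        final String schemaJson;
        HoldsString(String topic, String schemaJson) { this.topic = topic; this.schemaJson = schemaJson; }
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"string\"}";
        System.out.println(isJavaSerializable(new HoldsSchema("flink_output", new FakeSchema(json)))); // false
        System.out.println(isJavaSerializable(new HoldsString("flink_output", json)));                 // true
    }
}
```

The check fails before any record is processed, which matches the job failing at construction time of the producer rather than at runtime.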

        You can try this:

        public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

            private final String topic;
            // Only serializable objects are kept as fields; the Schema instances are not stored.
            private final SerializationSchema<GenericRecord> keySerializer;
            private final SerializationSchema<GenericRecord> valueSerializer;

            public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
                this.topic = topic;
                this.keySerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaK, url);
                this.valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaV, url);
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
                byte[] key = keySerializer.serialize(element.f0);
                byte[] value = valueSerializer.serialize(element.f1);

                return new ProducerRecord<byte[], byte[]>(topic, key, value);
            }

        }
        

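        ConfluentRegistryAvroSerializationSchema is serializable, so you can safely store it in a field of your GenericSerializer.

        It will also perform better, because the underlying structures are not re-instantiated for every incoming record.

        【Comments】: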

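        • I had already identified Schema$Field as the culprit, but I could not figure out where it was causing the problem. I tried moving the parsing of the schema to another point in the application and only passing Strings to my constructor, but I did not know that merely having a field of type Schema would be my problem. Thank you very much, this is a great answer; I immediately understood where my mistake was. I have a different problem now, but this helped a lot.
        • You wouldn't happen to know why I get Caused by: java.lang.NullPointerException at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159) [...]com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 27 more? Unfortunately I cannot disable the Kryo fallback, because I am using GenericRecords.
        • The Kryo serializer does not work with GenericRecords. GenericRecords are neither Java-serializable nor Kryo-serializable. You can try using new AvroSerializer(GenericRecord.class, Schema(...)) with your state.
        • What do you mean by "with your state"? I have already written a serializer class, or do you mean that I have to register it?
        • An exception from the Kryo serializers means that generic type information (GenericTypeInfo) is being used somewhere, either when serializing between operators or when storing records in state (e.g. ListState, MapState, ValueState, etc.). Kryo cannot serialize GenericRecord.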