【问题标题】:Apache Flink read Avro byte[] from KafkaApache Flink 从 Kafka 读取 Avro byte[]
【发布时间】:2017-05-06 10:03:15
【问题描述】:

在查看示例时,我看到了很多:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

我看到他们在这里已经知道架构。

在我将 byte[] 读入通用记录之前,我不知道架构 然后获取架构。 (因为它可能会因记录而异)

有人可以将我指向一个从byte[] 读取到映射过滤器的FlinkKafkaConsumer08,以便我可以删除一些前导位,然后将byte[] 加载到通用记录中吗?

【问题讨论】:

    标签: java stream apache-kafka apache-flink avro


    【解决方案1】:

    如果您使用 Confluent 的模式注册表,我相信首选的解决方案是使用 Confluent 提供的 Avro serde。这样,我们只需调用deserialize() 并且要使用的最新版本的 Avro 模式的解析是在幕后自动完成的,不需要字节操作。

    归结为这样的事情(scala中的示例代码,java解决方案将非常相似):

    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    
    ...
    
    val valueDeserializer = new KafkaAvroDeserializer()
    valueDeserializer.configure(
      Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
      false)
    
    ...
    
    override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
                           topic: String, partition: Int, offset: Long): KafkaKV = {
    
        val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
        val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
    
        KafkaKV(key, value)
        }
    
    ...
    

    此方法要求消息生产者也与模式注册表集成并在那里发布模式。这可以通过与上述非常相似的方式完成,使用 Confluent 的 KafkaAvroSerializer

    我在这里贴了详细的解释:How to integrate Flink with Confluent's schema registry

    【讨论】:

    • 感谢提醒,现在应该已经修复了。
    • 顺便说一句,我认为是 FLINK-9337 / 9338 添加了 Confluent 序列化程序包
    • 您好,发送,我正在使用您描述和构建具有通用记录的数据流的方式。但是如何以 parquet 格式编写此流,因为 parquet writer 需要架构,并且我的记录具有在架构注册表中注册的不同架构。
    【解决方案2】:

    我正在做类似的事情(我正在使用 09 消费者)

    在您的主代码中传入您的自定义反序列化器:

    FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                    parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                    parameterTool.getProperties());
    

    自定义反序列化架构读取字节,找出架构和/或从架构注册表中检索它,反序列化为 GenericRecord 并返回 GenericRecord 对象。

    public class MyDeserializationSchema<T> implements DeserializationSchema<T> {
    
    
        private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
    
        @Override
        public T deserialize(byte[] arg0) throws IOException {
            //do your stuff here, strip off your bytes
            //deserialize and create your GenericRecord 
            return (T) (myavroevent);
        }
    
        @Override
        public boolean isEndOfStream(T nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.getForClass(avrotype);
        }
    
    }
    

    【讨论】:

    • Wow 开箱即用。谢谢,现在我看这个很明显。
    猜你喜欢
    • 2016-11-08
    • 2021-11-30
    • 2018-02-01
    • 2018-05-09
    • 2019-08-01
    • 2021-06-14
    • 2018-05-13
    • 1970-01-01
    • 2019-07-08
    相关资源
    最近更新 更多