【问题标题】:How to consume kafka events serialized to AVRO from Flink?如何使用从 Flink 序列化到 AVRO 的 kafka 事件?
【发布时间】:2021-04-23 12:18:51
【问题描述】:

我是 Scala 和 Apache Flink 的初学者,但到目前为止,一切对我来说都很顺利。我正在尝试使用从我的 Flink 应用程序序列化到 AVRO 的 Kafka 事件。我阅读了文档 (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema) 并在谷歌上搜索了很多小时,但我仍然在同一页面上。 我有一个案例类case class URLResponse(status: int, domain: String, url: String, queue: String, html: String) 和一个模式val schema: Schema = new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"URLResponse\",\"fields\": [{\"name\": \"status\", \"type\": \"long\"}, {\"name\": \"domain\", \"type\": \"string\"}, {\"name\": \"url\", \"type\": \"string\"}, {\"name\": \"queue\", \"type\": \"string\"}, {\"name\": \"html\", \"type\": \"string\"}]}")。我尝试了 3 种方法:

  1. val stream = env.addSource(new FlinkKafkaConsumer(kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))运行时出现错误:
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
  1. val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties)) 构建失败
inferred type arguments [schemas.URLResponse] do not conform to method forSpecific's type parameter bounds [T <: org.apache.avro.specific.SpecificRecord]
      kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))
  1. val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, new AvroDeserializationSchema[URLResponse](classOf[URLResponse]), properties)) 构建失败
constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object MyApp
    val des: AvroDeserializationSchema[URLResponse] = new AvroDeserializationSchema[URLResponse](classOf[URLResponse])

请帮忙!首选方法是什么?为什么它不起作用?谢谢!

【问题讨论】:

    标签: scala apache-kafka apache-flink avro


    【解决方案1】:

    似乎建议使用第一种方法。并且提到的异常与 avro 反序列化的 scala 实现问题有关。如果我使用 java 实现(https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro),它工作正常。我的解决方案:

        val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
          kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
          new GenericRecordAvroTypeInfo(schema))
        val stream = new DataStream[GenericRecord](javaStream)
    

    【讨论】:

      猜你喜欢
      • 2019-08-03
      • 2021-10-20
      • 1970-01-01
      • 1970-01-01
      • 2021-07-01
      • 2019-09-18
      • 2017-05-06
      • 2016-12-07
      • 1970-01-01
      相关资源
      最近更新 更多