【Posted】: 2021-04-23 12:18:51
【Question】:
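I'm a beginner with Scala and Apache Flink, but so far everything has gone smoothly. I'm trying to consume Kafka events serialized as Avro from my Flink application. I've read the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema) and googled for many hours, but I'm still stuck where I started.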
I have a case class, case class URLResponse(status: Int, domain: String, url: String, queue: String, html: String), and a schema, val schema: Schema = new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"URLResponse\",\"fields\": [{\"name\": \"status\", \"type\": \"long\"}, {\"name\": \"domain\", \"type\": \"string\"}, {\"name\": \"url\", \"type\": \"string\"}, {\"name\": \"queue\", \"type\": \"string\"}, {\"name\": \"html\", \"type\": \"string\"}]}"). I tried three approaches:
-
val stream = env.addSource(new FlinkKafkaConsumer(kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))
This fails at runtime with the error:
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
-
val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))
This fails to build:
inferred type arguments [schemas.URLResponse] do not conform to method forSpecific's type parameter bounds [T <: org.apache.avro.specific.SpecificRecord]
kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))
-
val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, new AvroDeserializationSchema[URLResponse](classOf[URLResponse]), properties))
This fails to build:
constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object MyApp
val des: AvroDeserializationSchema[URLResponse] = new AvroDeserializationSchema[URLResponse](classOf[URLResponse])
Please help! What is the preferred approach, and why don't these work? Thanks!
【Discussion】:
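On the first approach: the "Serialization trace" lines look like Kryo output, which suggests Flink fell back to its generic serializer for GenericRecord. Below is a minimal sketch of one possible way around that, assuming a Flink 1.12-era setup with flink-avro and flink-connector-kafka on the classpath: put an Avro-aware TypeInformation in implicit scope before calling addSource, then map the generic records into the case class. The broker address, group id, topic name, the object name AvroGenericSketch, and the helpers parseSchema and toURLResponse are hypothetical, and status is declared as Long here to match the schema's "long" field. Treat it as a starting point, not a confirmed fix.

```scala
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical variant of the case class: status is a Long so that it
// lines up with the "long" field declared in the schema.
case class URLResponse(status: Long, domain: String, url: String, queue: String, html: String)

object AvroGenericSketch {
  // Same schema as in the question, written as a multi-line string.
  val schemaJson: String =
    """{"type": "record", "name": "URLResponse", "fields": [
      |  {"name": "status", "type": "long"},
      |  {"name": "domain", "type": "string"},
      |  {"name": "url", "type": "string"},
      |  {"name": "queue", "type": "string"},
      |  {"name": "html", "type": "string"}
      |]}""".stripMargin

  // A fresh parser per call, since a parser refuses to define the same name twice.
  def parseSchema(): Schema = new Schema.Parser().parse(schemaJson)

  // Copies the fields of a generic Avro record into the case class.
  // Avro returns strings as CharSequence values, hence the toString calls.
  def toURLResponse(r: GenericRecord): URLResponse = URLResponse(
    r.get("status").asInstanceOf[Long],
    r.get("domain").toString,
    r.get("url").toString,
    r.get("queue").toString,
    r.get("html").toString
  )

  def main(args: Array[String]): Unit = {
    val schema = parseSchema()

    // Placeholder connection settings (assumptions, not from the question).
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "url-response-consumer")
    val kafkaTopic = "url-responses"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Avro-aware type information for GenericRecord, so that the Scala API
    // does not derive a generic (Kryo-backed) one for the source stream.
    implicit val recordTypeInfo: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(schema)

    val records: DataStream[GenericRecord] = env.addSource(
      new FlinkKafkaConsumer[GenericRecord](
        kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))

    // Convert to the case class once the records are inside the pipeline.
    val responses: DataStream[URLResponse] =
      records.map((r: GenericRecord) => toURLResponse(r))

    responses.print()
    env.execute("avro-generic-sketch")
  }
}
```

The second and third attempts fail for the reasons the compiler gives: forSpecific is bounded by SpecificRecord, which a plain case class does not implement, and the AvroDeserializationSchema constructor is not accessible from application code. Mapping from GenericRecord to the case class after the source, as toURLResponse does, sidesteps both.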
Tags: scala apache-kafka apache-flink avro