【发布时间】:2020-09-13 03:55:30
【问题描述】:
我正在编写一个使用结构化流的 Spark 应用程序。该应用程序从 Kafka 主题 topic1 读取消息,构造一条新消息,将其序列化为 Array[Byte] 并将它们发布到另一个 Kafka 主题 topic2。
序列化为字节数组很重要,因为我使用topic2 的下游消费者也使用的特定序列化器/反序列化器。
不过,我在制作 Kafka 时遇到了麻烦。我什至不知道该怎么做..网上只有很多关于排队 JSON 数据的例子。
代码 -
case class OutputMessage(id: String, bytes: Array[Byte])
implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo
val outputMessagesDataSet: DataSet[OutputMessage] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()
.select($"value")
.mapPartitions{r =>
val messages: Iterator[OutputMessage] = createMessages(r)
messages
}
outputMessagesDataSet
.writeStream
.selectExpr("CAST(id AS String) AS key", "bytes AS value")
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("topic", "topic2")
.option("checkpointLocation", loc)
.trigger(trigger)
.start
.awaitTermination
但是,这会引发异常 org.apache.spark.sql.AnalysisException: cannot resolve 'id' given input columns: [value]; line 1 pos 5;
如何以id 作为键、bytes 作为值来排队到 Kafka?
【问题讨论】:
标签: scala apache-spark apache-kafka