【发布时间】:2018-01-18 17:47:44
【问题描述】:
我想将 Twitter 中的数据写入 Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我创建了一个基于 socket-Source 的 Twitter-Source,效果很好。
我的来源设置如下:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
这为我提供了一个很好的分析查询数据集。太好了!
接下来,我想将略微激发的模式中的每条推文持久化到 Kafka 中:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
这很简单!除了,它不起作用。在QueryExecution.scala 中,调用传递到assertSupported 并最终被抛出,因为
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
我没想到toJSON 是一个纯粹的批处理操作,但是没有它,并且使用select($"text" as "value") 代替,代码会起作用。
现在,我有点吃惊,希望有人能解释一下为什么 toJSON 不应该与流兼容(这是一个错误吗?缺少的功能?),并告诉我是否有结构化流式获取将我的对象序列化到 Kafka 中。
【问题讨论】:
-
@user6910411 我认为编辑后的标题不能正确代表问题的内容。毕竟,问题和答案都是关于 DataFrame.toJSON,以及它在特定版本的 Spark 中的实现细节。
标签: scala apache-spark apache-kafka spark-structured-streaming