【问题标题】:How to work around DataSet.toJSON being incompatible with Structured Streaming如何解决 DataSet.toJSON 与结构化流不兼容的问题
【发布时间】:2018-01-18 17:47:44
【问题描述】:

我想将 Twitter 中的数据写入 Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我创建了一个基于 socket-Source 的 Twitter-Source,效果很好。

我的来源设置如下:

val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]

这为我提供了一个很好的分析查询数据集。太好了!

接下来,我想将略微激发的模式中的每条推文持久化到 Kafka 中:

val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","tweets")
  .start

这很简单!除了,它不起作用。在QueryExecution.scala 中,调用传递到assertSupported 并最终被抛出,因为

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    Queries with streaming sources must be executed with writeStream.start();;

我没想到toJSON 是一个纯粹的批处理操作,但是没有它,并且使用select($"text" as "value") 代替,代码会起作用。

现在,我有点吃惊,希望有人能解释一下为什么 toJSON 不应该与流兼容(这是一个错误吗?缺少的功能?),并告诉我是否有结构化流式获取将我的对象序列化到 Kafka 中。

【问题讨论】:

  • @user6910411 我认为编辑后的标题不能正确代表问题的内容。毕竟,问题和答案都是关于 DataFrame.toJSON,以及它在特定版本的 Spark 中的实现细节。

标签: scala apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

这有点冗长,但to_json 函数应该可以解决问题:

import org.apache.spark.sql.functions.{to_json, struct, col}

tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
  .writeStream
  ...

toJSON 的问题似乎是this conversion to RDD

val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
  ...

并且(正如maasgthe comments 中指出的那样)似乎已经在开发版本中得到解决。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2021-07-16
  • 2021-06-24
  • 2017-10-02
  • 2013-07-18
  • 2014-05-28
  • 1970-01-01
  • 1970-01-01
  • 2015-10-18
相关资源
最近更新 更多