【发布时间】:2018-09-16 13:25:54
【问题描述】:
我正在使用 Spark 2.1.0 和 Kafka 0.9.0。
我正在尝试将批处理 Spark 作业的输出推送到 kafka。该作业应该每小时运行一次,但不是流式传输。
在网上寻找答案时,我只能找到 kafka 与 Spark 流的集成,而没有找到与批处理作业的集成。
有谁知道这样的事情是否可行?
谢谢
更新:
正如 user8371915 所说,我尝试按照Writing the output of Batch Queries to Kafka 中所做的操作。
我用的是火花壳:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
这是我尝试过的简单代码:
val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()
但我得到了错误:
java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided
知道这与什么有关吗?
谢谢
【问题讨论】:
标签: scala apache-spark apache-kafka apache-spark-sql