【发布时间】:2018-09-19 13:39:26
【问题描述】:
我有一个 Pyspark 笔记本,它连接到 kafka 代理并创建一个名为 temp 的 spark writeStream。 Kafka 主题中的数据值是 json 格式,但我不确定如何创建一个可以实时解析这些数据的 spark sql 表。我知道的唯一方法是创建表的副本,将其转换为 RDD 或 DF,然后将值解析为另一个 RDD 和 DF。在写入流时是否可以在实时处理中完成此操作?
代码:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","hoteth") \
.option("startingOffsets", "earliest") \
.load()
ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()
输出:
+----+--------------------+--------------------+
| key| value| timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql spark-streaming