【问题标题】:Is is possible to parse JSON string from Kafka topic in real time using Spark Streaming SQL?是否可以使用 Spark Streaming SQL 实时解析来自 Kafka 主题的 JSON 字符串?
【发布时间】:2018-09-19 13:39:26
【问题描述】:

我有一个 Pyspark 笔记本,它连接到 kafka 代理并创建一个名为 temp 的 spark writeStream。 Kafka 主题中的数据值是 json 格式,但我不确定如何创建一个可以实时解析这些数据的 spark sql 表。我知道的唯一方法是创建表的副本,将其转换为 RDD 或 DF,然后将值解析为另一个 RDD 和 DF。在写入流时是否可以在实时处理中完成此操作?

代码:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe","hoteth") \
    .option("startingOffsets", "earliest") \
    .load()

ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()

输出:

+----+--------------------+--------------------+
| key|               value|           timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql spark-streaming


    【解决方案1】:

    我可以解决这个问题的一种方法是像在 Hive HQL 中那样横向查看 json_tuple。我仍在寻找一种解决方案,它可以直接从流中解析数据,这样就不会花费额外的处理时间来使用查询进行解析。

    spark.sql("""
        select value, v1.transaction,ticker,price
        from temp 
        lateral view json_tuple(value,"e","s","p") v1 as transaction, ticker,price
        limit 5
        """).show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-22
      • 2018-01-09
      • 2016-04-08
      • 2016-11-03
      • 2019-07-12
      • 2017-09-30
      • 2021-01-05
      • 2017-03-18
      相关资源
      最近更新 更多