【问题标题】:How to get DataFrame in Structured Streaming?如何在结构化流中获取 DataFrame?
【发布时间】:2018-07-26 12:57:46
【问题描述】:

我想从 MQTT 接收 JSON 字符串并将它们解析为 DataFrames df。我该怎么做?

这是我发送到 MQTT 队列以便在 Spark 中处理的 Json 消息示例:

{
"id": 1,
"timestamp": 1532609003,
"distances": [2,5,7,8]
}

这是我的代码:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
reader = spark\
             .readStream\
             .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
             .option("topic","uwb/distances")\
             .option('brokerUrl', 'tcp://127.0.0.1:1883')\
             .load()\
             .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")


df = spark.read.json(reader.select("value").rdd)

# Start running the query that prints the running counts to the console
query = df \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

但是这段代码失败了:

py4j.protocol.Py4JJavaError: An error occurred while calling o45.javaToPython.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt

我尝试添加start如下:

df = spark.read.json(reader.select("value").rdd) \
    .writeStream \
    .format('console') \
    .start()

但是得到了同样的错误。我的目标是获得一个可以进一步通过 ETL 流程的 DataFrame df

更新:

标记为答案的线程没有帮助我解决问题。首先,当我使用 PySpark 时,它为 Scala 提供了解决方案。 其次,我测试了答案中提出的解决方案,它给我返回了空列json

reader = spark\
             .readStream\
             .schema(spark.read.json("mqtt_schema.json").schema) \
             .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
             .option("topic","uwb/distances")\
             .option('brokerUrl', 'tcp://127.0.0.1:1883')\
             .load()\
             .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")

json_schema = spark.read.json("mqtt_schema.json").schema
df = reader.withColumn('json', from_json(col('value'), json_schema))

query = df \
    .writeStream \
    .format('console') \
    .start()

【问题讨论】:

  • @user6910411:在你提到的胎面中使用了 Scala,而我使用的是 PySpark。

标签: python apache-spark pyspark spark-structured-streaming


【解决方案1】:

您必须使用from_json 或等效方法。 如果文档的结构看起来像问题中的样子

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

schema = StructType([
    StructField("id", LongType()),
    StructField("timestamp", LongType()),
    StructField("distances", ArrayType(LongType()))
])


ds = spark.readStream.load(...)

ds.withColumn("value", from_json(col("value").cast("string"), schema))

【讨论】:

  • 在控制台中我看到value 等于null。文档的结构与问题中显示的完全相同。
【解决方案2】:

我想这是因为你的 df 没有流式传输。试试看怎么样 reader.select("value").writestream

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-07-12
    • 2021-12-03
    • 2020-11-08
    • 2017-10-09
    • 1970-01-01
    • 1970-01-01
    • 2018-08-20
    相关资源
    最近更新 更多