【发布时间】:2019-03-20 20:16:18
【问题描述】:
我有一个 Python 中的 Azure Databricks 脚本,它使用结构化流从事件中心读取 JSON 消息,处理消息并将结果保存在 Data Lake Store 中。 消息从读取 Twitter API 推文的 Azure 逻辑应用程序发送到事件中心。
我正在尝试反序列化事件中心消息的正文,以便处理其内容。消息体首先从二进制转换为字符串值,然后使用from_json函数反序列化为结构类型,如本文所述:https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
这是一个代码示例(带有混淆的参数):
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType
EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'
event_hub_conf = {
'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
schema = StructType() \
.add('FetchTimestampUtc', DateType()) \
.add('Username', StringType()) \
.add('Name', StringType()) \
.add('TweetedBy', StringType()) \
.add('Location', StringType()) \
.add('TweetText', StringType())
stream_data_body = stream_data \
.select(stream_data.body) \
.select(from_json('body', schema).alias('body')) \
.select(to_json('body').alias('body'))
# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)
stream_data_body \
.writeStream \
.outputMode('append') \
.format('json') \
.option('path', OUTPUT_DIR) \
.option('checkpointLocation', CHECKPOINT_DIR) \
.start() \
.awaitTermination()
这里我实际上还没有做任何处理,只是一个微不足道的反序列化/序列化。
上述脚本确实向 Data Lake 生成输出,但结果 JSON 对象为空。以下是输出示例:
{}
{}
{}
脚本中的注释代码确实会产生输出,但这只是字符串值,因为我们没有包含反序列化:
{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}
我想知道反斜杠是否应该加倍,就像上面链接中给出的示例一样?这可以通过from_json 函数的 options 参数实现:“控制解析的选项。接受与 json 数据源相同的选项。”但我还没有找到选项格式的文档。
任何想法为什么反序列化/序列化不起作用?
【问题讨论】:
-
spark.apache.org/docs/2.2.1/api/java/org/apache/spark/sql/… 此处的 json 部分包含您可以应用于 Spark 中的 JSON 阅读器的所有选项。一个有趣的是allowBackslashEscapingAnyCharacter,但我会让你告诉我们哪一个适合你的情况......
-
PS 选项也作为 JSON 传递,例如{'allowUnquotedFieldNames':'true'}
-
感谢@KyleHale 的链接!我尝试使用 'allowUnquotedFieldNames':'true' 但它没有解决问题。我很确定这个问题与模式定义有关,我也就此与 Databricks 团队联系过,但我仍然无法确定问题到底是什么,因为没有错误消息. ://
标签: azure pyspark azure-eventhub databricks spark-structured-streaming