【发布时间】:2021-07-21 12:20:18
【问题描述】:
我正在尝试使用以下代码从流式传输路径连接流式传输 Json 文件
Schema1= "customerId STRING,orderId STRING,products ARRAY<STRUCT<productId: STRING,quantity: STRING,soldPrice: STRING>>,salesRepId STRING,shippingAddress STRUCT<address: STRING,attention: String,city: STRING,state: STRING,zip: STRING>,submittedAt TIMESTAMP";
streamingDF = (spark.readStream.schema(Schema1)\
.option("maxFilesPerTrigger", 1).json(stream_path))
在 streamingDF Streaming Dataset 中进行几次转换并尝试使用以下代码写入 Delta 表
streamingDF.writeStream.outputMode("append")\
.option("checkpointLocation", orders_checkpoint_path)\
.partitionBy("submitted_yyyy_mm")\
.table("sachin")
但是这些记录没有插入到我们的增量表中,而且当我检查仪表板时,它显示 numInputRows 为 0
Screenshot of streaming while writestream being executed
为什么这些记录没有追加到增量表中?
【问题讨论】:
-
根据图表,您没有任何输入数据
-
尝试删除检查点文件夹并再次运行。
dbutils.fs.rm(orders_checkpoint_path, True) -
是的,当我们在删除检查点路径后运行它时它会起作用
标签: pyspark databricks spark-structured-streaming delta-lake