【问题标题】:WriteStream is not able to write Data in Delta TableWriteStream 无法在 Delta 表中写入数据
【发布时间】:2021-07-21 12:20:18
【问题描述】:

我正在尝试使用以下代码从流式传输路径连接流式传输 Json 文件

Schema1= "customerId STRING,orderId STRING,products ARRAY<STRUCT<productId: STRING,quantity: STRING,soldPrice: STRING>>,salesRepId STRING,shippingAddress STRUCT<address: STRING,attention: String,city: STRING,state: STRING,zip: STRING>,submittedAt TIMESTAMP";
streamingDF = (spark.readStream.schema(Schema1)\
  .option("maxFilesPerTrigger", 1).json(stream_path))

在 streamingDF Streaming Dataset 中进行几次转换并尝试使用以下代码写入 Delta 表

streamingDF.writeStream.outputMode("append")\
  .option("checkpointLocation", orders_checkpoint_path)\
  .partitionBy("submitted_yyyy_mm")\
  .table("sachin")

但是这些记录没有插入到我们的增量表中,而且当我检查仪表板时,它显示 numInputRows 为 0

Screenshot of streaming while writestream being executed

为什么这些记录没有追加到增量表中?

【问题讨论】:

  • 根据图表,您没有任何输入数据
  • 尝试删除检查点文件夹并再次运行。 dbutils.fs.rm(orders_checkpoint_path, True)
  • 是的,当我们在删除检查点路径后运行它时它会起作用

标签: pyspark databricks spark-structured-streaming delta-lake


【解决方案1】:

而不是使用 .table() 使用 .start() 然后路径表路径而不是表名:

   streamingDF.writeStream.outputMode("append")
  .option("checkpointLocation", orders_checkpoint_path)
  .partitionBy("submitted_yyyy_mm")
  .start("/pathtotable/sachin")

【讨论】:

    猜你喜欢
    • 2019-04-16
    • 1970-01-01
    • 1970-01-01
    • 2022-08-05
    • 2014-04-08
    • 1970-01-01
    • 2021-10-16
    • 2018-07-02
    • 1970-01-01
    相关资源
    最近更新 更多