【问题标题】:loading data into azure blob using spark stream in Azure Databricks在 Azure Databricks 中使用 spark 流将数据加载到 azure blob
【发布时间】:2020-08-29 08:52:48
【问题描述】:

我正在 Azure Databricks 中尝试此代码:

jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

// readstream from azure event hub
df = spark.readStream.format("eventhubs").options(**ehConf).schema(jsonSchema).load() 
 streamingCountsDF = (df.withWatermark("Time", "500 milliseconds").groupBy(
      df.body,
      window(df.enqueuedTime, "1 hour"))
    .count()
)

//writing stream to azure blob
 streamingCountsDF.writeStream.format("parquet").option("path", file_location).option("checkpointLocation", "/tmp/checkpoint").start() 

file_location is the azure blob url.

我在最后一步遇到了错误:

org.apache.spark.sql.AnalysisException:当流式DataFrames/DataSets上没有水印的流式聚合时,不支持追加输出模式;;

我们如何解决这个问题?

【问题讨论】:

  • 谁能回答这个问题。我正在尝试做的是将事件中心数据流式传输到 azure blob。

标签: pyspark spark-streaming azure-blob-storage databricks azure-databricks


【解决方案1】:

根据我们使用的查询,我们需要选择合适的 输出模式。选择错误的一项会导致运行时异常,如下所示。

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on 
streaming DataFrames/DataSets without watermark;

参考:您可以阅读更多关于不同查询与不同输出模式的兼容性here

在结构化流中,流处理的输出是数据帧或表格。查询的输出模式表示这个无限输出表是如何写入接收器的,在我们的示例中是控制台。

共有三种输出模式:

追加 - 在这种模式下,只有在最后一个触发器(批处理)中到达的记录将被写入接收器。这支持简单的转换,如选择、过滤等。由于这些转换不会更改为早期批次计算的行,因此附加新行可以正常工作。

完成 - 在这种模式下,每次完成的结果表都会被写入接收器。通常与聚合查询一起使用。在聚合的情况下,结果的输出将随着新数据的到来而不断变化。

更新 - 在这种模式下,只有从上次触发器更改的记录将被写入接收器。

【讨论】:

  • 那么,这里我们需要使用Update吗?
猜你喜欢
  • 2020-03-15
  • 2019-08-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-08
  • 2020-11-15
  • 2020-07-07
  • 1970-01-01
相关资源
最近更新 更多