【问题标题】:How to add a column and a batch_Id value to a delta table using a running pyspark streaming job?如何使用正在运行的 pyspark 流作业将列和 batch_Id 值添加到增量表?
【发布时间】:2022-06-10 23:48:14
【问题描述】:

我正在尝试为当前批处理运行中的每一行添加一个批处理 ID,然后将其写入增量表。在我的情况下,一批是一个具有多个值的 CSV 文件。我使用函数生成我的批次 ID 值。当我将流作业设置为执行一次时,我可以成功添加正确的批处理 ID,但是当我将其设置为等待终止时,它只执行一次 generate_id() 函数,然后每次上传 CSV 时将该值添加为批处理 ID文件到我的 ADLS gen2 容器。我需要它来执行我的 generate_id() 函数并在每次获取新的 CSV 文件时获取新值。请在下面查看我的代码。我使用 Synapse 笔记本来执行我的代码。

batch_id = 0 
def generate_id():
    global batch_id 
    batch_id = batch_id + 1 
    return batch_id

from pyspark.sql.functions import lit

stream = spark \
  .readStream \
  .option("maxFilesPerTrigger", 1) \
  .schema(customSchema) \
.csv("abfss://synapse@{storageAccountName}.dfs.core.windows.net/delta/putty/streaming_test/csv_files/") \
  .withColumn("Batch_Id",lit(generate_id())
  .writeStream \
  .outputMode("append") \
  .format("delta") \
  .option("checkpointLocation", "abfss://synapse@{storageAccountName}.dfs.core.windows.net/delta/putty/streaming_test/_checkpoints") \
  .option("mergeSchema", "true") \
  .foreachBatch(addCol) \
  .awaitTermination()

这是我需要的:

File Number Value batch_Id
File1 Val1 1
File1 Val2 1
File1 Val3 1
File2 Val1 2
File3 Val1 3
File3 Val2 3

这是我目前得到的:

File Number Value batch_Id
File1 Val1 1
File1 Val2 1
File1 Val3 1
File2 Val1 1
File3 Val1 1
File3 Val2 1

我也尝试过使用 foreachbatch 函数,但这似乎不起作用:

def addCol(df, epochId):
    df.withColumn("Batch_Id",lit(generate_id()))

stream = spark \
  .readStream \
  .option("maxFilesPerTrigger", 1) \
  .schema(customSchema) \
.csv("abfss://synapse@{storageAccountName}.dfs.core.windows.net/delta/putty/streaming_test/csv_files/") \
  .writeStream \
  .outputMode("append") \
  .format("delta") \
  .option("checkpointLocation", "abfss://synapse@{storageAccountName}.dfs.core.windows.net/delta/putty/streaming_test/_checkpoints") \
  .option("mergeSchema", "true") \
  .foreachBatch(addCol) \
  .toTable("patients")
  .awaitTermination()

这是我在运行代码时遇到的错误。我不确定这是什么意思:

AnalysisException: The input source(foreachBatch) is different from the table patients's data source provider(delta).
Traceback (most recent call last):

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1563, in toTable
    return self._sq(self._jwrite.toTable(tableName))

  File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None

pyspark.sql.utils.AnalysisException: The input source(foreachBatch) is different from the table patients's data source provider(delta).

我是 Spark 流媒体的新手,但当我保持流媒体工作活跃时,感觉应该可以实现这样的事情。任何帮助将不胜感激。

【问题讨论】:

    标签: python spark-streaming


    【解决方案1】:

    也许你可以尝试使用map()mapPartition() 函数来解决这个用例。 像下面这样的东西可能适用于您的情况。

    您可以为数据框中的每个行对象调用生成 Batch Id 函数。

    df.mapPartitions(iterator => {
        val resultList = new List
        entityIterator.foreach(rowObject => {
            val batchId = generateBatchId()
            val fileNumber = rowObject.getAs("fileNumber")
            val value = rowObject.getAs("value")
            val rowData = Row(fileNumber,value,batchId)
            itr.add(rowData)
           }
    }
        
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-20
      • 2022-10-25
      • 2022-08-20
      • 1970-01-01
      • 2019-01-26
      • 2021-11-28
      • 2015-04-09
      • 2018-02-23
      相关资源
      最近更新 更多