【问题标题】:Pass additional arguments to foreachBatch in pyspark将附加参数传递给 pyspark 中的 foreachBatch
【发布时间】:2019-09-22 05:25:26
【问题描述】:

我在 pyspark 结构化流中使用 foreachBatch 使用 JDBC 将每个微批处理写入 SQL Server。我需要对多个表使用相同的过程,并且我想通过为表名添加一个附加参数来重用相同的编写器函数,但我不确定如何传递表名参数。

示例here 非常有用,但在 python 示例中,表名是硬编码的,看起来在 scala 示例中它们引用了一个全局变量(?)我想传递表到函数中。

上面链接的python示例中给出的函数是:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

我想用这样的东西:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

但我不确定如何通过 foreachBatch 传递附加参数。

【问题讨论】:

  • 你找到解决办法了吗?

标签: apache-spark pyspark spark-structured-streaming databricks


【解决方案1】:

这样的事情应该可以工作。

streamingDF.writeStream.foreachBatch(lambda df,epochId: writeToSQLWarehose(df, epochId,tableName )).start()

【讨论】:

  • 非常好!在文档的其他任何地方都找不到这样的解决方案!
【解决方案2】:

如果您需要运行多个流,Samellas 的解决方案将不起作用。 foreachBatch 函数被序列化并发送到 Spark worker。该参数似乎仍然是worker中的共享变量,并且可能在执行过程中发生变化。

我的解决方案是在批处理数据框中添加参数作为文字列(将银色数据湖表路径传递给合并操作):

.withColumn("dl_tablePath", func.lit(silverPath))
.writeStream.format("delta")
.foreachBatch(insertIfNotExisting)

在批处理函数insertIfNotExisting中,我拿起参数,放下参数列:

def insertIfNotExisting(batchDf, batchId):
  tablePath = batchDf.select("dl_tablePath").limit(1).collect()[0][0]
  realDf = batchDf.drop("dl_tablePath")

【讨论】:

    猜你喜欢
    • 2011-12-21
    • 2013-01-09
    • 1970-01-01
    • 2015-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-26
    • 2011-07-10
    相关资源
    最近更新 更多