【问题标题】:Stateful aggregation function in PySparkPySpark 中的状态聚合函数
【发布时间】:2018-10-02 15:16:00
【问题描述】:

在 PySpark 中,我正在尝试定义一个自定义聚合器正在累积状态。在 Spark 2.3 中可以吗?

AFAIK,现在可以通过使用 PandasUDFType.GROUPED_AGG 关键字调用 pandas_udf,从 Spark 2.3 开始在 PySpark 中定义自定义 UDAF(参见 How to define and use a User-Defined Aggregate Function in Spark SQL?)。但是,鉴于它只是将函数作为参数,我认为在聚合期间不可能携带状态。

从 Scala 中,我看到可以通过扩展 UserDefinedAggregateFunctionorg.apache.spark.sql.expressions.Aggregator 来进行有状态聚合,但是我只能在 python 端做类似的事情吗?

【问题讨论】:

  • 我对 python 语法不是很熟悉,pyspark 中没有 flatMapGroupWithState 吗?
  • 好像不是!
  • 你想要python中的一切吗?
  • 到目前为止这是目标:)
  • 更深入地研究这一点,似乎 2.3 中的新功能是能够从 python 调用 scala/java UDAF。 SPARK-19439

标签: scala apache-spark pyspark apache-spark-sql


【解决方案1】:

您可以使用accumulator

您可以利用内置的 Spark Streaming state management

用于 SQL 的简单累加器示例

from  pyspark.sql.types import IntegerType

# have some data
df = spark.range(10).toDF("num")

# have a table
df.createOrReplaceTempView("num_table")

# have an accumulator
accSum = sc.accumulator(0)

# have a function that accumulates
def add_acc(int_val):
  accSum.add(int_val)
  return int_val

# register function as udf
spark.udf.register("reg_addacc", add_acc, IntegerType())

# use in sql
spark.sql("SELECT sum(reg_addacc(num)) FROM num_table").show()

# get value from accumulator
print(accSum.value)

45

【讨论】:

  • 但似乎它们不能通过 spark-sql 获得,对吧?累加器似乎是用于常规火花任务的“状态管理”部分似乎是指火花流
  • 我确信我已经看到了一个 udf,它在一个可以从 sql 调用的累加器中管理状态。但是,我现在找不到它。我得试试看。
  • 使用 accumulator / udf / sql 示例更新答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-07-31
  • 2020-07-07
  • 2021-06-06
  • 1970-01-01
  • 2014-08-08
  • 2021-01-22
  • 2016-06-29
相关资源
最近更新 更多