【发布时间】:2020-05-07 12:58:43
【问题描述】:
我想在 spark 中创建一个 udf 以使用不适合 scala fp 样式且仅修改其内部状态的 Java 数据结构。为简化起见,这是我正在使用的骨架 java 类。
public class MagicStats {
List<Long> rawData;
public void processDataPoint(long dataPoint) {
rawData.add(dataPoint);
//some magic processing
}
public void merge(MagicStats anotherMagicStats) {
//merge with another instance to combine states
}
public long eval() {
//do some magic with data
//return result
}
}
关于我在这门课上要做什么的更多背景知识。我有一些按天存储的数据,对于每天的分区,我会生成一些汇总统计信息,包括计数、平均值等,以及这个特殊的 MagicStats(将由课堂上的eval() 获得)并将它们保存在数据库中。这个 MagicStats 的特别之处在于:
- 我需要数据的每日 MagicStats 结果。
- 我需要每月汇总每日 MagicStats 结果(无法从每日结果进行算术计算,只能由班级处理)。
如您所见,第二个要求意味着我必须为每个每日分区拍摄 MagicStats 对象的快照,并将其作为原始字节存储在数据库的列中,以便在每月聚合时我可以重建所有30 个来自字节数组的内存中的 MagicStats 对象并调用merge(MagicStats) and then eval() 以正确聚合。
现在是困难的部分。如何创建一个不从输入流返回结果而是修改 java 对象的内部状态的 udf?这是我卡在的地方(下面的伪代码):
//input_monthly_data
// +----------+------+
// | day | value|
// +----------+------+
// |2020-01-01| 3000|
// |2020-01-02| 4500|
// |2020-01-03| 3500|
// |..........| ....|
// +----------+------+
val df = spark.read.json("input_monthly_data.json")
df.groupby("day").agg(MyUDF(data).as("daily stats").select("daily stats", "avg", "count").saveAsTable()
class MyUDF extends UserDefinedFunction {
def apply(input: Long): Column = {
//create a static MagicStats instance
//update the state of the instance per data point
//serialize this instance to bytes after done for each day's partition
//return the bytes for later persistence
}
}
//output_monthly_data
// +----------+------+-----------------+
// | day | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01| 10 | some binary. |
// |2020-01-02| 20 | some binary. |
// |2020-01-03| 25 | some binary. |
// |..........| ....| some binary. |
// +----------+------+-----------------|
任何关于如何使这个 UDF 工作或以其他方式实现我的目标的建议将不胜感激!
【问题讨论】:
标签: scala apache-spark apache-spark-sql