【问题标题】:Spark Window Function: Referencing different columns for rangeSpark Window 函数:为范围引用不同的列
【发布时间】:2017-12-18 22:09:30
【问题描述】:

我有一个包含start_timeend_time 列的DataFrame。我想设置窗口,每个观察的窗口是结束时间之前的两行,仅限于在观察的start_time 之前带有end_time 的数据。

示例数据:

data = [('a', 10, 12, 5),('b', 20, 25, 10),('c', 30, 60, 15),('d', 40, 45, 20),('e', 50, 70, 25)]
df = sqlContext.createDataFrame(data, ['name', 'start_time', 'end_time', 'resource'])
+----+----------+--------+--------+
|name|start_time|end_time|resource|
+----+----------+--------+--------+
|   a|        10|      12|       5|
|   b|        20|      25|      10|
|   c|        30|      60|      15|
|   d|        40|      45|      20|
|   e|        50|      70|      25|
+----+----------+--------+--------+

所以'e'的窗口应该包括'b'和'd',而不是'c'

不受结束时间

from pyspark.sql import Window        
from pyspark.sql import functions as func
window = Window.orderBy("name").rowsBetween(-2, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()

我查看了rangeBetween(),但我无法找到引用当前行的start_time 的方法,或者我想通过其他行的end_time 来限制它。有Window.currentRow,但在这个例子中它只会引用resource的值

这可以使用 Window 来实现吗?我应该完全尝试其他东西吗?

编辑:如果重要,请使用 Spark 2.1.1 和 Python 2.7+。

【问题讨论】:

  • partitionBy 是什么?没有它,您最终会在同一个分区和单个执行程序上得到所有行,这将用大型数据集杀死它。
  • 是的,我的实际数据非常大,所以我有一个 partitionBy 似乎工作正常 - 它是 name 的组,每个组都有一个 a、b、c,等等。当我运行代码而不受只考虑结束时间

标签: apache-spark pyspark spark-dataframe window-functions pyspark-sql


【解决方案1】:

您实际上可以使用 groupBy 函数来聚合不同的分区,然后在相同的公共键上使用输出数据帧之间的内部连接。分区依据或窗口函数在 Spark 中需要很长时间,因此如果可以的话,最好使用 groupby。

【讨论】:

    【解决方案2】:

    我认为纯粹使用 Windows 是不可能的。从给定的行开始,您需要能够以反向排序顺序返回之前的行,直到您有两个满足您的条件的命中。

    您可以使用窗口函数来创建每行遇到的所有先前值的列表,然后使用带有一些纯 scala/python 的 UDF 来确定总和,考虑您的排除。

    在斯卡拉中:

    val window = Window.partitionBy(???).orderBy("end_time").rowsBetween(Long.MinValue, -1)
    
    val udfWithSelectionLogic = udf { values: Seq[Row] => INSERT_LOGIC_HERE_TO_CALCULATE_AGGREGATE }
    
    val dataPlus = data.withColumn("combined", struct($"start_time", $"end_time", $"resource"))
            .withColumn("collected", collect_list($"combined") over window)
            .withColumn("result", udfWithSelectionLogic($"collected"))
    

    这并不理想,但可能会有所帮助。

    【讨论】:

      猜你喜欢
      • 2019-07-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-29
      • 2017-06-08
      • 1970-01-01
      相关资源
      最近更新 更多