【问题标题】:max() operation on spark without reducing the row count在不减少行数的情况下对 spark 进行 max() 操作
【发布时间】:2017-09-06 13:12:29
【问题描述】:

假设我们有一个包含三列的数据集,分别是客户 ID、操作和操作时间。

 1, ACTION_1, 100
 1, ACTION_2, 101
 1, ACTION_3, 102
 2, ACTION_1, 100
 2, ACTION_2, 105
 2, ACTION_3, 102
 3, ACTION_1, 120
 3, ACTION_2, 111
 3, ACTION_3, 103

我们希望在过滤某些特定操作(例如 ACTION_2)时获取每个客户的最后操作时间。如下:

 1, ACTION_2, 102
 2, ACTION_2, 105
 3, ACTION_2, 120

我们期待了解此问题的任何类型的解决方案。

【问题讨论】:

  • 你尝试过什么吗?
  • 如果我们尝试获取最后一个动作时间,我们会丢失我们想要的动作数据,在我们的例子中是 ACTION_2。
  • 所以只需为每个客户获取最后一个action time 并创建一个具有“ACTION_2”值的新列
  • 我们想要这个解决方案的原因是因为每个动作类型都有自己的计数。例如 (ACTION_2, 5) 5 次。为简单起见,我们省略了此信息。
  • 你需要一个窗口上的排名函数

标签: python apache-spark bigdata


【解决方案1】:

创建数据框:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

data = [
 (1, 'ACTION_1', 100),
 (1, 'ACTION_2', 101),
 (1, 'ACTION_3', 102),
 (2, 'ACTION_1', 100),
 (2, 'ACTION_2', 105),
 (2, 'ACTION_3', 102),
 (3, 'ACTION_1', 120),
 (3, 'ACTION_2', 111),
 (3, 'ACTION_3', 103)]

df = sqlContext.createDataFrame(data, ['customerid', 'action', 'actiontime'])
df.show()

在由客户 ID 划分的窗口上使用 max 函数

from pyspark.sql import Window
from pyspark.sql.functions import max
w = Window.partitionBy(df.customerid)

df1 = df.withColumn('actiontime', max('actiontime').over(w))
df1.show()

根据条件过滤数据:

df2 = df1.where(df1.action == 'ACTION_2')
df2.show()
+----------+--------+----------+
|customerid|  action|actiontime|
+----------+--------+----------+
|         1|ACTION_2|       102|
|         3|ACTION_2|       120|
|         2|ACTION_2|       105|
+----------+--------+----------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-03-21
    • 1970-01-01
    • 1970-01-01
    • 2016-05-03
    • 2021-07-29
    • 2021-02-23
    • 1970-01-01
    相关资源
    最近更新 更多