【问题标题】:pySpark count IDs on conditionpySpark 根据条件计算 ID
【发布时间】:2018-07-13 06:03:50
【问题描述】:

我有以下数据集并使用 PySpark

df = sparkSession.createDataFrame([(5, 'Samsung', '2018-02-23'),
                                   (8, 'Apple', '2018-02-22'),
                                   (5, 'Sony', '2018-02-21'),
                                   (5, 'Samsung', '2018-02-20'),
                                   (8, 'LG', '2018-02-20')],
                                   ['ID', 'Product', 'Date']
                                  )

+---+-------+----------+
| ID|Product|      Date|
+---+-------+----------+
|  5|Samsung|2018-02-23|
|  8|  Apple|2018-02-22|
|  5|   Sony|2018-02-21|
|  5|Samsung|2018-02-20|
|  8|     LG|2018-02-20|
+---+-------+----------+
# Each ID will appear ALWAYS at least 2 times (do not consider the case of unique IDs in this df)

每个 ID 仅在代表较高频率时才应增加 PRODUCT 计数器。 如果频率相同,则应以最近的日期决定哪个产品获得 +1。

从上面的示例中,所需的输出将是:

+-------+-------+
|Product|Counter|
+-------+-------+
|Samsung|      1|
|  Apple|      1|
|   Sony|      0|
|     LG|      0|
+-------+-------+


# Samsung - 1 (preferred twice by ID=5)
# Apple - 1 (preferred by ID=8 more recently than LG)
# Sony - 0 (because ID=5 preferred Samsung 2 time, and Sony only 1) 
# LG - 0 (because ID=8 preferred Apple more recently) 

使用 PySpark 实现这一结果的最有效方法是什么?

【问题讨论】:

  • 如果产品是最新的并且只有一次首选,那么输出应该是什么。另一种情况是该产品多次受到青睐但最近不太受欢迎?
  • Samsung 为 1,因为 ID 只能将 +1 分配给一种产品,即与最高频率(或在相同频率的情况下,最近日期)相关联的产品
  • 如果给定的 ID 只有一条记录,则不必考虑。在这里,我只处理不唯一的 ID

标签: python apache-spark pyspark


【解决方案1】:

IIUC,您想为每个ID 挑选最常见的产品,使用 最近的Date

首先,我们可以使用以下方法获取每个产品/ID 对的计数:

import pyspark.sql.functions as f
from pyspark.sql import Window

df = df.select(
    'ID',
    'Product',
    'Date', 
    f.count('Product').over(Window.partitionBy('ID', 'Product')).alias('count')
)
df.show()
#+---+-------+----------+-----+
#| ID|Product|      Date|count|
#+---+-------+----------+-----+
#|  5|   Sony|2018-02-21|    1|
#|  8|     LG|2018-02-20|    1|
#|  8|  Apple|2018-02-22|    1|
#|  5|Samsung|2018-02-23|    2|
#|  5|Samsung|2018-02-20|    2|
#+---+-------+----------+-----+

现在您可以使用Window 为每个 ID 对每个产品进行排名。我们可以使用pyspark.sql.functions.desc()countDate 进行降序排序。如果row_number() 等于 1,则表示该行是第一行。

w = Window.partitionBy('ID').orderBy(f.desc('count'), f.desc('Date'))
df = df.select(
    'Product',
    (f.row_number().over(w) == 1).cast("int").alias('Counter')
)
df.show()
#+-------+-------+
#|Product|Counter|
#+-------+-------+
#|Samsung|      1|
#|Samsung|      0|
#|   Sony|      0|
#|  Apple|      1|
#|     LG|      0|
#+-------+-------+

最后groupBy() 产品并选择Counter 的最大值:

df.groupBy('Product').agg(f.max('Counter').alias('Counter')).show()
#+-------+-------+
#|Product|Counter|
#+-------+-------+
#|   Sony|      0|
#|Samsung|      1|
#|     LG|      0|
#|  Apple|      1|
#+-------+-------+

更新

这里有一个更简单的方法:

w = Window.partitionBy('ID').orderBy(f.desc('count'), f.desc('Date'))
df.groupBy('ID', 'Product')\
    .agg(f.max('Date').alias('Date'), f.count('Product').alias('Count'))\
    .select('Product', (f.row_number().over(w) == 1).cast("int").alias('Counter'))\
    .show()
#+-------+-------+
#|Product|Counter|
#+-------+-------+
#|Samsung|      1|
#|   Sony|      0|
#|  Apple|      1|
#|     LG|      0|
#+-------+-------+

【讨论】:

  • 这样计数器始终为 1。我希望每个唯一 id 在满足条件时增加产品的计数器
  • 我想它可以通过将 max() 更改为 sum() 来修复
  • @Alex 您能否请edit 提出您的问题,并举个例子说明它会有所作为。
猜你喜欢
  • 2018-08-07
  • 1970-01-01
  • 2019-05-29
  • 2022-01-15
  • 1970-01-01
  • 2021-12-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多