【问题标题】:Retrive only X records from PySpark GroupBy / OrderBy without aggregation仅从 PySpark GroupBy / OrderBy 检索 X 记录而不进行聚合
【发布时间】:2021-02-01 13:47:58
【问题描述】:

假设我有一个这样的数据集(更大的 ofc):

Product Serial Feature 1 Feature 2
1234 123 1.1 2.2
1234 123 1.3 2.3
5678 456 1.4 2.4
5678 456 1.5 2.5
9111 567 1.6 2.6
1234 124 1.7 2.7

将产品视为一种产品类型,将序列视为特定部件的标识号。我想按“产品”和“序列”对值进行排序,并按相同的列(“产品”和“序列”)对数据进行分组。之后,我想遍历这些组,例如,我可以获取属于组 2-4 的数据。在 pandas 中,我通过以下方式实现了这一点:

ds = dataset.reset_index(drop=True)
ds.sort_values(by=['Product','Serial'], inplace=True)
ds = ds.groupby(['Product','Serial'])

#Add index to a list for later

listDF = []
dsSize = 0

for key,item in ds:
    listDF.append(key)
    dsSize = dsSize +1
print("Group Size: ", dsSize)

稍后我会简单地遍历这个索引列表并获得我想要的组。

for i in trainInterval:
    sTrain.append(ds.get_group(listDF[i]))

#Create new datasets
train = pd.DataFrame()
train = train.append([pd.DataFrame(i) for i in sTrain], ignore_index = True)

其中 trainInterval 是我根据需要更改的范围。 出于可扩展性的目的,我正在尝试将所有代码转换为 PySpark,尽管我已经阅读了 RDD 编程指南,但我在基本功能方面遇到了一些麻烦。到目前为止,我已经尝试了两种方法,但我都陷入了同一阶段:

1:

ds.orderBy(col("Product"),col("Serial"))
ds = screwingProfile.groupBy(col("Product"),col("Serial"))

我不知道从哪里开始,因为似乎不可能迭代 DataGroup 对象(我可能做错了什么)。

2:

w1 = Window.partitionBy('Product','Serial').orderBy('Product','Serial')

我不知道下一步该做什么。我在网上看到的大多数解决方案都对列进行了一些数学运算,我只是希望它们按指示进行分组,这样我就可以编写一个数据帧以发送到我的机器学习算法。获得这些组后,我可以将它们附加在一起(与联合?)?

编辑: 为清楚起见,此表说明了“分组”df。我有一个产品有多个系列,而一个系列有多个记录。

+-------+------+--------+--------+
|Product|Serial|Feature1|Feature2|
+-------+------+--------+--------+
|   1234|   123|     1.1|     2.2|
|       |      |     1.3|     2.4|
|       |      |     1.7|     2.7|
|       |      |     3  |     2.8|
|       |      |     2  |     2.9|
|       |      |     1.4|     2.4|
|       |   124|     1.7|     2.6|
|       |      |     1.2|     2.4|
|       |      |     1.9|     2.2|
|       |      |    3 .1|     2.3|
|       |      |     2  |     2.4|
|       |      |     1.9|     2.4|
|  ...  |  ... |   ...  |   ...  |
|  ...  |  ... |   ...  |   ...  |
|  ...  |  ... |   ...  |   ...  |
+-------+------+--------+--------+

编辑 2:

我能够使用此代码继续我的任务:

    w = Window.partitionBy('product').orderBy(F.asc('serial'))
    x = screwingProfile.withColumn("rank",rank().over(w)) \
        .orderBy(column_list)

现在我得到了这个输出:

+--------+-------+-------+--------+----+
| Product| Serial|Featur1|Featur2 |rank|
+--------+-------+-------+--------+----+
|    A   |  123  |    1.1|     9.1|   1|
|    A   |  123  |    2.1|       1|   1|
|    A   |  123  |    1.1|     9.2|   1|
|    A   |  456  |    3.1|     7.3|4112|

这与我想要的非常接近,但对于“第二组”,我期望排名为 2 而不是 4112。也许我不应该使用排名并使用其他任何东西?我尝试了“monotonically_increasing_id()”,但它在窗口函数中不受支持。还尝试了dense_rank,起初看起来它完成了我想要的,但是在下一个系列中,说789它的等级值再次是1而不是3。

提前致谢

【问题讨论】:

    标签: python pyspark window


    【解决方案1】:

    使用row_number 会给你正确的顺序

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-09-15
      • 2019-06-16
      • 2018-07-05
      • 1970-01-01
      • 2019-01-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多