【问题标题】:The right way to aggregate and combine RDDs聚合和组合 RDD 的正确方法
【发布时间】:2018-02-10 23:30:42
【问题描述】:

我有一个客户表,其中包含有关每个客户的多个流程的信息。

目标是为每个客户和每个流程提取特征。这意味着每个特性主要是对 .groupby(customerID, processID) 对象的聚合或排序比较计算。

但是,我们的目标是能够随着时间的推移添加越来越多的功能。所以基本上用户应该能够定义一个带有一些过滤器、指标和聚合的新函数,并将这个新函数添加到对表进行操作的函数池中。

输出应该是一个 customerID、processID 表,包含所有功能。

所以我开始了一个最小的工作示例:

l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\
     ('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\
     ('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\
    ('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]

rdd = sc.parallelize(l)

df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])

+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
|     CM1|   aa1|  100|      0.1|
|     CM1|   aa1|  110|      0.2|
|     CM1|   aa1|  110|      0.9|
|     CM1|   aa1|  100|      1.5|
|     CX2|   bb9|  100|      0.1|
|     CX2|   bb9|  100|      0.2|
|     CX2|   bb9|  110|      6.0|
|     CX2|   bb9|  100|     0.18|
+--------+------+-----+---------+

然后我定义了 2 个任意特征,这些特征由这些函数提取:

def extr_ft_1 (proc_data, limit=100):

    proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))

    proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))

    proc_data.show()

    return proc_data


def extr_ft_0 (proc_data):

    max_t = proc_data.agg(spark_max(proc_data.timestamp))

    min_t = proc_data.agg(spark_min(proc_data.timestamp))

    max_t = max_t.select(col('max(timestamp)').alias('max'))

    min_t = min_t.select(col('min(timestamp)').alias('min'))

    X = max_t.crossJoin(min_t)

    X = X.withColumn('time_feature', X.max+X.min)

    X = X.drop(X.min).drop(X.max)

    X.show()

    return (X)

它们返回仅包含聚合值的 1 元素 RRD。 接下来,将所有特征函数应用于给定进程,并组合成每个进程的结果 RDD:

def get_proc_features(proc, data, *features):

    proc_data = data.filter( data.customid == proc)

    features_for_proc = [feature_value(proc_data) for feature_value in features]



    for number, feature in enumerate(features_for_proc):

        if number == 0:

            l = [(proc,'dummy')]

            rdd = sc.parallelize(l)

            df = sqlContext.createDataFrame(rdd,['customid','dummy']) 

            df = df.drop(df.dummy)

            df.show()

            features_for_proc_rdd = feature

            features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)

            continue

        features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)

        features_for_proc_rdd.show()

    return features_for_proc_rdd

他们的最后一步是将包含每个进程的特征的所有行附加到一个数据帧:

for number, proc in  enumerate(customer_list_1):

    if number == 0:

        #results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
        results = get_proc_features(proc, df, *extr_feature_funcs)

        continue

    results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))

results.show()

转换链是这样的:

为客户 1 获取功能 1 和 2:

+------------+
|time_feature|
+------------+
|         1.6|
+------------+

+-------------+
|speed_feature|
+-------------+
|            2|
+-------------+

将它们组合成:

+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
|         1.6|     CM1|            2|
+------------+--------+-------------+

对客户 2 执行相同操作,并将所有 RDD 附加到最终结果 RDD:

+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
|         1.6|     CM1|            2|
|         6.1|     CX2|            1|
+------------+--------+-------------+

如果我在集群上运行代码,它适用于 2 个客户。 但是当我在相当数量的客户上测试它时,我得到的主要是 GC 和堆内存错误。

我在这里与许多 RDD 合作吗?恐怕我的代码效率很低,但我不知道从哪里开始优化它。我想我只是在最后调用一个动作(我在实时模式下放弃所有 show() 并只收集最后一个 RDD)。 非常感谢您的帮助。

【问题讨论】:

  • 您为集群使用了哪些配置(执行器数量、内存...)您的数据大小是多少。请注意,如果您在不增加内存开销的情况下增加了内存,它将无效
  • 数据大约 160 TB,我认为是 200 个执行者。关于我现在不记得的记忆。
  • 您的代码需要重构,问题不在于 RDD,而在于您过滤它以处理单一键然后交叉连接。遍历值会使您失去 pyspark 的分布式方面。最好的方法是使用数据框和窗口函数。请记住,如果您不需要另一张工作台的功能,则应始终保留一张工作台。我会帮你的,但首先customer_list_1extr_feature_funcs 是什么?
  • customer_list_1 = ['CM1', 'CX2'] 和 extr_feature_funcs = (extr_ft_0, extr_ft_1)。它原本是一个参数组,可以添加任何希望应用于数据的特征提取函数。我看到了你的正确答案玛丽,我现在在 moba 上,并在家里尝试一下

标签: python apache-spark pyspark rdd pyspark-sql


【解决方案1】:

您的代码需要重构,问题不是 RDD,而是您过滤它以处理单一键然后交叉连接的事实。遍历值会使您失去 pyspark 的分布式方面。请记住,如果您不需要另一张工作台的功能,则应始终保留一张工作台。

最好的方法是使用数据框和窗口函数。

首先让我们重写你的函数:

import pyspark.sql.functions as psf
def extr_ft_1 (proc_data, w, limit=100):
    return proc_data.withColumn(
        "speed_feature", 
        psf.sum((proc_data.speed > limit).cast("int")).over(w)
    )

def extr_ft_0(proc_data, w):
    return proc_data.withColumn(
        "time_feature", 
        psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w)
    )

w 是一个窗口规范:

from pyspark.sql import Window

w = Window.partitionBy("customid")
df1 = extr_ft_1(df, w)
df0 = extr_ft_0(df1, w)
df0.show()

    +--------+------+-----+---------+-------------+------------+
    |customid|procid|speed|timestamp|speed_feature|time_feature|
    +--------+------+-----+---------+-------------+------------+
    |     CM1|   aa1|  100|      0.1|            2|         1.6|
    |     CM1|   aa1|  110|      0.2|            2|         1.6|
    |     CM1|   aa1|  110|      0.9|            2|         1.6|
    |     CM1|   aa1|  100|      1.5|            2|         1.6|
    |     CX2|   bb9|  100|      0.1|            1|         6.1|
    |     CX2|   bb9|  100|      0.2|            1|         6.1|
    |     CX2|   bb9|  110|      6.0|            1|         6.1|
    |     CX2|   bb9|  100|     0.18|            1|         6.1|
    +--------+------+-----+---------+-------------+------------+

在这里,我们永远不会丢失信息(我们保留所有行),因此如果您想添加额外的功能,您可以。如果您想要最终的汇总结果,只需通过 groupBy("customid") 运行它。

请注意,您还可以修改窗口规范中的聚合键,例如包含 procid

【讨论】:

  • 非常感谢玛丽!我今天测试了它,它就像一个魅力:) 没有内存错误,脚本运行速度比预期的快。
  • 没问题,很高兴能帮上忙 :)
猜你喜欢
  • 2016-03-20
  • 2019-11-02
  • 1970-01-01
  • 1970-01-01
  • 2016-09-10
  • 1970-01-01
  • 2018-04-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多