【问题标题】:Reducing by (K,V) pairs and sort by V按 (K,V) 对减少并按 V 排序
【发布时间】:2019-05-03 14:29:31
【问题描述】:

我对 pyspark 和 RDD 非常陌生。抱歉,如果这个问题非常初级。

我已使用以下代码对数据进行映射和清理:

delay = datasplit.map(lambda x: ((x[33], x[8], x[9]))).filter(lambda x: x[0]!= u'0.00').filter(lambda x: x[0]!= '')

但现在我需要以某种方式转换为以下输出:

(124, u'"OO""N908SW"')
(432, u'"DL""N810NW"')

其中第一个是上面提到的x[33] 的总和,当按 x[8] 和 x[9] 的组合分组时

我已完成映射并获得以下输出(接近)

lines = delay.map(lambda x: (float(x[0]), [x[1], x[2]]))

输出:

[(-10.0, [u'OO', u'N908SW']),(62, [u'DL', u'N810NW]), (-6.0, [u'WN', w'N7811F'])]

但我不知道如何减少或组合 x[1]x[2] 以创建上面显示的输出。

提前致谢。

【问题讨论】:

    标签: python pyspark rdd reduce


    【解决方案1】:

    您可以在下面创建key like并应用reduceByKey然后映射以获得统一的key:

    from operator import add
    result = delay.map(lambda x: ((x[1], x[2]), x[0])) \
                      .reduceByKey(add).map(lambda x: (x[0][1] + x[0][2], x[1]))
    

    【讨论】:

    • 谢谢。我试过这个,但我收到一条错误消息:IndexError: tuple index out of range
    【解决方案2】:

    作为一般的经验法则,您需要尽可能少的 python 操作。

    我将您的代码减少为一个map 和一个reduce

    import operator
    
    delay_sum = datasplit\
        .map(lambda x: (x[8]+x[9], float(x[33]) if any(x[33]) else 0.0))\
        .reduceByKey(operator.add)
    

    不用说,这些操作在使用 spark 数据帧时通常运行得更快。

    【讨论】:

    • 这可能是错误的!正如您假设 x[8] + x[9] 与一对 (x[8], x[9]) 有一对一的映射,但通常情况并非如此。
    • 这是 OP 要求的。这是针对他的具体案例和假设的解决方案,而不是针对任何一般未指定的案例。
    • OP只是提供了一个样本!因此,此代码仅适用于他的示例,而不是更多。
    猜你喜欢
    • 2018-07-08
    • 2013-04-24
    • 1970-01-01
    • 2022-11-29
    • 2016-11-07
    • 2011-04-21
    • 1970-01-01
    • 2012-08-26
    • 1970-01-01
    相关资源
    最近更新 更多