我们可以将您的意见视为
input=[('E7750A37CAB07D0DFF0AF7E3573AC141',0.03333333333333333,0.44,1.0,0.0,0.0,3.5),('778C92B26AE78A9EBDF96B49C67E4007',0.03333333333333333,0.71,1.0,0.0,1.0,4.0),('BE317B986700F63C43438482792C8654',0.03333333333333333,0.48,1.0,0.0,0.0,4.0)]
首先,您可以使用reduceByKey()函数按键根据分组添加元素。
但要使用它,我们必须创建一个 PairRDD,它只是一个元组的 RDD,其中第一个元素始终是键(尽管您可以使用 RDD 上的 keyBy 函数更改它)。
首先,读取输入:
input = sc.parallelize(input) #creating an RDD
在我们的输入中,第一个元素是键。现在,我们要将每个数字及其关联的键放入输入中。我们想要这样的东西:
('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.03333333333333333), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.44), ('E7750A37CAB07D0DFF0AF7E3573AC141', 1.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0) , ('E7750A37CAB07D0DFF0AF7E3573AC141', 3.5), ('778C92B26AE78A9EBDF96B49C67E4007', 0.0), ('778C92B26AE78A9EBDF96B49C67E4007', 0.03333333333333333), ('778C92B26AE78A9EBDF96B49C67E4007', 0.71), ('778C92B26AE78A9EBDF96B49C67E4007', 1.0), ('778C92B26AE78A9EBDF96B49C67E4007', 0.0), ( '778C92B26AE78A9EBDF96B49C67E4007', 1.0), ('778C92B26AE78A9EBDF96B49C67E4007', 4.0), ('BE317B986700F63C43438482792C8654', 0.0), ('BE317B986700F63C43438482792C8654', 0.03333333333333333), ('BE317B986700F63C43438482792C8654', 0.48), ('BE317B986700F63C43438482792C8654', 1.0), ('BE317B986700F63C43438482792C8654 ', 0.0), ('BE317B986700F63C43438482792C8654', 0.0), ('BE317B986700F63C43438482792C8654', 4.0)
为此,我们可以使用 lambda 函数来遍历 RDD 中的每个元素(例如,('E7750A37CAB07D0DFF0AF7E3573AC141',0.03333333333333333,0.44,1.0,0.0,0.0,3.5)),并在每个元素下使用列表推导迭代整数元素,例如,
(lambda x: [(x[0],y) for y in x])
在列表推导中,我们不想自己创建 x[0] 的元组。因此,使用 if else 删除它。
lambda x: [(x[0],y) if y != x[0] else (x[0],0.000) for y in x]
现在,我们可以这样写:
input2 = input.map(lambda x: [(x[0],y) if y != x[0] else (x[0],0.000) for y in x])
input2.collect()
[[('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.03333333333333333), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.44), ('E7750A37CAB07D0DFF0AF7E3573AC141', 1.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 0.0), ('E7750A37CAB07D0DFF0AF7E3573AC141', 3.5)], [('778C92B26AE78A9EBDF96B49C67E4007', 0.0), ('778C92B26AE78A9EBDF96B49C67E4007', 0.03333333333333333), ('778C92B26AE78A9EBDF96B49C67E4007', 0.71), ('778C92B26AE78A9EBDF96B49C67E4007', 1.0), ('778C92B26AE78A9EBDF96B49C67E4007', 0.0), ('778C92B26AE78A9EBDF96B49C67E4007', 1.0), ('778C92B26AE78A9EBDF96B49C67E4007', 4.0)], [('BE317B986700F63C43438482792C8654', 0.0), ('BE317B986700F63C43438482792C8654', 0.03333333333333333), ('BE317B986700F63C43438482792C8654', 0.48), ('BE317B986700F63C43438482792C8654', 1.0), ('BE317B986700F63C43438482792C8654', 0.0), ('BE317B986700F63C43438482792C8654', 0.0), ('BE317B986700F63C43438482792C8654', 4)]]
在上面的输出中,我们得到了列表列表,因此我们需要将其展平为一个列表。
input3 = input2.flatMap(lambda x: x)
input3.collect()
我们可以将所有这些放在一行中:
input2 = input.flatMap(lambda x: [(x[0],y) if y != x[0] else (x[0],0.000) for y in x])
最后,使用reduceByKey:
from operator import add
finalOutput = input2.reduceByKey(add)
finalOutput.collect()
[('778C92B26AE78A9EBDF96B49C67E4007', 6.743333333333333), ('BE317B986700F63C43438482792C8654', 5.513333333333334), ('E7750A37CAB07D0DFF0AF7E3573AC141', 4.973333333333334)]
希望我的回答对你有帮助!