【问题标题】:List Transformation With Lambdas in Spark在 Spark 中使用 Lambda 进行列表转换
【发布时间】:2017-05-08 18:57:30
【问题描述】:

我正在尝试获取包含整数范围对的 RDD,并对其进行转换,以便每一对都有第三项,该项迭代该范围中的可能值。基本上,我有这个:

[[1,10], [11,20], [21,30]]

我想结束这个:

[[1,1,10], [2,1,10], [3,1,10], [4,1,10], [5,1,10]...]

我要转换的文件非常大,这就是为什么我希望使用 PySpark 而不是仅在本地机器上使用 Python(我有一种方法可以在 CSV 文件上本地进行,但考虑到文件的大小,该过程需要几个小时)。到目前为止,我得到了这个:

a = [[1,10], [11,20], [21,30]]
b = sc.parallelize(a)
c = b.map(lambda x: [range(x[0], x[1]+1), x[0], x[1]])
c.collect()

产量:

>>> c.collect()
[[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 1, 10], [[11, 12, 13, 14, 15, 16, 17, 18, 19, 20], 11, 20], [[21, 22, 23, 24, 25, 26, 27, 28, 29, 30], 21, 30]]

我无法从这里弄清楚下一步需要做什么,迭代扩展的范围,并将每一个与范围分隔符配对。

有什么想法吗?

编辑 2017 年 5 月 8 日下午 3:00

适用于 CSV 输入的本地 Python 技术是:

import csv
import gzip
csvfile_expanded = gzip.open('C:\output.csv', 'wb')
ranges_expanded = csv.writer(csvfile_expanded, delimiter=',', quotechar='"')
csvfile = open('C:\input.csv', 'rb')
ranges = csv.reader(csvfile, delimiter=',', quotechar='"')
for row in ranges:
    for i in range(int(row[0]),int(row[1])+1):
         ranges_expanded.writerow([i,row[0],row[1])

我质疑的 PySpark 脚本以 CSV 文件开始,该文件已经加载到 HDFS 并转换为 RDD。

【问题讨论】:

    标签: python apache-spark lambda pyspark


    【解决方案1】:

    试试这个:

    c = b.flatMap(lambda x: ([y, x[0], x[1]] for y in xrange(x[0], x[1]+1)))
    

    flatMap() 确保您为范围的每个元素获得一个输出记录。还要注意外部 ( )xrange 的结合——这是一个生成器表达式,可避免在执行程序的内存中实现整个范围。

    注意:xrange() 是 Python2。如果您正在运行 Python3,请使用 range()

    【讨论】:

    • 效果很好!非常感谢您的帮助和解释。我不知道如何在 lambda 中插入 for 循环,但看到您的解决方案很有意义。
    猜你喜欢
    • 2015-10-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多