【问题标题】:Spark deduplication of RDD to get bigger RDD对RDD进行Spark重复数据删除以获得更大的RDD
【发布时间】:2017-06-07 15:51:26
【问题描述】:

我有一个从磁盘加载的数据框

df_ = sqlContext.read.json("/Users/spark_stats/test.json")

它包含 500k 行。
我的脚本在这个大小上运行良好,但我想在 5M 行上测试它,有没有办法将 df 复制 9 次? (对我来说,在 df 中有重复并不重要)

我已经在使用 union 但它真的太慢了​​(因为我认为它每次都从磁盘读取)

df = df_
for i in range(9): 
    df = df.union(df_)

您是否有一个干净的方法来做到这一点?

谢谢

【问题讨论】:

  • 从数据源读取数据后使用 .cache()。
  • 谢谢,谢谢,效果很好

标签: duplicates pyspark union


【解决方案1】:

你可以使用爆炸。它应该只从原始磁盘读取一次:

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([StructField("f1", StringType()), StructField("f2", StringType())])

data = [("a", "b"), ("c", "d")]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Create an array with as many values as times you want to duplicate the rows
dups_array = [lit(i) for i in xrange(9)]
duplicated = df.withColumn("duplicate", array(*dups_array)) \
               .withColumn("duplicate", explode("duplicate")) \
               .drop("duplicate")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-21
    • 2015-03-05
    • 2015-07-26
    • 2021-06-04
    • 1970-01-01
    相关资源
    最近更新 更多