【问题标题】:Is there a way to control the number of partitions when reading a text file in PySpark在 PySpark 中读取文本文件时有没有办法控制分区数
【发布时间】:2016-03-19 19:45:14
【问题描述】:

我正在使用 PySpark 中的以下命令读取文本文件

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv")

有没有办法指定 RDD rating_data_raw 应该分成多少个分区?我想指定大量的分区以获得更大的并发性。

【问题讨论】:

标签: python apache-spark pyspark rdd


【解决方案1】:

正如其他用户所说,您可以通过在textFile 的可选参数minPartitions 中设置来设置在读取文件时将创建的最小分区数。

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv", minPartitions=128)

另一种方法是使用repartitioncoalesce,如果需要减少分区数可以使用coalesce,否则可以使用repartition

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv").repartition(128)

【讨论】:

    【解决方案2】:

    也可以读取 .csv 文件,然后使用 df 到 RDD 转换检查分区。我在下面留下一个示例结构。

    dataset = spark.read.csv("data.csv", header=True, inferSchema='True')
    colsDrop = ("data_index", "_c17", "song_title", "artist")
    df = dataset.drop(*colsDrop)
    rdd = sc.parallelize(df.collect()).partitionBy(8)
    

    这里 .partitionBy() 允许您控制 RDD 对象的分区号。也可以使用 .getNumPartition() 方法找出这些数字。

    唯一需要注意的是,在 CPU 上分配的分区数多于线程数不会给我们带来速度提升。

    例如,我的 CPU 中的线程数是 8,您可以在下面看到一个示例时间分布。

    如你所见,我在 8 个分区后无法加快速度。

    【讨论】:

    • 另外,如果你想通过 df 或 sql 使用 RDD API,默认分区号为 1。trainingData=df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1]))print("Partition: ",trainingData.getNumPartitions())响应:分区:1
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-08-04
    • 1970-01-01
    • 2020-12-29
    • 2010-11-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多