【问题标题】:How to ensure all data belonging to a user goes to the same file when using spark?使用spark时如何确保属于用户的所有数据都转到同一个文件?
【发布时间】:2019-04-10 10:48:26
【问题描述】:

我们有一个用例来准备一个 spark 作业,该作业将从多个提供程序读取数据,其中包含有关以任意顺序存在的用户的信息,并将它们写回 S3 中的文件。现在,条件是,用户的所有数据都必须存在于单个文件中。大约有 100 万独立用户,每个用户最多有大约 10KB 的数据。我们考虑最多创建 1000 个文件,并让每个文件包含大约 1000 个用户的记录。

我们使用 java 数据框 API 来针对 spark 2.4.0 创建作业。我无法理解这样做最合乎逻辑的方式是什么?我是否应该对用户 ID 进行分组操作,然后以某种方式收集行,除非我达到 1000 个用户,然后翻转(如果可能的话)或者有更好的方法。非常感谢任何帮助或正确方向的提示。

更新

按照答案中的建议后,我继续使用以下代码 sn-p,但我仍然看到正在写入 200 个文件,而不是 1000 个。

Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
    .config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
    .config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();

Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.close();

但是,如果我使用 100,而不是 1000,那么我会看到 100 个文件。然后我点击@Alexandros 分享的链接,下面的代码 sn-p 在它们各自的目录中生成了超过 20000 个文件,而且执行时间也像疯了一样。

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

【问题讨论】:

标签: java apache-spark


【解决方案1】:

您可以使用重新分区然后合并功能。

 Df.repartion(user_id).coalese(1000)

 Df.repartion(user_id,1000)

第一个保证不会有任何空分区,而在第二个解决方案中,某些分区可能是空的。

参考:Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?

https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html#coalesce(int)

更新:

为了完成这项工作

dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.sql.shuffle.partitions(默认值:200)。由于它不提供 1000 个文件,但适用于 100 个文件。要使其正常工作,您必须首先将其遣返到 1000 个分区,这与方法 2 相同。

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

我认为上面的代码将创建一百万个或更多文件而不是 1000 个。

dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);

它将创建 1000 个文件,但您必须在完成文件写入后通过读取每个文件来创建 id 和文件之间的映射。

【讨论】:

  • 我猜默认情况下我们有 200 个分区。使用您提到的第一种方法,当我们执行合并到 1000,即增加分区数时,是否有可能将某些用户的数据拆分到多个分区?
  • 我们将分区增加到 100 万个,然后将其减少到 1000 个。我认为与 shuffle=false 合并,默认情况下它只是组合现有分区以获得所需数量的分区。在您的情况下,合并将通过合并 1000 个现有分区来创建新分区。参考:linkedin.com/pulse/… 此外,一旦您使用 df.write(path) 写入数据帧,您将无法跟踪哪些值在哪个文件中,一旦您完成写入文件,您将需要创建另一个数据结构来跟踪它。
  • 您好,您能否查看问题中的更新,因为该建议不起作用。我在这里错过了什么吗?
  • 更新部分的第一个选项有效,但使用 --conf spark.sql.shuffle.partitions=1000 选项
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-15
  • 2011-02-22
  • 1970-01-01
  • 2013-09-10
相关资源
最近更新 更多