【问题标题】:Count number of rows in an RDD计算 RDD 中的行数
【发布时间】:2015-02-09 15:37:50
【问题描述】:

我在 java 中使用 spark,我有一个 500 万行的 RDD。有没有一种解决方案可以让我计算我的 RDD 的行数。我试过RDD.count(),但这需要很多时间。我已经看到我可以使用函数fold。但是我没有找到这个函数的java文档。 您能否告诉我如何使用它或告诉我另一种解决方案来获取我的 RDD 的行数。

这是我的代码:

JavaPairRDD<String, String> lines = getAllCustomers(sc).cache();
JavaPairRDD<String,String> CFIDNotNull = lines.filter(notNull()).cache();
JavaPairRDD<String, Tuple2<String, String>> join =lines.join(CFIDNotNull).cache();

double count_ctid = (double)join.count(); // i want to get the count of these three RDD
double all = (double)lines.count();
double count_cfid = all - CFIDNotNull.count();
System.out.println("********** :"+count_cfid*100/all +"% and now : "+ count_ctid*100/all+"%");

谢谢。

【问题讨论】:

    标签: java apache-spark


    【解决方案1】:

    您的想法是正确的:使用rdd.count() 来计算行数。没有更快的方法。

    我认为你应该问的问题是为什么rdd.count() 这么慢?

    答案是rdd.count() 是一个“动作”——它是一个急切的操作,因为它必须返回一个实际的数字。您在count() 之前执行的 RDD 操作是“转换”——它们将一个 RDD 懒惰地转换为另一个 RDD。实际上,转换并没有真正执行,只是排队。当你调用count() 时,你强制执行所有之前的惰性操作。现在需要加载输入文件,执行map()s 和filter()s,执行随机播放等,直到最后我们有了数据并且可以说出它有多少行。

    请注意,如果您调用count() 两次,这一切都会发生两次。计数返回后,丢弃所有数据!如果你想避免这种情况,请在 RDD 上调用cache()。然后对count() 的第二次调用将很快,并且派生的RDD 将更快地计算。但是,在这种情况下,RDD 必须存储在内存(或磁盘)中。

    【讨论】:

    • @Daniel Darabos 用于分析执行逻辑上不同的任务(读取、转换和写入)所花费的时间我的应用程序,我需要绕过Spark惰性评估。所以我在我的代码中插入了一些df.cache.count 调用。这会显着影响性能和/或产生其他影响吗?我在Spark 2.3.0 并使用Scala 2.11.11
    • 我认为它可能会对性能产生重大影响。如果您添加缓存,则将在此时存储和检索数据。即使没有序列化,这也不是微不足道的开销。但我也不知道更好的方法来做你想做的事情。您的基准仍然应该代表不同任务所花费的时间。此外,您还可以对无缓存版本进行基准测试,以查看缓存的整体效果。
    【解决方案2】:

    Daniel 对count 的解释是正确的。但是,如果您愿意接受一个近似值,您可以尝试countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] RDD 方法。 (但请注意,这被标记为“实验性”)。

    【讨论】:

      猜你喜欢
      • 2016-07-15
      • 1970-01-01
      • 2017-11-06
      • 1970-01-01
      • 1970-01-01
      • 2020-09-03
      • 2017-10-15
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多