【发布时间】:2021-09-05 02:03:29
【问题描述】:
请提出关于 Apache Spark 的小问题。
我有一个非常简单的 Spark 工作: (这里是用Java写的,但适用于其他语言)
final SparkSession sparkSession = SparkSession.builder().getOrCreate();
final Dataset<Row> someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();
工作很简单:
- 从一些大数据表中提取一个非常大的数据集
- 将每一行转换为一个整数。为此,我使用了一个非常重的 计算,并且这个操作应该只执行一次。
- 将第 2 步的好整数结果与坏整数分开, 显示计数
问题是,我在第 2 步中看到了 map 方法,对数据库的每一行都执行了多次。
我的理论(如果我错了,请纠正我),它是在第 3 行的 map 函数中第一次计算出来的。
但是在第 4 行和第 5 行,在这两个过滤器函数中,当我们需要计数时,管道再次需要步骤 2 的结果。
由于地图功能只能运行一次,请问如何避免这种情况?
谢谢
【问题讨论】:
-
实际上,由于 spark 惰性求值(参见spark.apache.org/docs/latest/…,它适用于 RDD,但适用于 Dataframe),第 3 行的数据帧仅计算两次,即您调用的两次
.count()在LOGGER.info()。您可以将Dataframe视为有关如何转换数据的计划,只有调用操作才会执行此计划。
标签: apache-spark