【问题标题】:Apache Spark: How to "cache" a dataset so it is not re-computed for next computationApache Spark:如何“缓存”数据集,以免为下一​​次计算重新计算
【发布时间】:2021-09-05 02:03:29
【问题描述】:

请提出关于 Apache Spark 的小问题。

我有一个非常简单的 Spark 工作: (这里是用Java写的,但适用于其他语言)

final SparkSession     sparkSession       = SparkSession.builder().getOrCreate();
final Dataset<Row>     someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet     = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet  = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();

工作很简单:

  1. 从一些大数据表中提取一个非常大的数据集
  2. 将每一行转换为一个整数。为此,我使用了一个非常重的 计算,并且这个操作应该只执行一次。
  3. 将第 2 步的好整数结果与坏整数分开, 显示计数

问题是,我在第 2 步中看到了 map 方法,对数据库的每一行都执行了多次

我的理论(如果我错了,请纠正我),它是在第 3 行的 map 函数中第一次计算出来的。

但是在第 4 行和第 5 行,在这两个过滤器函数中,当我们需要计数时,管道再次需要步骤 2 的结果

由于地图功能只能运行一次,请问如何避免这种情况

谢谢

【问题讨论】:

  • 实际上,由于 spark 惰性求值(参见spark.apache.org/docs/latest/…,它适用于 RDD,但适用于 Dataframe),第 3 行的数据帧仅计算两次,即您调用的两次.count()LOGGER.info()。您可以将Dataframe 视为有关如何转换数据的计划,只有调用操作才会执行此计划。

标签: apache-spark


【解决方案1】:

创建integerDataSet时可以使用.cache()方法:

final Dataset<Integer> integerDataSet = someVeryBigDataSet
  .map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
  .cache();

它将您的数据帧保存在内存中,或者如果空间不足,则保存在磁盘中,并且每次调用此数据帧时,都会加载保存的数据帧,而无需重新计算。

缓存策略的更多细节:https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/

【讨论】:

    【解决方案2】:

    persist() 和 cache() 都是 Spark 优化技术,用于存储数据,但唯一的区别是 cache() 方法默认将数据存储在内存中(MEMORY_ONLY),而在 persist() 方法中,开发人员可以将存储级别定义为内存或磁盘。

    #cache DF 将数据存储在 MEMORY_ONLY 中

    df.cache()
    

    要检查数据帧是否被缓存,我们可以使用 df.is_cached 或 df.storageLevel.useMemory。这两种方法都将返回一个布尔值,如 True 或 False。

    #persist 具有默认存储级别的数据帧

    df.persist()
    

    #persist 数据帧与 MEMORY_AND_DISK_2

    df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
    

    【讨论】:

      猜你喜欢
      • 2019-07-08
      • 1970-01-01
      • 2015-10-16
      • 2020-10-13
      • 1970-01-01
      • 2018-03-13
      • 2018-09-26
      • 2020-01-02
      • 1970-01-01
      相关资源
      最近更新 更多