【问题标题】:If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?如果我缓存一个 Spark Dataframe 然后覆盖引用,原始数据帧还会被缓存吗?
【发布时间】:2020-05-31 23:32:22
【问题描述】:

假设我有一个生成 (py)spark 数据帧的函数,将数据帧缓存到内存中作为最后一个操作。

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
   return df

据我了解,Spark 的缓存工作如下:

  1. cache/persist 加上一个动作(count())对数据调用时 帧,它从它的 DAG 计算并缓存到内存中,附加 指向引用它的对象。
  2. 只要存在对该对象的引用,可能在其他函数/其他范围内,df 将继续被缓存,并且所有依赖于 df 的 DAG 都将使用内存中缓存的数据作为起点。
  3. 如果对 df 的所有引用都被删除,Spark 会将缓存设置为内存以进行垃圾回收。它可能不会立即被垃圾回收,从而导致一些短期内存块(尤其是如果您生成缓存数据并过快丢弃它们会导致内存泄漏),但最终会被清除。

我的问题是,假设我使用gen_func 生成数据框,但随后覆盖原始数据框引用(可能使用filterwithColumn)。

df=gen_func(inputs)
df=df.filter("some_col = some_val")

在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用 cache/counted 已被覆盖。这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器df 将从头开始计算所有内容,尽管它是从先前缓存的数据帧生成的?

我问这个问题是因为我最近正在用我的代码修复一些内存不足的问题,在我看来,缓存可能是问题所在。但是,我还没有真正了解使用缓存的安全方法的全部细节,以及如何意外地使缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?

【问题讨论】:

  • 您正在解决内存不足的问题,因为您在最后一次操作后没有取消您的 df。您需要在动作的沿袭图结束后进行清理,以便执行程序内存可以有效地处理下一个 df。
  • 在 spark 中做 df=df 是不好的做法,因为数据框和列是不可变的。
  • 我想知道您是否能够弄清楚这一点。我有类似的东西:一个函数do_stuff(df)=> df,我将此函数应用于变量链。 Df 一直被覆盖。很高兴了解窗帘后面发生了什么。干杯。

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

我做了几个实验,如下所示。显然,数据帧一旦被缓存,保持缓存状态(如 getPersistentRDDs 和查询计划 - InMemory 等所示),即使所有 Python 引用都被使用 del 完全覆盖或删除,并显式调用垃圾回收。

实验 1:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df2.select('*').explain()

del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}

>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#172L AS col1#174L]
                  +- *(1) Scan ExistingRDD[_1#172L]

>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}

实验 2:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df.select('*').explain()

del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}

>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}

>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#218L AS col1#220L]
                  +- *(1) Scan ExistingRDD[_1#218L]

>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}

实验 3(对照实验,证明unpersist 有效)

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
df2.select('*').explain()

df.unpersist()
df2.select('*').explain()

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}

>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#310L AS col1#312L]
                  +- *(1) Scan ExistingRDD[_1#310L]

>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
   +- *(1) Scan ExistingRDD[_1#310L]

回答OP的问题:

这是否意味着缓存的数据帧不再可用并且将被垃圾回收?这是否意味着新的后过滤器 df 将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?

实验表明两者都。数据帧保持缓存状态,不会被垃圾回收,新的数据帧是根据查询计划使用缓存的(不可引用的)数据帧计算的。

一些与缓存使用相关的有用功能(如果您不想通过 Spark UI 执行此操作)是:

sc._jsc.getPersistentRDDs(),显示缓存的 RDD/数据帧列表,以及

spark.catalog.clearCache(),清除所有缓存的 RDD/数据帧。

我在执行上述操作时是否偏离了最佳实践?

我无权就此评判您,但正如其中一位 cmets 所建议的,请避免重新分配给 df,因为数据帧是不可变的。试着想象你正在使用 scala 进行编码,并且你将 df 定义为 val。做df = df.filter(...) 是不可能的。 Python 本身无法强制执行此操作,但我认为最佳做法是避免覆盖任何数据帧变量,这样如果您不再需要缓存的结果,您可以随时调用 df.unpersist()

【讨论】:

    【解决方案2】:

    想提出几点希望澄清 Spark 在缓存方面的行为。

    1. 当你有一个

      df = ... do stuff...
      df.cache()
      df.count()
      

    ...然后在您的应用程序中的其他地方

       another_df = ... do *same* stuff...
       another_df.*some_action()*
    

    ...,您会期望 another_df 重用缓存的 df 数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点,Spark 开发人员决定使用分析的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖应用程序端的引用。 在 Spark 中,CacheManager 是跟踪缓存计算的组件,在索引序列cachedData 中:

      /**
       * Maintains the list of cached plans as an immutable sequence.  Any updates to the list
       * should be protected in a "this.synchronized" block which includes the reading of the
       * existing value and the update of the cachedData var.
       */
      @transient @volatile
      private var cachedData = IndexedSeq[CachedData]()
    

    在查询计划期间(在缓存管理器阶段),会扫描此结构以查找正在分析的计划的所有子树,以查看是否已经计算了其中的任何一个。如果找到匹配项,Spark 会将此子树替换为来自 cachedData 的相应 InMemoryRelation

    1. cache()persist() 的简单同义词)函数通过在 CacheManager 中调用 cacheQuery(...) 来存储具有存储级别 MEMORY_AND_DISK 的数据帧
          /**
           * Caches the data produced by the logical representation of the given [[Dataset]].
           * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
           * recomputing the in-memory columnar representation of the underlying table is expensive.
           */
          def cacheQuery(...
    

    请注意,这与使用MEMORY_ONLY 级别的RDD 缓存不同。一旦缓存的数据帧保持缓存在内存中或本地执行器磁盘上,直到它们被明确地unpersist'ed,或者 CacheManager 的clearCache() 被调用。当执行程序存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但绝不会简单地“丢弃”。

    好问题,顺便说一句……

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-05-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-19
      相关资源
      最近更新 更多