【问题标题】:How to store the data in the dataframes in Pyspark如何将数据存储在 Pyspark 的数据框中
【发布时间】:2018-10-31 08:35:53
【问题描述】:

我是 Pyspark 的新手,并试图弄清楚如何将数据存储在数据框中。我有大小为n x 8 的表,其中n 非常大。 假设dfx1x2x3x4x5x6x7x8 列。

8 列中的 4 列在循环中保持不变(x1x2x3x4)和 4 列变化(x5x6x7x8) .

for i in range(1, iterations):
   df = df.alias("L").join(df_second.alias("R"), (df.x1 == df_second.x1), how="left")\
        .select("L.*", sum(col("w")*col("x7")).over(Window.partitionBy("x1")).alias("x8"))\
        .distinct()\
        .sort('x1')
   df = df.withColumn("x5", col('x6'))
   df = df.withColumn("x6", col('x5') - col("x1")*(col("x3") - col("x4")))
   df = df.withColumn("x6", when(df.sampled > 0, df.x2).otherwise(df.x6))
   df = df.withColumn("x7", 2*col('x6') - col("x5")*col("x8"))

每次迭代都会变慢,并且 n 大于 50000 我遇到了内存问题。我研究并了解了cachepersist 功能,但无法弄清楚如何以正确的方式使用它们。

我的问题如下:

  1. 我是否应该将常量列和更改列存储在不同的数据框中(分别为df1df2)?如果是这样我应该使用d1.persist吗?
  2. 如果我执行df = df.cache() 并使用df = df.withColumn("x5", col('x6')) 之类的操作覆盖缓存的df,会发生什么? df 是否仍会被缓存,还是我应该先从内存中清除 df,然后再执行 df = df.withColumn("x5", col('x6')).cache()
  3. 如何为片段制作火花拆分数据并使用这些数据,这样我就不会耗尽内存?
  4. 我试图通过调用
    df = sc.parallelize(df.collect()).toDF().cache() 来强制计算惰性转换,这大大减慢了程序的速度,但每次迭代都需要几乎相同的时间。有什么正确的方法吗?检查点让它变得更慢。

【问题讨论】:

  • 这可以帮助to save tables as temporary in memory 和这个to unpersist 数据帧。当您在 for 块内进行更改时,会创建一个新的数据框。所以,也许,保存它,取消持久化并阅读帮助
  • @Gocht 非常感谢!它看起来像我正在寻找的东西!

标签: python dataframe pyspark


【解决方案1】:

@Gocht 的解决方案可能更好。但这是另一种强制计算和持久性的方法。

def cache_on_parquet(df):
    TEMP = 'temp.parquet'
    df.write.parquet(TEMP, mode='overwrite')
    return sqlc.read.parquet(TEMP)

经过一次或几次迭代

df = cache_on_parquet(df)

【讨论】:

    猜你喜欢
    • 2020-12-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-22
    • 1970-01-01
    • 2020-06-15
    • 1970-01-01
    • 2016-11-12
    相关资源
    最近更新 更多