【发布时间】:2018-10-31 08:35:53
【问题描述】:
我是 Pyspark 的新手,并试图弄清楚如何将数据存储在数据框中。我有大小为n x 8 的表,其中n 非常大。
假设df 有x1、x2、x3、x4、x5、x6、x7、x8 列。
8 列中的 4 列在循环中保持不变(x1、x2、x3、x4)和 4 列变化(x5、x6、x7、x8) .
for i in range(1, iterations):
df = df.alias("L").join(df_second.alias("R"), (df.x1 == df_second.x1), how="left")\
.select("L.*", sum(col("w")*col("x7")).over(Window.partitionBy("x1")).alias("x8"))\
.distinct()\
.sort('x1')
df = df.withColumn("x5", col('x6'))
df = df.withColumn("x6", col('x5') - col("x1")*(col("x3") - col("x4")))
df = df.withColumn("x6", when(df.sampled > 0, df.x2).otherwise(df.x6))
df = df.withColumn("x7", 2*col('x6') - col("x5")*col("x8"))
每次迭代都会变慢,并且 n 大于 50000 我遇到了内存问题。我研究并了解了cache 和persist 功能,但无法弄清楚如何以正确的方式使用它们。
我的问题如下:
- 我是否应该将常量列和更改列存储在不同的数据框中(分别为
df1和df2)?如果是这样我应该使用d1.persist吗? - 如果我执行
df = df.cache()并使用df = df.withColumn("x5", col('x6'))之类的操作覆盖缓存的df,会发生什么?df是否仍会被缓存,还是我应该先从内存中清除df,然后再执行df = df.withColumn("x5", col('x6')).cache()? - 如何为片段制作火花拆分数据并使用这些数据,这样我就不会耗尽内存?
- 我试图通过调用
df = sc.parallelize(df.collect()).toDF().cache()来强制计算惰性转换,这大大减慢了程序的速度,但每次迭代都需要几乎相同的时间。有什么正确的方法吗?检查点让它变得更慢。
【问题讨论】:
-
这可以帮助to save tables as temporary in memory 和这个to unpersist 数据帧。当您在
for块内进行更改时,会创建一个新的数据框。所以,也许,保存它,取消持久化并阅读帮助 -
@Gocht 非常感谢!它看起来像我正在寻找的东西!