【问题标题】:Pyspark - Update dataframe inside loopPyspark - 更新循环内的数据框
【发布时间】:2019-06-18 02:30:50
【问题描述】:

我在 for 循环中编写了一些更新数据帧的代码,但得到了奇怪的结果:

def _simulate_walks(self):

    # sample starting nodes
    aprox_sample_rate = 1.0 * self._num_of_walks / self._vertices.count()
    starting_nodes = self._vertices.sample(True, aprox_sample_rate)
    starting_nodes.show()

    # iterate over walks
    alias_draw_udf = F.udf(Node2Vec._alias_draw, T.StringType())
    single_list_udf = F.udf(lambda e: [e], T.ArrayType(T.StringType()))
    append_list_udf = F.udf(lambda l,e: l+[e], T.ArrayType(T.StringType()))
    for i in range(self._walk_length):
        if i == 0:
            chosen_path = starting_nodes.join(self._nodes_alias, F.col(self._src_col) == F.col('NODE'))\
                .withColumn('CHOSEN_NODE', alias_draw_udf('ALIAS'))\
                .withColumn('PATH', single_list_udf('NODE'))\
                .selectExpr('NODE as LAST_NODE', 'CHOSEN_NODE', 'PATH').persist()
            #chosen_path.show()
        else:
            chosen_path = chosen_path.join(self._edges_alias, (F.col('LAST_NODE') == F.col(self._src_col)) &
                                                (F.col('CHOSEN_NODE') == F.col(self._dst_col)))\
                .withColumn('NEW_CHOSEN_NODE', alias_draw_udf('ALIAS'))
            #chosen_path.show()
            chosen_path = chosen_path\
                .selectExpr('CHOSEN_NODE as LAST_NODE', 'NEW_CHOSEN_NODE as CHOSEN_NODE', 'PATH')
            #chosen_path.show()


        chosen_path = chosen_path.withColumn('NEW_PATH', append_list_udf('PATH', 'CHOSEN_NODE'))\
                .selectExpr('LAST_NODE', 'CHOSEN_NODE', 'NEW_PATH as PATH')
        chosen_path.show(5, False)

但是,当我在循环中添加持久命令时:

chosen_path = chosen_path.withColumn('NEW_PATH', append_list_udf('PATH', 'CHOSEN_NODE'))\
                .selectExpr('LAST_NODE', 'CHOSEN_NODE', 'NEW_PATH as PATH').persist()

代码完美运行。

我知道在 spark 中代码在执行操作之前是惰性求值的,但我不认为它会导致意外结果。

不用说,由于内存消耗高,每次迭代都坚持不是一个好的解决方案,我想知道解决这个问题的最佳做法是什么(也许取消所有内存并在坚持新数据帧之后立即?)。

谢谢。

【问题讨论】:

  • 您使用的是哪个版本的 spark ?从 2.4.0 开始提供数组操作函数,并且可以轻松替换您的 UDF。
  • @Steven 2.1 恐怕。

标签: python apache-spark dataframe pyspark


【解决方案1】:

为什么不使用 functools.reduce()? 示例:

from functools import reduce
def join_all_dataframes(dfs: List[DataFrame], on_columns: List[str]) -> DataFrame
   return reduce(lambda x, y: x.join(y, on_columns, 'outer'), 
                 dfs).dropDuplicates()

【讨论】:

  • @我的代码比几个连接复杂一点,所以对我来说不实用,而且保存所有生成的数据帧太贵了。
猜你喜欢
  • 2018-12-04
  • 1970-01-01
  • 2012-12-30
  • 1970-01-01
  • 2020-07-12
  • 1970-01-01
  • 2010-12-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多