【问题标题】:pyspark map each row in dataframe and apply UDF which return dataframe [duplicate]pyspark 映射数据帧中的每一行并应用返回数据帧的 UDF [重复]
【发布时间】:2019-12-29 22:53:53
【问题描述】:

我有一个dataframe 有几行。我可以使用此代码循环通过此dataframe

for row in df.rdd.collect():

但这不会并行工作吧?所以我想要的是映射每一行并将其传递给 UDF 并根据行中的值返回另一个新数据帧(来自数据库)。

我试过了 df.rdd.map(lambda row:read_from_mongo(row,spark)).toDF()

但是我收到了这个错误:

_pickle.PicklingError:无法序列化对象:异常:您似乎正在尝试从 广播变量、动作或转换。 SparkContext 只能 用于驱动程序,而不是在工作人员上运行的代码中。更多 信息,请参阅 SPARK-5063。

如何并行循环 dataframe 并保持 dataframe 为每一行返回?

【问题讨论】:

标签: pyspark pyspark-sql pyspark-dataframes apache-spark-2.3


【解决方案1】:

创建的每个 Spark RDD 或 DataFrame 都与应用程序的 SparkContext 相关联,并且 SparkContext 只能在驱动程序代码中引用。您返回 DataFrame 的 UDF 尝试从工作人员而不是驱动程序引用 SparkContext。那么,为什么需要为每一行创建一个单独的 DataFrame?如果 - 您希望稍后将生成的 DataFrame 合并为一个。 - 第一个 DataFrame 足够小。 然后,您可以简单地收集 DataFrame 的内容并将其用作过滤器以从 Mongodb 返回行。在这里,为了并行性,您需要依赖您使用的连接器来连接到 Mongodb。

【讨论】:

  • 我正在使用 mongo spark 连接器读取数据。我可以通过先收集来循环该行。我想知道的是是否有机会并行化该循环,因为每一行都有独立的值。
  • 嗯,特别是关于那个循环,由于全局解释器锁,不可能在 Python 中对集合进行纯粹的并行计算。但是,您可以使用 concurrent.futures 并发收集 DataFrame,其性能几乎与并行执行循环一样好。
  • 不是在 python 中并行化,而是在 spark 中。我可以使用 sc.parallelize() 但不确定之后如何进行,我仍在试图弄清楚我们使用 sc.parallelize() 时发生了什么
猜你喜欢
  • 2021-06-20
  • 2021-04-25
  • 2017-12-05
  • 2018-09-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-12
相关资源
最近更新 更多