【问题标题】:Multiprocessing a list of RDDs多处理 RDD 列表
【发布时间】:2016-07-14 09:36:24
【问题描述】:

我正在尝试多处理一个列表的RDD,如下所示

from pyspark.context import SparkContext
from multiprocessing import Pool



def square(rdd_list):
    def _square(i):
        return i*i
    return rdd_list.map(_square)

sc = SparkContext('local', 'Data_Split')
data = sc.parallelize([1,2,3,4,5,6])

dataCollection = [data, data, data]

p = Pool(processes=2)
result = p.map(square, dataCollection)
print result[0].collect()

我期望输出中的 RDD 列表,其中每个元素都包含来自 data 的平方元素。

但是运行代码会出现以下错误:

例外:您似乎正在尝试广播 RDD 或 从动作或转换中引用 RDD。 RDD 转换 并且动作只能由驱动程序调用,不能在其他内部调用 转变;例如,rdd1.map(lambda x: rdd2.values.coun\t() * x) 无效,因为无法在 rdd1.map 转换中执行值转换和计数操作。更多 信息,请参阅 SPARK-5063。

我的问题是:-

1) 为什么代码没有按预期工作?我该如何解决这个问题?

2) 如果我使用 p.map (池)而不是简单的 ma​​p 在我的 RDD 列表中。

【问题讨论】:

  • 您尝试过使用多处理库吗?
  • 1) 因为你不处理普通的 Python 状态。使用线程stackoverflow.com/q/38048068/1560062(不,这里 GIL 不是问题)2) 除非您对资源进行微观管理,否则可能不会但没有真实的上下文,这只是猜测。
  • 您能否详细说明/提供对“普通 Python 状态”的引用?当您说普通的python状态(特定于问题中的代码)时,您是在谈论“副作用”吗?
  • 我的意思是 PySpark“驱动程序”只是 Java 的一个小客户端,而不是一个独立的驱动程序。所以与其说是 Python 对象,不如说是 JVM 和 Py4J 网关。关于线程,只有实现非阻塞提交和并行化一些辅助完成任务。这些不会触及核心处理。

标签: python apache-spark pyspark list-comprehension


【解决方案1】:

这是因为当您使用多进程时,RDD 必须在发送到其他进程之前进行序列化/腌制。每当尝试序列化 RDD 时,Spark 都会执行检查,并抛出该错误。

【讨论】:

  • 那么,有没有办法以编程方式序列化 RDD 并提交到 Pool ?另外,在您看来,这是比使用线程更好的方法吗(就运行时而言)?
  • 我看不出在 Spark 驱动程序代码中使用多进程或多线程的意义。在大多数情况下,您只需执行 1 次 RDD 就可以对数据进行分区以使 CPU 资源饱和。这样的话,不管两个RDD是顺序执行还是并行执行,总的运行时间都是一样的。
  • 为了添加更多上下文,我试图以一种简单的方式陈述我要解决的实际问题。我正在使用带有 LBFGS 优化的 LR 为一个与所有分类构建 n 个分类器,每个分类器从 RDD 列表中获取 rdd。我想知道是否可以同时构建两个模型(如果为池提供 2 个核心),前提是 RDD 列表中的所有元素彼此独立。
  • 我想知道您是否真的从这样做中受益。如果您只是利用常规的并行化自动实现,spark 无论如何都会做的只是以一次一件事的格式编写您的代码,那么您将摆脱很多责任。跨度>
  • 这正是我的疑问。 :) 我的分类器是一个队列预测模型。我观察到,当目标中的类数量增加时,构建所有模型所需的时间呈指数增长。我一直在寻找减少所需时间的方法(可能是同时构建两个模型,如果这样的话)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-03-30
  • 2017-06-08
  • 1970-01-01
  • 1970-01-01
  • 2017-04-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多