【问题标题】:Iterating user-defined class objects inside a pyspark RDD在 pyspark RDD 中迭代用户定义的类对象
【发布时间】:2016-04-08 21:47:17
【问题描述】:

我正在从 csv 读取数据并将该数据转换为 python 类对象。但是当我尝试使用用户定义的类对象迭代该 rdd 时,我会收到类似的错误,

_pickle.PicklingError: Can't pickle <class '__main__.User'>: attribute lookup User on __main__ failed

我在这里添加了部分代码,

class User:
    def __init__(self, line):
        self.user_id = line[0]
        self.location = line[1]
        self.age = line[2]

def create_user(line):
    user = User(line)
    return user

def print_user(line):
    user = line
    print(user.user_id)

conf = (SparkConf().setMaster("local").setAppName("exercise_set_2").set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
users = sc.textFile("BX-Users.csv").map(lambda line: line.split(";"))  
users_objs = users.map(lambda entry: create_user(entry))
users_objs.map(lambda entry: print_user(entry))

对于上面的代码,我得到的结果是,

PythonRDD[93] at RDD at PythonRDD.scala:43

CSV 数据源 URL(需要 zip 提取):HERE

更新: 将代码更改为包含 collect 将再次导致错误,我仍然必须尝试使用​​ Pickle。我以前没试过,如果你有样品,我可以很容易地做到。

users_objs = users.map(lambda entry: create_user(entry)).collect()

【问题讨论】:

    标签: python apache-spark ipython pyspark


    【解决方案1】:

    当你使用时

    def create_user(line):
        user = User(line)
        return user
    

    直接在 map 调用中,这意味着您的节点必须可以访问 User 类。通常这意味着它需要可序列化/可提取。节点将如何使用该类,或者知道它是什么(除非你有一个通用的 NFS 挂载或其他东西)?这就是为什么你得到那个泡菜错误。要使您的 User 类可挑选,请阅读以下内容:https://docs.python.org/2/library/pickle.html

    此外,您没有在 RDD 上执行 collect(),这就是您看到 PythonRDD[93] at RDD at PythonRDD.scala:43 的原因。它仍然只是一个 RDD,你的数据在节点上。

    【讨论】:

    • 更新了问题。我会尝试使用 Picklable 并回复您。
    【解决方案2】:

    好的,找到解释了。 将类存储在单独的文件中将使类自动可挑选。所以我将 User 类存储在 user.py 中 并将以下导入添加到我的代码中。

    from user import User
    

    User.py 的内容

    class User:
        def __init__(self, line):
            self.user_id = line[0]
            self.location = line[1]
            self.age = line[2]
    

    正如前面的回答中提到的,我可以对创建的用户对象进行用户收集(一种 RDD 方法)。所以下面的代码会打印出我想要的所有用户ID。

    for user_obj in users.map(lambda entry: create_user(entry)).collect():
        print_user(user_obj)
    

    【讨论】:

    • 这可能适用于本地配置,但它不适用于集群/客户端,除非您的节点共享 NFS 挂载。
    • 这是一个快速修复,但是当我更改 Picklable 类时我会更新我的答案。
    猜你喜欢
    • 2016-09-26
    • 2017-07-28
    • 2015-12-22
    • 1970-01-01
    • 1970-01-01
    • 2016-08-11
    • 2017-01-09
    • 2011-09-18
    • 1970-01-01
    相关资源
    最近更新 更多