【问题标题】:Spark code giving error火花代码给出错误
【发布时间】:2014-12-21 01:30:52
【问题描述】:

运行以下我编写的 SPARK 代码时出现错误。我正在尝试根据键找到所有向量的总和。每个输入行以 key(integer) 开头,然后是 127 个浮点数,这是一个具有 127 维的单个向量,即每行以一个键和一个向量开头。


from cStringIO import StringIO

class testing:
    def __str__(self):
        file_str = StringIO()
        for n in self.vector:
            file_str.write(str(n)) 
            file_str.write(" ")
        return file_str.getvalue()
    def __init__(self,txt="",initial=False):
        self.vector = [0.0]*128
        if len(txt)==0:
            return
        i=0
        for n in txt.split():
            if i<128:
                self.vector[i]=float(n)
                i = i+1
                continue
            self.filename=n
            break
def addVec(self,r):
    a = testing()
    for n in xrange(0,128):
        a.vector[n] = self.vector[n] + r.vector[n]
    return a

def InitializeAndReturnPair(string,first=False):
    vec = testing(string,first)
    return 1,vec


from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()

input.txt 中的示例行

6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 9.0 1.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 2.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 0.0 0.0 4.0 4.0 1.0 3.0 2.0 4.0 4.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 11.0 2.0 29.0 35.0 6.0 5.0 9.0 35.0 6.0 5.0 9.0 2.0 1.0 3.0 1.0 0.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.02

以下是我遇到的错误。此错误来自代码的最后一行,即output.reduceByKey

错误消息 - http://pastebin.com/0tqiiJQm

不太确定如何解决这个问题。我尝试使用MarshalSerializer,但它给出了同样的问题。

------------------答案------------- -----------------------

我从apache user list 得到了同样问题的答案。基本上,在集群中运行的映射器/归约器没有类定义,我们必须通过在不同的模块中编写类并在使用配置 SparkContext 时附加来传递类

sc.addPyFile(os.path( HOMEDirectory + "module.py"))

感谢大家帮助我。

【问题讨论】:

    标签: bigdata apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用与 spark 配合得很好的 numpy 数组。

    import numpy as np
    
    def row_map(text):
        split_text = text.split()
        # create numpy array from elements besides the first element 
        # which is the key
        return split_text(0), np.array([float(v) for v in split_text[1:]])
    
    from pyspark import SparkConf, SparkContext
    conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf = conf)
    
    inp = sc.textFile("input.txt")
    output = inp.map(row_map).cache()
    #Below line is throwing error
    print output.reduceByKey(lambda a,b : np.add(a,b)).collect()
    

    更简洁和pythonic。

    【讨论】:

    • 这个解决方案很酷而且效果很好。如果我想返回自定义类对象而不是数组,你能建议我可能出错的地方吗?我打算连同这个数组一起返回其他对象。
    • 您遇到问题的原因是测试类不可序列化。查看 python 文档以获取有关如何修复 docs.python.org/2/library/… 的更多信息,您的另一个选择是在收集输出后创建类,例如: data = output.reduceByKey(lambda a,b : np.add(a,b )).collect() final_output = [testing(value) for value in data] 只有当数据的大小足够小,可以以非并行方式处理时,这才有效。
    • 我已经在 Apache spark 用户列表中发布了相同的问题,并且我得到了关于如何使用自定义类的答案,我将在问题本身中添加以进行知识共享。我会接受你的回答,因为它在一定程度上解决了我所面临的问题
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-08
    • 2016-03-30
    • 2020-07-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多