【发布时间】:2014-12-21 01:30:52
【问题描述】:
运行以下我编写的 SPARK 代码时出现错误。我正在尝试根据键找到所有向量的总和。每个输入行以 key(integer) 开头,然后是 127 个浮点数,这是一个具有 127 维的单个向量,即每行以一个键和一个向量开头。
from cStringIO import StringIO
class testing:
def __str__(self):
file_str = StringIO()
for n in self.vector:
file_str.write(str(n))
file_str.write(" ")
return file_str.getvalue()
def __init__(self,txt="",initial=False):
self.vector = [0.0]*128
if len(txt)==0:
return
i=0
for n in txt.split():
if i<128:
self.vector[i]=float(n)
i = i+1
continue
self.filename=n
break
def addVec(self,r):
a = testing()
for n in xrange(0,128):
a.vector[n] = self.vector[n] + r.vector[n]
return a
def InitializeAndReturnPair(string,first=False):
vec = testing(string,first)
return 1,vec
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()
input.txt 中的示例行
6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 9.0 1.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 2.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 0.0 0.0 4.0 4.0 1.0 3.0 2.0 4.0 4.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 11.0 2.0 29.0 35.0 6.0 5.0 9.0 35.0 6.0 5.0 9.0 2.0 1.0 3.0 1.0 0.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.02
以下是我遇到的错误。此错误来自代码的最后一行,即output.reduceByKey
错误消息 - http://pastebin.com/0tqiiJQm
不太确定如何解决这个问题。我尝试使用MarshalSerializer,但它给出了同样的问题。
------------------答案------------- -----------------------
我从apache user list 得到了同样问题的答案。基本上,在集群中运行的映射器/归约器没有类定义,我们必须通过在不同的模块中编写类并在使用配置 SparkContext 时附加来传递类
sc.addPyFile(os.path( HOMEDirectory + "module.py"))
感谢大家帮助我。
【问题讨论】:
标签: bigdata apache-spark apache-spark-sql