【发布时间】:2018-03-08 18:59:52
【问题描述】:
我的“asdasd.csv”文件具有以下结构。
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
好的,我得到下面的 {key,value} 元组来操作它。
# x y z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
# part A (key) part B (value)
我计算平均值的代码如下,我必须从每列 X、Y Z 中计算每个键的平均值。
rdd_ori = sc.textFile("asdasd.csv") \
.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
我的问题是我尝试了该代码,但它在其他 PC 上运行良好,使用相同的 MV 进行开发 (PySpark Py3)
这是一个例子,这个代码是正确的:
但我不知道为什么会出现这个错误,重要的部分是 Strong。
----------------------------------- ---------------------------- Py4JJavaError Traceback(最近调用 最后)在() 9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x0+y0,x[0][2]+y[ 0][2])) 10 ---> 11 print(meanRDD.take(1))
/opt/spark/current/python/pyspark/rdd.py in take(self, num) 1341
第1342章 numPartsToTry,totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 个项目 += res/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc,分区,allowLocal) 第990章 第991章 --> 992 端口 = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 993 返回列表(_load_from_socket(端口,mappedRDD._jrdd_deserializer)) 第994章
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py 在 调用(self, *args) 1131 回答 = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_args in temp_args:
/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 尝试: ---> 63 返回 f(*a, **kw) 64 除了 py4j.protocol.Py4JJavaError 作为 e: 65 秒 = e.java_exception.toString()
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py 在 get_return_value(answer, gateway_client, target_id, name) 第317章 318 “调用 {0}{1}{2} 时出错。\n”。 --> 319 格式(target_id,“.”,名称),值) 320 其他: 第321章
Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.runJob。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 127.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在阶段 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (最近 最后调用):文件 “/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,行 177,主要 进程()文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,行 172,处理中 serializer.dump_stream(func(split_index, iterator), outfile) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 2423 行,在 管道函数 返回 func(split, prev_func(split, iterator)) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 2423 行,在 管道函数 return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func 返回 f(迭代器)文件“/opt/spark/current/python/pyspark/rdd.py”,第 1842 行,在 本地结合 merge.mergeValues(iterator) 文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py”,行 238,在合并值中 d[k] = comb(d[k], v) if k in d else creator(v) File "", line 3, in TypeError: 'float' 对象不可下标
【问题讨论】:
-
您是否有任何理由不为此使用数据帧?
-
Ramesh Maharjan - 你能帮我解决stackoverflow.com/questions/69831826/… 的问题吗?
标签: python apache-spark pyspark