【问题标题】:Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKeyPyspark - TypeError:'float'对象在使用reduceByKey计算平均值时不可下标
【发布时间】:2018-03-08 18:59:52
【问题描述】:

我的“asdasd.csv”文件具有以下结构。

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

好的,我得到下面的 {key,value} 元组来操作它。

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 

我计算平均值的代码如下,我必须从每列 X、Y Z 中计算每个键的平均值。

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

我的问题是我尝试了该代码,但它在其他 PC 上运行良好,使用相同的 MV 进行开发 (PySpark Py3)

这是一个例子,这个代码是正确的:

但我不知道为什么会出现这个错误,重要的部分是 Strong

----------------------------------- ---------------------------- Py4JJavaError Traceback(最近调用 最后)在() 9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x0+y0,x[0][2]+y[ 0][2])) 10 ---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num) 1341
第1342章 numPartsToTry,totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 个项目 += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc,分区,allowLocal) 第990章 第991章 --> 992 端口 = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 993 返回列表(_load_from_socket(端口,mappedRDD._jrdd_deserializer)) 第994章

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py 在 调用(self, *args) 1131 回答 = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_args in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 尝试: ---> 63 返回 f(*a, **kw) 64 除了 py4j.protocol.Py4JJavaError 作为 e: 65 秒 = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py 在 get_return_value(answer, gateway_client, target_id, name) 第317章 318 “调用 {0}{1}{2} 时出错。\n”。 --> 319 格式(target_id,“.”,名称),值) 320 其他: 第321章

Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.runJob。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 127.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在阶段 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (最近 最后调用):文件 “/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,行 177,主要 进程()文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,行 172,处理中 serializer.dump_stream(func(split_index, iterator), outfile) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 2423 行,在 管道函数 返回 func(split, prev_func(split, iterator)) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 2423 行,在 管道函数 return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func 返回 f(迭代器)文件“/opt/spark/current/python/pyspark/rdd.py”,第 1842 行,在 本地结合 merge.mergeValues(iterator) 文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py”,行 238,在合并值中 d[k] = comb(d[k], v) if k in d else creator(v) File "", line 3, in TypeError: 'float' 对象不可下标

【问题讨论】:

标签: python apache-spark pyspark


【解决方案1】:

reduceByKey 的工作原理如下。我以您的示例为例,即使用您传递给reduceByKey

的以下数据
#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#           part A (key)               part B (value)       counter

让我一步一步来

执行以下mapValues函数后

rdd_ori.mapValues(lambda x: (x,1))

rdd 数据 看起来像

((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))

所以当reduceByKey被调用为

.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))

并且将所有具有相同键的行分组,并将值传递给reducyByKeylambda函数

由于在您的情况下,所有键都是相同的,因此值将在以下迭代中传递给 ab 变量。

在第一次迭代中,a((-5.9427185, 0.6761626999999999, 8.128204), 1)b((-5.958191, 0.6880646, 8.135345), 1) 所以计算部分 (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) 是正确的并通过。

在第二次迭代中,a(a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) 的输出,即(-11.910430999999999, 1.3582764, 16.271881, 2)

因此,如果您查看数据的格式,a 中没有这样的a[0][0]。你可以得到a[0]a[1] ..等等。所以这就是问题所在。这也是错误消息所暗示的内容

TypeError: 'float' 对象不可下标

解决方案是格式化数据,以便您可以将a 访问为a[0][0],如果您将reduceByKey 格式化为以下格式,则可以做到这一点。

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))

但这会给你最后一个 mapValues 函数带来麻烦

.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

作为您的价值观,a 在 lambda 函数中,属于((-23.848236199999995, 2.6879882999999998, 32.604461), 4) 所以a[0] 表示(-23.848236199999995, 2.6879882999999998, 32.604461)a[1] 表示4 并且没有更多所以你会遇到

IndexError: 元组索引超出范围

所以你的最后一个mapValues 应该是

.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))

总的来说,以下代码应该适合你

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\
    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))

我希望我已经解释得足够清楚了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-31
    • 2020-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多