【发布时间】:2018-03-03 20:10:39
【问题描述】:
我有一个包含名为“counts”的列的数据框,我想将自定义函数“do_something”应用于列的每个元素,即每个数组。我不想修改数据框,我只想对列数进行单独的操作。该列的所有数组具有相同的大小。
+----------------------+---------------------------------------+
|id| counts|
+----------------------+---------------------------------------+
|1| [8.0, 2.0, 3.0|
|2| [1.0, 6.0, 3.0|
+----------------------+---------------------------------------+
当我尝试这个时:
df.select('counts').rdd.foreach(lambda x: do_something(x))
即使我尝试不使用 lambda,它也会给出相同的错误。
它在上面的行上失败了
Py4JJavaError Traceback(最近调用 最后)在() ----> 1 df.select('counts').rdd.foreach(lambda x: do_something(x))
/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in foreach(self, f) 745 f(x) 第746章 --> 747 self.mapPartitions(processPartition).count() # 强制评估 748 第749章
/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in count(self) 1002 3 1003 """ -> 1004 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 1005 1006 def stats(self):
/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in sum(self) 993 6.0 第994章 --> 995 返回 self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 996 997 def count(self):
/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in fold(self, 零值,操作) 867 # 提供给每个分区的 zeroValue 与提供的分区不同 868#到最后的reduce调用 --> 869 vals = self.mapPartitions(func).collect() 第870章 第871章
/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in collect(self) 第769章 770 使用 SCCallSiteSync(self.context) 作为 css: --> 771 端口 = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 772 返回列表(_load_from_socket(端口,self._jrdd_deserializer)) 第773章
/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py 在 调用(self, *args) 第811章 第812章 --> 813 答案,self.gateway_client,self.target_id,self.name) 814 815 用于 temp_args 中的 temp_arg:
/usr/hdp/2.5.3.0-37/spark/python/pyspark/sql/utils.py in deco(*a, **千瓦) 43 def deco(*a, **kw): 44 尝试: ---> 45 返回 f(*a, **kw) 46 除了 py4j.protocol.Py4JJavaError 作为 e: 47 秒 = e.java_exception.toString()
/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py 在 get_return_value(answer, gateway_client, target_id, name) 306 引发 Py4JJavaError( 307 "调用 {0}{1}{2} 时出错。\n"。 --> 308 格式(target_id, ".", name), value) 309 其他: 310 引发 Py4JError(
尽管所有输入数组的大小相同。
big_list=[]
def do_something(i_array):
outputs = custom_library(i_array) # takes as input an array and returns 3 new lists
big_list.extend(outputs)
【问题讨论】:
-
每个数组需要做什么操作?
-
让我们从错误信息开始:这个错误是从哪里出现的?附:可能您不需要 lambda,只需折腾
do_something作为参数即可。 :) -
@Uvar 即使没有 lambda 我也会得到同样的错误
-
@Suresh 我需要获取列的每个数组并将其与函数 do_something 一起使用以创建新列表 big_list
-
@Vas 当然可以,功能没有任何改变
标签: python pyspark spark-dataframe rdd