【问题标题】:apply custom function to a column of array type ofdataframe将自定义函数应用于数据框数组类型的列
【发布时间】:2018-03-03 20:10:39
【问题描述】:

我有一个包含名为“counts”的列的数据框,我想将自定义函数“do_something”应用于列的每个元素,即每个数组。我不想修改数据框,我只想对列数进行单独的操作。该列的所有数组具有相同的大小。

+----------------------+---------------------------------------+
|id|              counts|
+----------------------+---------------------------------------+
|1|          [8.0, 2.0, 3.0|
|2|          [1.0, 6.0, 3.0|                
+----------------------+---------------------------------------+

当我尝试这个时:

df.select('counts').rdd.foreach(lambda x: do_something(x))

即使我尝试不使用 lambda,它也会给出相同的错误。

它在上面的行上失败了

Py4JJavaError Traceback(最近调用 最后)在() ----> 1 df.select('counts').rdd.foreach(lambda x: do_something(x))

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in foreach(self, f) 745 f(x) 第746章 --> 747 self.mapPartitions(processPartition).count() # 强制评估 748 第749章

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in count(self) 1002 3 1003 """ -> 1004 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 1005 1006 def stats(self):

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in sum(self) 993 6.0 第994章 --> 995 返回 self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 996 997 def count(self):

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in fold(self, 零值,操作) 867 # 提供给每个分区的 zeroValue 与提供的分区不同 868#到最后的reduce调用 --> 869 vals = self.mapPartitions(func).collect() 第870章 第871章

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in collect(self) 第769章 770 使用 SCCallSiteSync(self.context) 作为 css: --> 771 端口 = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 772 返回列表(_load_from_socket(端口,self._jrdd_deserializer)) 第773章

/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py 在 调用(self, *args) 第811章 第812章 --> 813 答案,self.gateway_client,self.target_id,self.name) 814 815 用于 temp_args 中的 temp_arg:

/usr/hdp/2.5.3.0-37/spark/python/pyspark/sql/utils.py in deco(*a, **千瓦) 43 def deco(*a, **kw): 44 尝试: ---> 45 返回 f(*a, **kw) 46 除了 py4j.protocol.Py4JJavaError 作为 e: 47 秒 = e.java_exception.toString()

/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py 在 get_return_value(answer, gateway_client, target_id, name) 306 引发 Py4JJavaError( 307 "调用 {0}{1}{2} 时出错。\n"。 --> 308 格式(target_id, ".", name), value) 309 其他: 310 引发 Py4JError(

尽管所有输入数组的大小相同。

big_list=[]
def do_something(i_array):
    outputs = custom_library(i_array) # takes as input an array and returns 3 new lists
    big_list.extend(outputs)

【问题讨论】:

  • 每个数组需要做什么操作?
  • 让我们从错误信息开始:​​这个错误是从哪里出现的?附:可能您不需要 lambda,只需折腾 do_something 作为参数即可。 :)
  • @Uvar 即使没有 lambda 我也会得到同样的错误
  • @Suresh 我需要获取列的每个数组并将其与函数 do_something 一起使用以创建新列表 big_list
  • @Vas 当然可以,功能没有任何改变

标签: python pyspark spark-dataframe rdd


【解决方案1】:

你的UDF修改了一个python对象,即:

  • 在数据框外部,即使函数有效,您也无法访问该值,因为您没有将其返回到数据框的行中
  • 巨大,它的元素数量至少是数据框中行数的三倍

您可以尝试这样做:

def do_something(i_array):
    outputs = custom_library(i_array)
    return outputs

import pyspark.sql.functions as psf
do_something_udf = psf.udf(do_something, ArrayType(ArrayType(DoubleType()))

DoubleType() 或您返回的任何类型

df.withColumn("outputs", psf.explode(do_something_udf("count")))

您的行数将是 df 的三倍

【讨论】:

  • 我们可以索引从do_something_udf返回的wrappedArray来形成列,而不是explode。所以,我认为,每行都会有内部列表作为单独的列,并且行数没有变化。跨度>
猜你喜欢
  • 2017-01-28
  • 2020-10-28
  • 2023-03-06
  • 2021-09-20
  • 1970-01-01
  • 1970-01-01
  • 2018-11-10
  • 1970-01-01
  • 2021-08-03
相关资源
最近更新 更多