【问题标题】:How to pass a array column and convert it to a numpy array in pyspark如何传递数组列并将其转换为pyspark中的numpy数组
【发布时间】:2019-10-02 21:56:49
【问题描述】:

我有一个如下的数据框:

from pyspark import SparkContext, SparkConf,SQLContext
import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.functions import lit,countDistinct,udf,array,struct
import pyspark.sql.functions as F
config = SparkConf("local")
sc = SparkContext(conf=config)
sqlContext=SQLContext(sc)

@udf("float")
def myfunction(x):
    y=np.array([1,3,9])
    x=np.array(x)
    return cosine(x,y)


df = sqlContext.createDataFrame([("doc_3",1,3,9), ("doc_1",9,6,0), ("doc_2",9,9,3) ]).withColumnRenamed("_1","doc").withColumnRenamed("_2","word1").withColumnRenamed("_3","word2").withColumnRenamed("_4","word3")


df2=df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))

df2=df2.withColumn("cosine",myfunction("words"))

这会引发错误:

19/10/02 21:24:58 错误执行器:阶段 1.0 (TID 1) 中任务 0.0 中的异常

net.razorvine.pickle.PickleException:预期零参数 ClassDict (用于 numpy.dtype)的构造 net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 在 net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) 在 net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) 在 net.razorvine.pickle.Unpickler.load(Unpickler.java:99) 在 net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)

我不确定为什么不能将列表类型转换为 numpy 数组?任何帮助表示赞赏

【问题讨论】:

  • 我假设余弦返回一个 numpy 数组?如果是这样,请尝试cosine(x,y).item()。还请在您的示例代码中包含相关的导入。
  • TypeError: 预期的字符串或 Unicode 对象,未找到类型
  • 将余弦的导入添加到您的问题中,我会看看它。
  • @cronoik x=np.array(x) 这种转换正确吗?我正在将 pyspark 列转换为 numpy 数组
  • 是的,这是正确的。在执行期间,x 是某个行和列的单个值。

标签: numpy pyspark pyspark-dataframes


【解决方案1】:

这与您的previous question 中的问题基本相同。你创建了一个 udf 并告诉 spark 这个函数将返回一个float,但你返回一个numpy.float64 类型的对象。

您可以通过调用item() 将numpy 类型转换为python 类型,如下所示:

import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.functions import lit,countDistinct,udf,array,struct
import pyspark.sql.functions as F


@udf("float")
def myfunction(x):
    y=np.array([1,3,9])
    x=np.array(x)
    return cosine(x,y).item()


df = spark.createDataFrame([("doc_3",1,3,9), ("doc_1",9,6,0), ("doc_2",9,9,3) ]).withColumnRenamed("_1","doc").withColumnRenamed("_2","word1").withColumnRenamed("_3","word2").withColumnRenamed("_4","word3")


df2=df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))

df2=df2.withColumn("cosine",myfunction("words"))

df2.show(truncate=False)

输出:

+-----+---------+----------+ 
| doc |   words |   cosine | 
+-----+---------+----------+ 
|doc_3|[1, 3, 9]|      0.0 | 
|doc_1|[9, 6, 0]|0.7383323 | 
|doc_2|[9, 9, 3]|0.49496463| 
+-----+---------+----------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-10
    • 1970-01-01
    • 1970-01-01
    • 2021-06-09
    • 2017-03-08
    • 1970-01-01
    • 2021-12-18
    相关资源
    最近更新 更多