【发布时间】:2018-07-31 13:23:49
【问题描述】:
我有以下代码:
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F
dot_udf = F.udf(lambda x,y: float(x.dot(y)), DoubleType())
l = [(Vectors.dense([1, 2, 3, 4 ,5]), Vectors.dense([5, 4, 3, 2, 1]),),
(Vectors.dense([0, 4, 8, 2, 1]), None,),
(None, Vectors.dense([5, 0, 3, 9, 1]),),
]
def finish(row):
new_row = []
new_row.append(None if row['my_row_1'] == None else Vectors.dense(row['my_row_1']))
new_row.append(None if row['my_row_2'] == None else Vectors.dense(row['my_row_2']))
return new_row
with (SparkSession
.builder
.appName('test_mtassoni')
.getOrCreate()) as spark:
schema = StructType([StructField('my_row_1', VectorUDT(), True),
StructField('my_row_2', VectorUDT(), True)])
df = spark.createDataFrame(l, schema)
rdd = df.rdd
rdd = rdd.map(finish)
out_schema = StructType([StructField('my_row_1', VectorUDT(), True),
StructField('my_row_2', VectorUDT(), True)])
fdf = spark.createDataFrame(rdd, schema=out_schema)
fdf = fdf.withColumn('row_sim', F.when(((F.col('my_row_1').isNull()) |
(F.col('my_row_2').isNull())),
np.nan).otherwise(dot_udf(fdf.my_row_1, fdf.my_row_2))
)
fdf.show()
在最后一个命令上出现以下 TypeError 失败:
TypeError: Cannot treat type <type 'NoneType'> as a vector
有人知道如何解决吗?非常感谢您提前。
【问题讨论】:
-
您确定不想从
row['my_row_1']中过滤掉Nones 吗?finish函数正在返回一个列表,其中可能包含Nones。 -
我不想过滤掉
Nones。这只是一个示例数据集,但实际上我有更多列,我不想仅仅因为在那里找到None而从数据框中删除条目。
标签: python apache-spark pyspark