需要这样操作的上下文在 Spark 中是比较少见的。除了一两个例外,Spark API 期望通用 Vector 类而不是特定实现(SparseVector、DenseVector)。对于来自o.a.s.mllib.linalg.distributed的分布式结构也是如此
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val df = Seq[(Long, Vector)](
(1L, Vectors.dense(1, 2, 3)), (2L, Vectors.sparse(3, Array(1), Array(3)))
).toDF("id", "v")
new RowMatrix(df.select("v")
.map(_.getAs[Vector]("v")))
.columnSimilarities(0.9)
.entries
.first
// apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,2,1.0)
不过你可以像这样使用 UDF:
val asDense = udf((v: Vector) => v.toDense)
df.withColumn("vd", asDense($"v")).show
// +---+-------------+-------------+
// | id| v| vd|
// +---+-------------+-------------+
// | 1|[1.0,2.0,3.0]|[1.0,2.0,3.0]|
// | 2|(3,[1],[3.0])|[0.0,3.0,0.0]|
// +---+-------------+-------------+
请记住,自 2.0 版以来,Spark 提供了两种不同且兼容的 Vector 类型:
o.a.s.ml.linalg.Vector
o.a.s.mllib.linalg.Vector
每个都有对应的 SQL UDT。见MatchError while accessing vector column in Spark 2.0