【发布时间】:2019-03-28 18:09:06
【问题描述】:
我有一个要迭代的数据框,但我不想将数据框转换为数据集。 我们必须将 spark scala 代码转换为 pyspark,而 pyspark 不支持数据集。
我通过转换为数据集尝试了以下代码
data in file:
abc,a
mno,b
pqr,a
xyz,b
val a = sc.textFile("<path>")
//creating dataframe with column AA,BB
val b = a.map(x => x.split(",")).map(x =>(x(0).toString,x(1).toString)).toDF("AA","BB")
b.registerTempTable("test")
case class T(AA:String, BB: String)
//creating dataset from dataframe
val d = b.as[T].collect
d.foreach{ x=>
var m = spark.sql(s"select * from test where BB = '${x.BB}'")
m.show()
}
Without converting to dataset it gives error i.e. with
val d = b.collect
d.foreach{ x=>
var m = spark.sql(s"select * from test where BB = '${x.BB}'")
m.show()
}
它给出错误: 错误:值 BB 不是 org.apache.spark.sql.ROW 的成员
【问题讨论】:
-
以下是在 Scala 中,但尝试使用 filter 参数。您无需更改数据框。这个想法是您将 DF1 更改为 Array 并检查数组元素是否在数据框中。要逐个检查元素,您将使用循环机制逐个检查元素。
val bArray = b.selectExpr("BB").rdd.map(x=>x.mkString).collect var iterator = 1 var m = b while(iterator <= bArray.length) { m = b.filter($"BB".isin(bArray(iterator - 1)) m.collect iterator = iterator + 1}
标签: apache-spark pyspark apache-spark-sql apache-spark-dataset