【Posted】: 2018-03-06 02:56:57
【Question】:
I am trying to build an Edge RDD for GraphX. I read a csv file into a DataFrame and then try to convert that DataFrame to an Edge RDD:
val staticDataFrame = spark.
  read.
  option("header", true).
  option("inferSchema", true).
  csv("/projects/pdw/aiw_test/aiw/haris/Customers_DDSW-withDN$.csv")

val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]] =
  staticDataFrame.select(
    "dealer_customer_number",
    "parent_dealer_cust_number",
    "dealer_code"
  ).map{ (row: Array) =>
    Edge((
      row.getAs[Long]("dealer_customer_number"),
      row.getAs[Long]("parent_dealer_cust_number"),
      row("dealer_code")
    ))
  }
But I get this error:
<console>:81: error: class Array takes type parameters
val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]] = staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").map((row: Array) => Edge((row.getAs[Long]("dealer_customer_number"), row.getAs[Long]("parent_dealer_cust_number"), row("dealer_code"))))
^
The result of
staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").take(1)
is
res3: Array[org.apache.spark.sql.Row] = Array([0000101,null,B110])
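【Comments】:
-
Could you please rerun the compilation on the indented code? The horrible line in the error message is also completely unreadable, but I don't know how to fix that... Use the gray edit button.
-
OK. I will rerun the code.
-
I get the same error after running the indented code :(
-
Of course it is the same error. But now the line number actually makes sense, because it no longer points into one huge single-line query.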
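As a possible way forward, here is a minimal sketch of how the conversion could be written. The compiler message itself comes from `(row: Array)`: `Array` is a generic class and cannot be used without a type argument, and the elements of a DataFrame are `org.apache.spark.sql.Row` values anyway. In addition, `Edge` takes `srcId`, `dstId` and `attr` as three separate arguments rather than one tuple, so the sketch uses `RDD[Edge[String]]` instead of the type in the question. Everything else is an assumption made for illustration: the object name `EdgeRddSketch` and the helper `toEdges` are hypothetical, a small in-memory DataFrame stands in for the csv file, the two ID columns are assumed to hold numeric strings (the `take(1)` output shows `0000101` with leading zeros), and rows with a null ID are simply dropped.

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical helper object, not part of the original question.
object EdgeRddSketch {

  // Builds one edge per row: child -> parent, with dealer_code as the attribute.
  def toEdges(df: DataFrame): RDD[Edge[String]] = {
    val idColumns = Seq("dealer_customer_number", "parent_dealer_cust_number")
    df.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code")
      .na.drop(idColumns) // assumption: rows without a parent ID are not edges
      .rdd                // switch from DataFrame to RDD[Row]
      .map { (row: Row) =>
        Edge(
          // assumption: IDs are numeric strings; toLong drops leading zeros
          row.getAs[String]("dealer_customer_number").toLong,
          row.getAs[String]("parent_dealer_cust_number").toLong,
          row.getAs[String]("dealer_code")
        )
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("edge-rdd-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the csv file from the question.
    val staticDataFrame = Seq(
      ("0000101", null.asInstanceOf[String], "B110"),
      ("0000102", "0000101", "B110")
    ).toDF("dealer_customer_number", "parent_dealer_cust_number", "dealer_code")

    toEdges(staticDataFrame).collect().foreach(println)
    spark.stop()
  }
}
```

If `inferSchema` actually yields numeric ID columns for the real file, the `getAs[String](...).toLong` calls would need to change to match the inferred type; `staticDataFrame.printSchema()` shows which types were inferred.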
Tags: scala apache-spark spark-graphx