【问题标题】:Sum the Distance in Apache-Spark dataframes对 Apache-Spark 数据帧中的距离求和
【发布时间】:2016-12-09 07:39:46
【问题描述】:

以下代码给出了一个数据框,每列中包含三个值,如下所示。

import org.graphframes._
    import org.apache.spark.sql.DataFrame
    val v = sqlContext.createDataFrame(List(
      ("1", "Al"),
      ("2", "B"),
      ("3", "C"),
      ("4", "D"),
      ("5", "E")
    )).toDF("id", "name")

    val e = sqlContext.createDataFrame(List(
      ("1", "3", 5),
      ("1", "2", 8),
      ("2", "3", 6),
      ("2", "4", 7),
      ("2", "1", 8),
      ("3", "1", 5),
      ("3", "2", 6),
      ("4", "2", 7),
      ("4", "5", 8),
      ("5", "4", 8)
    )).toDF("src", "dst", "property")
val g = GraphFrame(v, e)
val paths: DataFrame = g.bfs.fromExpr("id = '1'").toExpr("id = '5'").run()
paths.show()
val df=paths
df.select(df.columns.filter(_.startsWith("e")).map(df(_)) : _*).show

以上代码的输出如下::

    +-------+-------+-------+                                                       
    |     e0|     e1|     e2|
    +-------+-------+-------+
    |[1,2,8]|[2,4,7]|[4,5,8]|
    +-------+-------+-------+

在上面的输出中,我们可以看到每一列都有三个值,它们可以解释如下。

e0 : 
source 1, Destination 2 and distance 8  

e1:
source 2, Destination 4 and distance 7

e2:
source 4, Destination 5 and distance 8

基本上e0e1e3 是边缘。我想对每列的第三个元素求和,即添加每条边的距离以获得总距离。我怎样才能做到这一点?

【问题讨论】:

    标签: scala apache-spark spark-dataframe graphframes


    【解决方案1】:

    可以这样做:

    val total = df.columns.filter(_.startsWith("e"))
     .map(c => col(s"$c.property")) // or col(c).getItem("property")
     .reduce(_ + _)
    
    df.withColumn("total", total)
    

    【讨论】:

    • .property 是否意味着您要访问的列元素的通用占位符?
    • @evan058 OP 尝试访问的列是图框边缘。表示为具有三个字段的结构(srcdstproperty)。所以它是列的元素。
    【解决方案2】:

    我会收集要汇总的列,然后在 UDF 上使用 foldLeft

    scala> val df = Seq((Array(1,2,8),Array(2,4,7),Array(4,5,8))).toDF("e0", "e1", "e2")
    df: org.apache.spark.sql.DataFrame = [e0: array<int>, e1: array<int>, e2: array<int>]
    
    scala> df.show
    +---------+---------+---------+
    |       e0|       e1|       e2|
    +---------+---------+---------+
    |[1, 2, 8]|[2, 4, 7]|[4, 5, 8]|
    +---------+---------+---------+
    
    scala> val colsToSum = df.columns
    colsToSum: Array[String] = Array(e0, e1, e2) 
    
    scala> val accLastUDF = udf((acc: Int, col: Seq[Int]) => acc + col.last)
    accLastUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,List(IntegerType, ArrayType(IntegerType,false)))
    
    scala> df.withColumn("dist", colsToSum.foldLeft(lit(0))((acc, colName) => accLastUDF(acc, col(colName)))).show
    +---------+---------+---------+----+
    |       e0|       e1|       e2|dist|
    +---------+---------+---------+----+
    |[1, 2, 8]|[2, 4, 7]|[4, 5, 8]|  23|
    +---------+---------+---------+----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-12-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-04-22
      • 2020-10-30
      相关资源
      最近更新 更多