【问题标题】:Implementing logic in Scala for Spark在 Scala 中为 Spark 实现逻辑
【发布时间】:2015-08-21 01:42:06
【问题描述】:

所以我有一些表格数据:

node    parent   value
c1      p1       2
p1               3
c2      p1       1
c11     c1       1
c12     c1       1

其中有一棵树由节点和父列表示(父母可能有多少个孩子)并且每个孩子的值的总和 = 父母的值。 (我的意思是,这应该是正确的,但它可能不是,这就是我想要检查的)

我想做的事:

我想检查“每个孩子的价值之和 = 父母的价值”对于输入数据中给定的每个父母是否成立。

我在 Spark 中使用 Scala 来实现这个逻辑,所以我想在功能上做到这一点。

到目前为止我做了什么:

输入是一个 csv 文件,我从中创建了一个数据框,并执行以下操作给我,父 => 子列表,这是我知道我需要的信息。

tree = fileDataFrame.select(parent, node).map( x => (x(0), x(1)).groupByKey()

同样,我得到了节点 => 值,这又是有用的信息:

values = fileDataFrame.select(node, value).map( x => (x(0), x(1))

我不知道从这里去哪里。我想添加所有孩子的值(使用reduceByKey我猜)但我还没有设置,因为我有父=>孩子列表,我需要父母=>值列表孩子们。

我对函数式编程很陌生,所以我的大脑仍然在循环思考。

实施此检查的好方法是什么? Spark 允许以下转换 (http://spark.apache.org/docs/latest/programming-guide.html#transformations)

提前致谢,欢迎提出任何建议!

【问题讨论】:

  • 由于您的数据本质上是一个层次结构,您可以使用 GraphX spark 库将其作为图形问题来解决。

标签: scala functional-programming apache-spark


【解决方案1】:

我不明白你的问题,但我认为以下方法应该可行。

首先创建一个具有以下架构的数据框

root
 |-- node: string (nullable = true)
 |-- parent: string (nullable = true)
 |-- value: integer (nullable = true)

为孩子汇总数据:

val children = df.groupBy($"parent").agg(sum($"value").alias("csum"))

加入原始数据:

df
  .select($"node", $"value")
  .join(children, df("node") <=> children("parent"))
  .select($"node", ($"value" === $"csum").alias("holds"))

GraphX as suggested by @mattinbits 的类似解决方案:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val nodes: RDD[(VertexId, (String, Int))] = sc.parallelize(Array(
    (0L, ("p1", 3)),
    (1L, ("c1", 2)),
    (2L, ("c2", 1)),
    (11L, ("c11",  1)),
    (12L, ("c12", 1))
))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(1L, 0L, "child"),
    Edge(2L, 0L, "child"),
    Edge(11L, 1L, "child"),
    Edge(12L, 1L, "child")
))

val graph = Graph(nodes, relationships)


graph.aggregateMessages[(Int, Int)](
    triplet => triplet.sendToDst(triplet.dstAttr._2, triplet.srcAttr._2),
    (a, b) => (a._1,  a._2 + b._2)
).map{case (id, (expected, actual)) => expected == actual}.reduce(_ & _)

【讨论】:

  • 执行 "val children = df.groupBy($"parent").agg(sum($"value"))" 结果 org.apache.spark.sql.DataFrame = [SUM(value ): double] 并且父信息丢失。做 children("parent") 会导致错误。如何保留家长信息?
  • 另外,感谢 GraphX 方法,我将比较两种方法的性能,看看哪一种更好。
  • On Spark >= 1.4.0 组变量被保留。如果有问题,您可以简单地在agg 子句中添加列:agg($"parent", sum($"value").alias("csum"))
  • 谢谢,这就是我要找的!现在看起来微不足道,但我对函数式编程很陌生
  • 说实话这里没有太多功能性的东西。除非你算上SQL as a functional language,但我很高兴能帮上忙。
【解决方案2】:

试试:

val nodeParents: RDD[(String, String, Long)] = // ...
val nodes: RDD[(String, Long)] = nodeParents.map { case(n, _, v) => (n, v) }
val parents: RDD[(String, Long)] = nodeParents.filter { case(_, p, _) => p != ""}
                                              .map { case(_, p, v) => (p, v) }
                                              .reduceByKey(_ + _)
val joined: RDD[(String, (Long, Long))] = parents.join(nodes)

你的例子:

> nodes: [(c1, 2), (p1, 3), (c2, 1), (c11, 1), (c12, 1)]
> parents: [(c1, 2), (p1, 3)]
> joined: [(c1, (2, 2)), (p1, (3, 3))]

【讨论】:

    猜你喜欢
    • 2017-11-22
    • 1970-01-01
    • 2018-02-22
    • 1970-01-01
    • 2023-04-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多