【问题标题】:Apache Spark map and reduce with passing valuesApache Spark 使用传递值映射和减少
【发布时间】:2014-11-25 07:57:59
【问题描述】:

我在从 Cassandra 加载的 RDD 上有一个简单的 mapreduce 作业。 代码看起来像这样

sc.cassandraTable("app","channels").select("id").toArray.foreach((o) => {

  val orders = sc.cassandraTable("fam", "table")
    .select("date", "f2", "f3", "f4")
    .where("id = ?", o("id")) # This o("id") is the ID i want later append to the finished list

  val month = orders
    .map( oo => {
      var total_revenue = List(oo.getIntOption("f2"), oo.getIntOption("f3"), oo.getIntOption("f4")).flatten.reduce(_ + _)
      (getDateAs("hour", oo.getDate("date")), total_revenue)
    })
    .reduceByKey(_ + _)
})

所以这段代码总结了收入并返回类似的东西

(2014-11-23 18:00:00, 12412)
(2014-11-23 19:00:00, 12511)

现在我想将其保存回 Cassandra 表 revenue_hour,但我需要该列表中的 ID,类似这样。

(2014-11-23 18:00:00, 12412, "CH1")
(2014-11-23 19:00:00, 12511, "CH1")

我怎样才能使这项工作不仅仅是一个(键,值)列表?我怎样才能传递更多不应该转换的值,而只是传递到最后,以便我可以将其保存回 Cassandra?

【问题讨论】:

  • 我不知道使用此代码的上下文,但对我来说,按 ID 计算所有 ID 的订单看起来更自然。就目前而言,这不是使用 Spark 的好案例。可能通过“常规”编程方式进行查询并在结果上运行本地计算会快得多。也就是说,您只需将“id”添加到.select("id", ...) 中的字段
  • 感谢您的回复,maasg。我更新了问题并为代码提供了更多上下文。在channels 表中存储了所有可能的ID。我们遍历它们并捕获每个 id 的收入,将其聚合并将其写入 C* 表中。这是使用 Spark 的好案例吗?我不能完全听从你的建议,也许你可以改写一下?

标签: scala cassandra apache-spark


【解决方案1】:

也许您可以使用一个类并通过流程使用它。我的意思是,定义 RevenueHour 类

case class RevenueHour(date: java.util.Date,revenue: Long, id: String)

然后在 map 阶段构建一个中间 RevenueHour,然后在 reduce 阶段构建另一个。

val map: RDD[(Date, RevenueHour)] = orders.map(row => 
  (
    getDateAs("hour", oo.getDate("date")), 
    RevenueHour(
      row.getDate("date"),
      List(row.getIntOption("f2"),row.getIntOption("f3"),row.getIntOption("f4")).flatten.reduce(_ + _),
      row.getString("id")
    )
  )
).reduceByKey((o1: RevenueHour, o2: RevenueHour) => RevenueHour(getDateAs("hour", o1.date), o1.revenue + o2.revenue, o1.id))

我使用 o1 RevenueHour 因为 o1 和 o2 将具有相同的键和相同的 id(因为之前的 where 子句)。

希望对你有帮助。

【讨论】:

  • 哇,非常感谢,代码看起来不错。我把它格式化了一下,因为我很难弄清楚什么东西属于哪里,我会马上检查出来。
  • 感谢编辑,我没有测试代码,但希望你能理解。
  • 好吧,结构看起来不错,但是当我运行代码时,每当我想在val mapfirst 上执行count 或类似的事情时,都会遇到序列化问题。错误是Job aborted due to stage failure: Task not serializable:我认为case类默认是可序列化的?
  • 您是否尝试过在案例类中添加“extends Serializable”?
  • 日本人做到了,但没有帮助。这是case class Revenue(date: String, revenue: Long, id: String) extends Serializable 的行。
【解决方案2】:

针对该问题提出的方法是通过迭代一组 id 并仅在数据的一个(可能很小的)子集上应用 Spark 作业来对数据的处理进行排序。

在不知道“通道”和“表”数据之间的关系如何的情况下,我看到了两个充分利用 Spark 并行处理数据能力的选项:

选项 1

如果“表”表中的数据(从这里称为“订单”)包含我们在报告中需要的所有 ID 集,我们可以将报告逻辑应用于整个表:

根据问题,我们将使用这个 C* 架构:

CREATE TABLE example.orders (id text,
      date TIMESTAMP,
      f2 decimal,
      f3 decimal,
      f4 decimal,
      PRIMARY KEY(id, date)
);

通过提供一个表示表架构的案例类,访问 cassandra 数据变得更加容易:

case class Order(id: String, date:Long, f2:Option[BigDecimal], f3:Option[BigDecimal], f4:Option[BigDecimal]) {
    lazy val total = List(f2,f3,f4).flatten.sum
}

然后我们可以根据cassandra表定义一个rdd。当我们提供案例类作为类型时,spark-cassandra 驱动程序可以直接执行转换,以方便我们:

val ordersRDD = sc.cassandraTable[Order]("example", "orders").select("id", "date", "f2", "f3", "f4")

val revenueByIDPerHour = ordersRDD.map{order => ((order.id, getDateAs("hour", order.date)), order.total)}.reduceByKey(_ + _) 

最后保存回 Cassandra:

revenueByIDPerHour.map{ case ((id,date), revenue) => (id, date, revenue)}
    .saveToCassandra("example","revenue", SomeColumns("id", "date", "total"))

选项 2

如果应该使用 ("app","channels") 表中包含的 id 来过滤 id 集(例如有效 id),那么我们可以将此表中的 id 与订单连接。该工作将与上一个类似,但增加了:

val idRDD = sc.cassandraTable("app","channels").select("id").map(_.getString)
val ordersRDD = sc.cassandraTable[Order]("example", "orders").select("id", "date", "f2", "f3", "f4")
val validOrders = idRDD.join(ordersRDD.map(order => (id,order))

这两种方式说明了如何使用 Cassandra 和 Spark,利用 Spark 操作的分布式特性。它也应该比对“通道”表中的每个 ID 执行查询要快得多。

【讨论】:

  • 感谢 maasg,您的出色回答。该代码看起来非常性感,并且对我重构当前的代码库有很大帮助。选项 1 实际上是我使用的选项。我有一个问题。对于选项 1,我们现在选择订单表中的每一行,如果数据存储在多个节点上,这是否会导致 cassandra 响应速度变慢,或者这无关紧要?我真的可以提高整个表的选择速度吗?选择速度取决于表大小还是吞吐量?
  • 从多个节点获取数据应该更快,因为您能够并行化数据摄取。选择速度将取决于表大小(多少数据)和吞吐量(每单位时间我可以传输多少数据)
  • 选择速度已经很明显了。您能否提供一些有关并行化日期摄取的更多信息以及如何实现这一点?
  • 你是如何运行 Spark 的?假设您在集群中运行 Spark,并行分布式处理是 Spark 计算模型所固有的。
  • Jap 在集群中运行 Spark。我想我知道您将日期摄取并行化是什么意思;)
猜你喜欢
  • 1970-01-01
  • 2016-01-22
  • 1970-01-01
  • 1970-01-01
  • 2016-03-04
  • 1970-01-01
  • 1970-01-01
  • 2015-06-14
  • 1970-01-01
相关资源
最近更新 更多