【发布时间】:2014-11-25 07:57:59
【问题描述】:
我在从 Cassandra 加载的 RDD 上有一个简单的 map 和 reduce 作业。
代码看起来像这样
sc.cassandraTable("app","channels").select("id").toArray.foreach((o) => {
val orders = sc.cassandraTable("fam", "table")
.select("date", "f2", "f3", "f4")
.where("id = ?", o("id")) # This o("id") is the ID i want later append to the finished list
val month = orders
.map( oo => {
var total_revenue = List(oo.getIntOption("f2"), oo.getIntOption("f3"), oo.getIntOption("f4")).flatten.reduce(_ + _)
(getDateAs("hour", oo.getDate("date")), total_revenue)
})
.reduceByKey(_ + _)
})
所以这段代码总结了收入并返回类似的东西
(2014-11-23 18:00:00, 12412)
(2014-11-23 19:00:00, 12511)
现在我想将其保存回 Cassandra 表 revenue_hour,但我需要该列表中的 ID,类似这样。
(2014-11-23 18:00:00, 12412, "CH1")
(2014-11-23 19:00:00, 12511, "CH1")
我怎样才能使这项工作不仅仅是一个(键,值)列表?我怎样才能传递更多不应该转换的值,而只是传递到最后,以便我可以将其保存回 Cassandra?
【问题讨论】:
-
我不知道使用此代码的上下文,但对我来说,按 ID 计算所有 ID 的订单看起来更自然。就目前而言,这不是使用 Spark 的好案例。可能通过“常规”编程方式进行查询并在结果上运行本地计算会快得多。也就是说,您只需将“id”添加到
.select("id", ...)中的字段 -
感谢您的回复,maasg。我更新了问题并为代码提供了更多上下文。在
channels表中存储了所有可能的ID。我们遍历它们并捕获每个 id 的收入,将其聚合并将其写入 C* 表中。这是使用 Spark 的好案例吗?我不能完全听从你的建议,也许你可以改写一下?
标签: scala cassandra apache-spark