【发布时间】:2015-04-21 22:34:26
【问题描述】:
在电信行业的背景下,假设我们从 Cassandra 中的一些表中填充了几个现有的 RDD:
val callPrices: RDD[PriceRow]
val calls: RDD[CallRow]
val offersInCourse: RDD[OfferRow]
其中类型定义如下,
/** Represents the price per minute for a concrete hour */
case class PriceRow(
val year: Int,
val month: Int,
val day: Int,
val hour: Int,
val basePrice: Float)
/** Call registries*/
case class CallRow(
val customer: String,
val year: Int,
val month: Int,
val day: Int,
val hour: Int,
val minutes: Int)
/** Is there any discount that could be applicable here? */
case class OfferRow(
val offerName: String,
val hour: Int,//[0..23]
val discount: Float)//[0..1]
假设我们不能像这样使用flatMap来混合这三个RDD(因为RDD并不是真正的“单子”),
/**
* The final bill at a concrete hour for a call
* is defined as {{{
* def billPerHour(minutes: Int,basePrice:Float,discount:Float) =
* minutes * basePrice * discount
* }}}
*/
val bills: RDD[BillRow] = for{
price <- callPrices
call <- calls if call.hour==price.hour
offer <- offersInCourse if offer.hour==price.hour
} yield BillRow(
call.customer,
call.hour,
billPerHour(call.minutes,price.basePrice,offer.discount))
case class BillRow(
val customer: String,
val hour: DateTime,
val amount: Float)
生成连接所有这三个 RDD 并代表具体客户账单的新 RDD 的最佳做法是什么?
【问题讨论】:
标签: scala join apache-spark