【Question title】: Joining non-pair RDDs in Spark
【Posted】: 2015-04-21 22:34:26
【Question】:

In a telecom setting, suppose we have several existing RDDs populated from some Cassandra tables:

val callPrices: RDD[PriceRow]
val calls: RDD[CallRow]
val offersInCourse: RDD[OfferRow]

where the types are defined as follows:

/** Represents the price per minute for a concrete hour */
case class PriceRow(
    val year: Int,
    val month: Int,
    val day: Int,
    val hour: Int,
    val basePrice: Float)

/** Call registries*/
case class CallRow(
    val customer: String,
    val year: Int,
    val month: Int,
    val day: Int,
    val hour: Int,
    val minutes: Int)

/** Is there any discount that could be applicable here? */
case class OfferRow(
    val offerName: String,
    val hour: Int,//[0..23]
    val discount: Float)//[0..1]

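Given that we cannot mix these three RDDs with flatMap like this (RDDs are not really monads: Spark does not allow one RDD to be used inside a transformation of another):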

/** 
 * The final bill at a concrete hour for a call 
 * is defined as {{{ 
 *    def billPerHour(minutes: Int,basePrice:Float,discount:Float) = 
 *      minutes * basePrice * discount
 * }}}
 */
val bills: RDD[BillRow] = for{
    price <- callPrices
    call <- calls if call.hour==price.hour
    offer <- offersInCourse if offer.hour==price.hour
} yield BillRow(
    call.customer,
    call.hour,
    billPerHour(call.minutes,price.basePrice,offer.discount))

case class BillRow(
    val customer: String,
    val hour: Int,
    val amount: Float)
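For reference, the result the question is after can be written directly when the data lives in ordinary Scala collections, because `Seq` supports nested `flatMap`. This is a minimal sketch written as a Scala script, under these assumptions: plain `Seq`s stand in for the RDDs, `BillRow.hour` is an `Int` (since `call.hour` is an `Int`), and calls are matched to prices on the full (year, month, day, hour) key rather than the hour alone. The object name `LocalBills` and the sample data are hypothetical.

```scala
// Plain-Scala sketch: Seq allows nested generators, so the for-comprehension works locally.
// Assumption: BillRow.hour is an Int here, because call.hour is an Int.
object LocalBills {
  case class PriceRow(year: Int, month: Int, day: Int, hour: Int, basePrice: Float)
  case class CallRow(customer: String, year: Int, month: Int, day: Int, hour: Int, minutes: Int)
  case class OfferRow(offerName: String, hour: Int, discount: Float)
  case class BillRow(customer: String, hour: Int, amount: Float)

  def billPerHour(minutes: Int, basePrice: Float, discount: Float): Float =
    minutes * basePrice * discount

  // Nested loop over three in-memory collections (possible locally, not across RDDs).
  def bills(prices: Seq[PriceRow], calls: Seq[CallRow], offers: Seq[OfferRow]): Seq[BillRow] =
    for {
      price <- prices
      call  <- calls
      if (call.year, call.month, call.day, call.hour) == (price.year, price.month, price.day, price.hour)
      offer <- offers
      if offer.hour == price.hour
    } yield BillRow(call.customer, call.hour, billPerHour(call.minutes, price.basePrice, offer.discount))
}

import LocalBills._

// Hypothetical sample data: only alice's call has a matching price row.
val result = bills(
  Seq(PriceRow(2014, 1, 1, 10, 2.0f)),
  Seq(CallRow("alice", 2014, 1, 1, 10, 3), CallRow("bob", 2014, 1, 2, 10, 5)),
  Seq(OfferRow("happyHour", 10, 0.5f))
)
println(result) // List(BillRow(alice,10,3.0))
```

On RDDs the same nesting is rejected, which is why the rows have to be keyed and joined instead.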

What is the best practice for producing a new RDD that joins all three RDDs and represents the bill for a given customer?

【Question discussion】:

    Tags: scala join apache-spark


    【Solution 1】:

    In the end I used the approach Daniel suggested to me on the Spark mailing list.

    So I solved it as follows:

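    import org.apache.spark.SparkContext._ // pair-RDD implicits (join); only needed before Spark 1.3
    import org.apache.spark.rdd.RDD

    type Key = (Int,Int,Int,Int)      // (year, month, day, hour)
    type Price = Float                // PriceRow.basePrice is a Float
    type CustomerCall = (String,Int)  // (customer, minutes)
    type Offer = (String,Float)       // (offerName, discount)
    
    val keyedCallPricesRdd: RDD[(Key,Price)] = callPrices.map{
        case PriceRow(year,month,day,hour,basePrice) =>
            ((year,month,day,hour),basePrice)
    }
    val keyedCallsRdd: RDD[(Key,CustomerCall)] = calls.map{
        case CallRow(customer,year,month,day,hour,minutes) =>
            ((year,month,day,hour),(customer,minutes))
    }
    // Offers only carry an hour, so each offer is replicated for every date of interest.
    // Impossible dates such as (2014,2,31) simply never match a key in the join.
    val keyedOffersRdd: RDD[(Key,Offer)] = for{
        offer <- offersInCourse
        year <- List(2013,2014) //possible years I want to calculate
        month <- 1 to 12
        day <- 1 to 31
    } yield ((year,month,day,offer.hour),(offer.offerName,offer.discount))
    
    // Chained joins nest the values: RDD[(Key, ((Price, CustomerCall), Offer))]
    keyedCallPricesRdd
        .join(keyedCallsRdd)
        .join(keyedOffersRdd)
        .map {
            case (key, ((price, (customer, minutes)), (offerName, discount))) =>
                //do whatever you need, e.g. the bill amount minutes * price * discount
                (customer, key, minutes * price * discount)
        }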
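Because `join` on pair RDDs returns `RDD[(K, (V, W))]`, chaining two joins nests the values as `(K, ((V, W), X))`, so a `map` after the second join has to match nested tuples rather than a flat `(price, call, offer)` triple. The sketch below reproduces that shape with plain Scala collections so it runs without a cluster; `LocalJoin.join` is a hypothetical helper that mimics the inner-join semantics of Spark's `PairRDDFunctions.join` on `Seq`s, and the sample data is made up.

```scala
// Hypothetical local stand-in for PairRDDFunctions.join: an inner join that
// emits one (k, (v, w)) row for every matching pair of values.
object LocalJoin {
  type Key = (Int, Int, Int, Int) // (year, month, day, hour)

  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    // Index the right side by key, like a hash join.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    for {
      (k, v) <- left
      w      <- rightByKey.getOrElse(k, Seq.empty[W])
    } yield (k, (v, w))
  }
}

import LocalJoin._

val prices: Seq[(Key, Float)]        = Seq(((2014, 1, 1, 10), 2.0f))
val calls: Seq[(Key, (String, Int))] = Seq(((2014, 1, 1, 10), ("alice", 3)))
val offers: Seq[(Key, (String, Float))] = Seq(
  ((2014, 1, 1, 10), ("happyHour", 0.5f)),
  ((2014, 1, 2, 10), ("happyHour", 0.5f)) // no price/call on this date: dropped by the join
)

// Two chained joins nest the values: ((price, call), offer)
val joined: Seq[(Key, ((Float, (String, Int)), (String, Float)))] =
  join(join(prices, calls), offers)

val bills: Seq[(String, Int, Float)] = joined.map {
  case (key, ((price, (customer, minutes)), (_, discount))) =>
    (customer, key._4, minutes * price * discount)
}
println(bills) // List((alice,10,3.0))
```

Replicating each offer across every candidate date keeps a single four-part join key. An alternative would be to re-key the result of the first join by hour alone and join it with offers keyed by `offer.hour`, which avoids enumerating dates.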
    
