【问题标题】:How to create a new RDD without nested transformation如何在没有嵌套转换的情况下创建新的 RDD
【发布时间】:2018-09-18 02:05:54
【问题描述】:

我想创建一个包含以下格式记录的 RDD:

(行程, (起点站详情), (终点站详情))

import org.apache.spark._

val input1 = sc.textFile("data/trips/*")
val header1 = input1.first // to skip the header row
val trips = input1.filter(_ != header1).map(_.split(","))

val input2 = sc.textFile("data/stations/*")
val header2 = input2.first // to skip the header row
val stations = input2.filter(_!=header2).map(_.split(",")).keyBy(_(0).toInt)

def pjoined (joined: (Array[String], Array[String], Array[String])) = {
    println(""+joined._1.deep.mkString(",")+"; "+joined._2.deep.mkString(",")+"; "+joined._3.deep.mkString(","))
}

val joinedtrips = trips.map(tup => (tup, (stations.filter(_._1==tup(4).toInt).first._2), (stations.filter(_._1==tup(7).toInt).first._2)))
joinedtrips.take(5).foreach(pjoined)

倒数第二行失败并出现以下错误:

org.apache.spark.SparkException:RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。

实现这一目标的正确有效方法是什么?

stations.csv:

station_id,name,lat,long,dockcount,landmark,installation,notes
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,
...

trips.csv:

Trip ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Terminal,Bike #,Subscription Type,Zip Code
4258,114,8/29/2013 11:33,San Jose City Hall,10,8/29/2013 11:35,MLK Library,11,107,Subscriber,95060
4265,151,8/29/2013 11:40,San Francisco City Hall,58,8/29/2013 11:42,San Francisco City Hall,58,520,Subscriber,94110
...

stations.csv 中的station_id 与trips.csv 中的Start Terminal(索引4)和End Terminal(索引7)匹配

【问题讨论】:

  • 有什么理由不使用较新的 DataFrame API(它更易于使用且更清晰)?
  • 您好 Shaido,感谢您将问题编辑为更好的格式。它来自我正在阅读的一些学习材料。它尚未涵盖 DataFrame,但我会研究它。感谢您分享这个想法。
  • 如果是RDD或者dataframe,这里你要做的是使用join。可以在此处查看如何在 RDD 上完成此操作的一些示例:stackoverflow.com/questions/27437507/…
  • 为什么你的标题行没有过滤条件。 filter(× =》 !x.contains("station_id,name,lat,long,dockcount,landmark") . 这个很简单

标签: scala csv apache-spark rdd


【解决方案1】:

有两种方法。另外,请阅读 Shaido 的评论以使用 Dataframe。

val bcStations = sc.broadcast(stations.collectAsMap)

val joined = trips.map(trip =>{
    (trip, bcStations.value.getOrElse(trip(4).toInt, Nil), bcStations.value.getOrElse(trip(7).toInt, Nil))
})

println(joined.toDebugString)

joined.take(1)

val mapStations = stations.collectAsMap

val joinedtrips = trips.map(trip => {
    (trip, mapStations.getOrElse(trip(4).toInt, Nil), mapStations.getOrElse(trip(7).toInt, Nil))
})

joinedtrips.take(1)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-14
    • 1970-01-01
    • 1970-01-01
    • 2017-01-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多