【Question Title】: Spark RDDs: How to join value in a Map to a row in an RDD
【Posted】: 2018-04-08 13:33:09
【Question】:

I have a CSV file loaded into Spark as an RDD:

val home_rdd = sc.textFile("hdfs://path/to/home_data.csv")
val home_parsed = home_rdd.map(row => row.split(",").map(_.trim))
val home_header = home_parsed.first
val home_data = home_parsed.filter(_(0) != home_header(0))

home_data then looks like this:

scala> home_data
res17: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at filter at <console>:30

scala> home_data.take(3)
res20: Array[Array[String]] = Array(Array("7129300520", "20141013T000000", 221900, "3", "1", 1180, 5650, "1", 0, 0, 3, 7, 1180, 0, 1955, 0, "98178", 47.5112, -122.257, 1340, 5650), Array("6414100192", "20141209T000000", 538000, "3", "2.25", 2570, 7242, "2", 0, 0, 3, 7, 2170, 400, 1951, 1991, "98125", 47.721, -122.319, 1690, 7639), Array("5631500400", "20150225T000000", 180000, "2", "1", 770, 10000, "1", 0, 0, 3, 6, 770, 0, 1933, 0, "98028", 47.7379, -122.233, 2720, 8062))
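In the output above, fields such as "98178" are printed with double quotes while numeric fields such as 221900 are not, which suggests the quotes are literal characters in the file that `split(",")` keeps as part of each value. A minimal sketch, assuming some fields are wrapped in double quotes and no field contains an embedded comma, of a parser that strips one pair of surrounding quotes and separates the header. `CsvParsing`, `unquote` and `parseCsvLines` are hypothetical names, and it works on plain Scala collections rather than an RDD:

```scala
object CsvParsing {
  /** Trims the field and removes one pair of surrounding double quotes, if present. */
  def unquote(field: String): String = {
    val t = field.trim
    if (t.length >= 2 && t.startsWith("\"") && t.endsWith("\"")) t.substring(1, t.length - 1)
    else t
  }

  /** Splits each line on commas, unquotes every field, and returns (header, data rows).
    * Assumes `lines` is non-empty and that no field contains an embedded comma. */
  def parseCsvLines(lines: Seq[String]): (Array[String], Seq[Array[String]]) = {
    val parsed = lines.map(_.split(",").map(unquote))
    (parsed.head, parsed.tail)
  }
}
```

With an RDD, the same `unquote` could be applied per field, e.g. `home_rdd.map(_.split(",").map(CsvParsing.unquote))`, keeping the existing header filter.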

I also have a CSV that maps zipcodes to neighborhoods. It is loaded as an RDD and then used to build a Map[String,String]:

val zip_rdd = sc.textFile("hdfs://path/to/zipcodes.csv")
val zip_parsed = zip_rdd.map(row => row.split(",").map(_.trim))
val zip_header = zip_parsed.first
val zip_data = zip_parsed.filter(_(0) != zip_header(0))
val zip_map = zip_data.map(row => (row(0), row(1))).collectAsMap
val zip_ind = home_header.indexOf("zipcode") //to get the zipcode column in home_data
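The lookup can only succeed if the keys in zip_map are exactly the strings stored at index zip_ind in home_data, and `indexOf` silently returns -1 when a column name is not found. A minimal sketch in plain Scala (`ZipColumns`, `columnIndex` and `buildZipMap` are hypothetical names) that fails loudly on a missing column and skips malformed rows when building the map:

```scala
object ZipColumns {
  /** Finds a column by name, failing with a descriptive message instead of returning -1. */
  def columnIndex(header: Array[String], name: String): Int = {
    val i = header.indexOf(name)
    require(i >= 0, s"column '$name' not found in header: ${header.mkString("|")}")
    i
  }

  /** Builds zipcode -> neighborhood from rows of the form (zip, neighborhood, ...);
    * rows with fewer than two fields are skipped. */
  def buildZipMap(rows: Seq[Array[String]]): Map[String, String] =
    rows.collect { case Array(zip, hood, _*) => zip -> hood }.toMap
}
```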

where:

scala> zip_map.take(3)
res21: scala.collection.Map[String,String] = Map(98151 -> Seattle, 98052 -> Redmond, 98104 -> Seattle)

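Next, I want to iterate over home_data, use the zipcode value in each row (zip_ind = 16) to look up the neighborhood in zip_map, and append that value to the end of the row.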

val zip_processed = home_data.map(row => row :+ zip_map.get(row(zip_ind)))

But every lookup in zip_map fails, so only None gets appended to the end of each row in home_data:

scala> zip_processed.take(3)
res19: Array[Array[java.io.Serializable]] = Array(Array("7129300520", "20141013T000000", 221900, "3", "1", 1180, 5650, "1", 0, 0, 3, 7, 1180, 0, 1955, 0, "98178", 47.5112, -122.257, 1340, 5650, None), Array("6414100192", "20141209T000000", 538000, "3", "2.25", 2570, 7242, "2", 0, 0, 3, 7, 2170, 400, 1951, 1991, "98125", 47.721, -122.319, 1690, 7639, None), Array("5631500400", "20150225T000000", 180000, "2", "1", 770, 10000, "1", 0, 0, 3, 6, 770, 0, 1933, 0, "98028", 47.7379, -122.233, 2720, 8062, None))
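Separately from why the keys do not match: `Map.get` returns an `Option`, so even a successful lookup would append `Some(Seattle)` rather than `Seattle`, and mixing `String` and `Option` elements is why the element type widens to `java.io.Serializable` in res19. A minimal sketch (`AppendNeighborhood` and `appendNeighborhood` are hypothetical names; the "N/A" fallback is an assumption) that keeps the rows as `Array[String]` by using `getOrElse`:

```scala
object AppendNeighborhood {
  /** Appends the neighborhood for the row's zipcode, or `fallback` if the zipcode is unknown.
    * Using getOrElse instead of get keeps the result an Array[String]. */
  def appendNeighborhood(row: Array[String], zipInd: Int,
                         zipMap: scala.collection.Map[String, String],
                         fallback: String = "N/A"): Array[String] =
    row :+ zipMap.getOrElse(row(zipInd), fallback)
}
```

In Spark this would be called as `home_data.map(row => AppendNeighborhood.appendNeighborhood(row, zip_ind, zip_map))`. If zip_map is large, wrapping it with `sc.broadcast(zip_map)` and reading `.value` inside the closure sends it to each executor once instead of with every task.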

I'm trying to debug this, but I'm not sure why zip_map.get(row(zip_ind)) fails.
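Judging from the printed output, the zipcode appears as "98178" in home_data while the keys in zip_map print as 98178, which suggests (but does not prove) that the home_data values still contain literal double-quote characters. One way to check is to print a few keys from both sides with visible delimiters and character codes. A minimal sketch; `KeyDebug`, `describe` and `matchStatus` are hypothetical names:

```scala
object KeyDebug {
  /** Shows a key between brackets with its length and character codes,
    * exposing hidden quotes or whitespace. */
  def describe(key: String): String =
    s"[$key] length=${key.length} chars=${key.map(_.toInt).mkString(",")}"

  /** Reports whether `key` matches as-is, only after trimming and stripping
    * surrounding quotes, or not at all. */
  def matchStatus(key: String, zipMap: scala.collection.Map[String, String]): String = {
    val cleaned = key.trim.stripPrefix("\"").stripSuffix("\"")
    if (zipMap.contains(key)) "exact match"
    else if (zipMap.contains(cleaned)) s"matches only after cleaning to '$cleaned'"
    else "no match"
  }
}
```

For example, `home_data.take(3).foreach(r => println(KeyDebug.describe(r(zip_ind))))` and `zip_map.keys.take(3).foreach(k => println(KeyDebug.describe(k)))` would show whether the two sides differ.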

I'm fairly new to Scala, so I may be making some wrong assumptions, but I'm trying to understand better what happens inside the map function.

【Question Discussion】:

    Tags: scala csv apache-spark mapping hdfs


    【Solution 1】:

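    Map.get() returns an Option: Some(value) when the key is found and None when there is no match. You can use getOrElse to append the Map value, with a fallback for missing keys: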

    val home_data = sc.parallelize(Array(
      Array("7129300520", "20141013T000000", 221900, "3", "1", 1180, 5650, "1", 0, 0, 3, 7, 1180, 0, 1955, 0, "98178", 47.5112, -122.257, 1340, 5650),
      Array("6414100192", "20141209T000000", 538000, "3", "2.25", 2570, 7242, "2", 0, 0, 3, 7, 2170, 400, 1951, 1991, "98125", 47.721, -122.319, 1690, 7639),
      Array("5631500400", "20150225T000000", 180000, "2", "1", 770, 10000, "1", 0, 0, 3, 6, 770, 0, 1933, 0, "98028", 47.7379, -122.233, 2720, 8062)
    ))
    
    val zip_ind = 16
    val zip_map: Map[String, String] = Map("98178" -> "A", "98028" -> "B")
    
    val zip_processed = home_data.map(row => row :+ zip_map.getOrElse(row(zip_ind).toString, "N/A"))
    
    zip_processed.collect
    // res1: Array[Array[Any]] = Array(
    //   Array(7129300520, 20141013T000000, 221900, 3, 1, 1180, 5650, 1, 0, 0, 3, 7, 1180, 0, 1955, 0, 98178, 47.5112, -122.257, 1340, 5650, A),
    //   Array(6414100192, 20141209T000000, 538000, 3, 2.25, 2570, 7242, 2, 0, 0, 3, 7, 2170, 400, 1951, 1991, 98125, 47.721, -122.319, 1690, 7639, N/A),
    //   Array(5631500400, 20150225T000000, 180000, 2, 1, 770, 10000, 1, 0, 0, 3, 6, 770, 0, 1933, 0, 98028, 47.7379, -122.233, 2720, 8062, B)
    // )
    

    【Discussion】:

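    • Thanks, but I believe I already tried that. I think the problem is rather that with either approach every output is None (or N/A with getOrElse): nothing matches, even though I know the value at index 16 in each row maps to something in zip_map.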
    • Unfortunately, I can't reproduce the described problem.
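A likely reason the answer's example does not reproduce the problem: in `sc.parallelize(Array(... "98178" ...))` the double quotes are Scala string-literal delimiters, so the stored value is 98178, whereas `textFile` followed by `split(",")` keeps any quote characters that are present in the file. This is an inference from the printed output, not something confirmed in the thread. A minimal sketch of normalizing the key at lookup time (`QuotedKeyDemo`, `normalize` and `neighborhood` are hypothetical names):

```scala
object QuotedKeyDemo {
  /** Trims the zipcode and removes surrounding double quotes left over from CSV parsing. */
  private def normalize(zip: String): String =
    zip.trim.stripPrefix("\"").stripSuffix("\"")

  /** Looks up the neighborhood for a possibly quoted zipcode, defaulting to "N/A". */
  def neighborhood(zip: String, zipMap: scala.collection.Map[String, String]): String =
    zipMap.getOrElse(normalize(zip), "N/A")
}
```

In the question's pipeline this would read `home_data.map(row => row :+ QuotedKeyDemo.neighborhood(row(zip_ind), zip_map))`.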