【问题标题】:How to pass data as a tuple into an rdd in Spark using Scala如何使用 Scala 将数据作为元组传递到 Spark 中的 rdd
【发布时间】:2019-06-02 07:16:20
【问题描述】:

我有一组坐标 (x, y) 作为 csv 文件中的数据。我想将这些 x 和 y 作为元组传递给 RDD[(Double, Double)] 并将其命名为点。我尝试了以下方法,但由于某种原因,我收到一条错误消息。 “构造函数无法实例化为预期类型,找到:Array[T],必需:String”。

// Load the data
val data = sc.textFile("data.csv")

// Read the data as an RDD[(Double, Double)]
val points = data.map(line => line.split(",").map{ case Array(x, y) => (x.toDouble, y.toDouble)} )

编辑:有什么方法可以过滤这些点,以便我可以处理为空的值(如果数据集中的 x 或 y 或两者都为空)?本质上,我想检查元组是否总是包含 2 个元素。我试过这样的事情

val points = data.map(line => line.split(",").filter(!_.isEmpty)).map{ case Array(x, y) => (x.toDouble, y.toDouble)}.filter(_.size > 1)

但我得到一个错误类型不匹配,预期:(​​双,双)=>布尔,实际:(双,双)=>任何

【问题讨论】:

    标签: scala apache-spark tuples rdd


    【解决方案1】:

    使用下面的代码。您必须在 split 的输出上调用第二个映射,即数组列表

    // Load the data
          val data = sc.textFile("data.csv")
    
          // Read the data as an RDD[(Double, Double)]
          val points = data.map(line => line.split(",")).map{ case Array(x, y) => (x.toDouble, y.toDouble)}
    

    【讨论】:

    • 有什么方法可以过滤这些点,以便我可以处理空值(如果数据集中的 x 或 y 或两者都为空)?本质上,我想检查元组是否总是包含 2 个元素。我尝试了类似val points = data.map(line => line.split(",").filter(!_.isEmpty)).map{ case Array(x, y) => (x.toDouble, y.toDouble)}.filter(_.size > 1) 的方法,但出现错误类型不匹配,预期:(Double, Double) => Boolean, actual: (Double, Double) => Any
    • 你必须像 val points = data.map(line => line.split(",")).filter(arr => arr.length ==2).map 一样在拆分后使用过滤器{ case Array(x, y) => (x.toDouble, y.toDouble)}
    【解决方案2】:

    您的方法几乎是正确的,但您应该使用:

    val points = data.map(line => {
      val Array(x, y) = line.split(",")
      (x.toDouble, y.toDouble)
    })
    

    或者:

    val points = data.map(line => {
      line.split(",") match {
        case Array(x, y) => (x.toDouble, y.toDouble)
      }
    })
    

    你的方法的问题是你在line.split(",")上调用map,即你在Àrray[String]上调用map,所以你尝试(模式)匹配StringArray(x,y)

    【讨论】:

      【解决方案3】:

      Apache spark 具有读取 csv 文件的 api。我更喜欢使用 api 而不是使用 textFile 来读取 csv 文件,因为它在内部处理缺失值或 null。这是我的data.csv 文件的内容:

      12,13
      12.3,25.6
      12.4
      ,34.5
      

      可以通过以下方式生成所需的输出:

      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
      
      val scheam = StructType(Array(
            StructField("x",DoubleType,true),
            StructField("y",DoubleType,true)
          ))
      val data_df = spark.read.schema(scheam).csv("data.csv")
      data_df.show()
      +----+----+
      |   x|   y|
      +----+----+
      |12.0|13.0|
      |12.3|25.6|
      |12.4|null|
      |null|34.5|
      //converting the data_df dataframe to RDD[Double,Double]
      val points_rdd = data_df.rdd.map{case Row(x:Double,y:Double) => (x,y)}
      

      处理空值:

      val filterd_data_df = data_df.filter(data_df("x").isNotNull && data_df("y").isNotNull).
                      rdd.map{case Row(x:Double,y:Double) => (x,y)}
      import spark.implicits._
      filterd_data_df.toDF("x", "y").show()
      +----+----+
      |   x|   y|
      +----+----+
      |12.0|13.0|
      |12.3|25.6|
      +----+----+
      

      【讨论】:

        猜你喜欢
        • 2015-09-11
        • 2017-08-01
        • 2017-05-23
        • 1970-01-01
        • 1970-01-01
        • 2017-04-19
        • 2017-06-13
        • 2016-03-10
        • 2021-08-21
        相关资源
        最近更新 更多