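【Question title】: Spark: convert Spark SQL to the RDD API
【Posted】: 2017-01-04 00:42:12
【Question】:

Spark SQL is pretty clear to me. However, I am just getting started with Spark's RDD API. As the question "spark apply function to columns in parallel" points out, switching to RDDs should let me get rid of the slow shuffles caused by: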

def handleBias(df: DataFrame, colName: String, target: String = this.target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }
}

In pseudocode: df foreach column (handleBias(column)). So I loaded a minimal data frame:

val input = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  )
  val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

but I cannot map it properly:

val rdd1_inputDf = inputDf.rdd.flatMap { x => (0 until x.size).map(idx => (idx, x(idx))) }
rdd1_inputDf.toDF.show

This fails with:

java.lang.ClassNotFoundException: scala.Any

A sample of the problem outlined in this question can be found at https://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala.

【Comments】:

    Tags: scala apache-spark apache-spark-sql rdd


    【Answer 1】:

    When you call .rdd on a DataFrame, you get an RDD[Row], which is not strongly typed. If you want to be able to map over the elements, you need to pattern match on Row:

    scala> val input = Seq(
         |     (0, "A", "B", "C", "D"),
         |     (1, "A", "B", "C", "D"),
         |     (0, "d", "a", "jkl", "d"),
         |     (0, "d", "g", "C", "D"),
         |     (1, "A", "d", "t", "k"),
         |     (1, "d", "c", "C", "D"),
         |     (1, "c", "B", "C", "D")
         |   )
    input: Seq[(Int, String, String, String, String)] = List((0,A,B,C,D), (1,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D))
    
    scala> val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 
    inputDf: org.apache.spark.sql.DataFrame = [TARGET: int, col1: string ... 3 more fields]
    
    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    
    scala> val rowRDD = inputDf.rdd
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:27
    
    scala> val typedRDD = rowRDD.map{case Row(a: Int, b: String, c: String, d: String, e: String) => (a,b,c,d,e)}
    typedRDD: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[20] at map at <console>:29
    
    scala> typedRDD.keyBy(_._1).groupByKey.foreach{println}
    [Stage 7:>                                                          (0 + 0) / 4]
    (0,CompactBuffer((A,B,C,D), (d,a,jkl,d), (d,g,C,D)))
    (1,CompactBuffer((A,B,C,D), (A,d,t,k), (d,c,C,D), (c,B,C,D)))
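
The same lack of static types probably explains the exception in the question: indexing a Row returns a value typed as Any, so the flatMap there produces an RDD[(Int, Any)], and toDF cannot derive a schema for Any. A minimal sketch of one way around it, assuming it is acceptable to carry every cell as a String (CellPairs, toIndexedStrings, colIdx and value are hypothetical names, not from the original post):

```scala
import org.apache.spark.sql.DataFrame

object CellPairs {
  // Hypothetical helper: emits one (columnIndex, valueAsString) row per cell of the input.
  def toIndexedStrings(df: DataFrame): DataFrame = {
    val spark = df.sparkSession
    import spark.implicits._ // tuple encoders needed by toDF

    df.rdd
      .flatMap { row =>
        // row.get(idx) is typed Any; convert to String (keeping nulls as null)
        // so the element type becomes (Int, String) instead of (Int, Any).
        (0 until row.size).map { idx =>
          (idx, Option(row.get(idx)).map(_.toString).orNull)
        }
      }
      .toDF("colIdx", "value")
  }
}
```

Calling CellPairs.toIndexedStrings(inputDf).show on the sample data should then work, at the cost of losing the original column types.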
    

    Alternatively, you can use a typed Dataset:

    scala> val ds = input.toDS
    ds: org.apache.spark.sql.Dataset[(Int, String, String, String, String)] = [_1: int, _2: string ... 3 more fields]
    
    scala> ds.rdd
    res2: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[8] at rdd at <console>:30
    
    scala> ds.rdd.keyBy(_._1).groupByKey.foreach{println}
    [Stage 0:>                                                          (0 + 0) / 4]
    (0,CompactBuffer((0,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D)))
    (1,CompactBuffer((1,A,B,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D)))
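
If a DataFrame already exists (for example as the output of an earlier step), the types can also be re-attached with Dataset.as and a case class whose field names match the columns. This is only a sketch: Record and TypedView are hypothetical names, and the thread does not settle whether this fits an ml.Pipeline.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Hypothetical record type mirroring the column names used in the question.
case class Record(TARGET: Int, col1: String, col2: String, col3TooMany: String, col4: String)

object TypedView {
  // Re-attach static types to a DataFrame whose columns match Record by name.
  def asRecords(df: DataFrame): Dataset[Record] = {
    val spark = df.sparkSession
    import spark.implicits._ // encoder for the case class
    df.as[Record]
  }
}
```

With that in place, TypedView.asRecords(inputDf).rdd is an RDD[Record], so fields can be accessed by name (for example keyBy(_.TARGET)) without pattern matching on Row.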
    

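    【Comments】:

    • Since I want to use this in an ml.Pipeline and the output of each step is a DataFrame, the schema is "lost", so I would need to use pattern matching. Is that correct? With a lot of columns, is there a way to "infer" them (a partial schema)?
    • Yes, unfortunately the DF => RDD conversion does not use the schema at all (and I don't think there is a good way to force it to). But look at my new Dataset example: there is no need for an intermediate DataFrame, and it looks like Dataset infers the types nicely (in Spark 2.0, I think anything you can do with a DF can also be done with a DS).
    • @GeorgHeiler (not sure whether you got a notification ^^^^)
    • Thanks. Indeed, you are right. However, Spark transformers in an ml pipeline only output DataFrames ;) even with a Dataset as input. So I think the schema will be lost for subsequent transformer steps.
    • I posted a follow-up question here: stackoverflow.com/questions/41445571/… Maybe you have a suggestion there as well.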