【问题标题】:Spark: Using mapPartition with ScalaSpark:在 Scala 中使用 mapPartition
【发布时间】:2016-12-04 12:45:00
【问题描述】:

假设我有以下数据框:

var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3)
val df = sc.parallelize(randomData,2).toDF()

我有这个函数,它将作为mapPartition 的输入:

def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] =
    row.toArray.tail.toIterator

并使用地图分区:

df.mapPartition(trialIterator)

我收到以下错误消息:

类型不匹配,预期 (Iterator[Row]) => Iterator[NotInferedR],实际:Iterator[(String,Int) => Iterator[(String,Int)]

我可以理解这是由于我的函数的输入、输出类型而发生的,但是如何解决这个问题?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    如果您想获得强类型输入,请不要使用Dataset[Row] (DataFrame),而是使用Dataset[T],其中T 在此特定场景中为(String, Int)。也不要转换成Array,也不要在不知道分区是否为空的情况下盲目调用tail

    def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1)
    
    randomData
      .toDS // org.apache.spark.sql.Dataset[(String, Int)]
      .mapPartitions(trialIterator _)
    

    randomData.toDF // org.apache.spark.sql.Dataset[Row] 
      .as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)]
      .mapPartitions(trialIterator _)
    

    【讨论】:

    • 感谢您的回答。这里的功能只是为了说明我的问题。不是我想要使用的那个。为什么我不应该使用数据框?
    • 因为对于实际应用,DataFrame 只是一个Dataset[Seq[Any]],所以你可以简单地认为它是无类型的/不是类型安全的。
    【解决方案2】:

    你期待输入Iterator[(String,Int)],而你应该期待Iterator[Row]

    def trialIterator(row:Iterator[Row]): Iterator[(String,Int)] = {
        row.next()
        row //seems to do the same thing w/o all the conversions
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-02-07
      • 1970-01-01
      • 2016-10-28
      • 2020-08-09
      • 2021-12-26
      • 2023-03-11
      • 2016-03-11
      • 1970-01-01
      相关资源
      最近更新 更多