【问题标题】:Spark in Scala - Map with Function with Extra ArgumentsScala中的Spark - 带有额外参数的函数映射
【发布时间】:2019-12-19 09:05:02
【问题描述】:

Scala 中有没有一种方法可以为带有附加/额外参数的 RDD 转换定义显式函数?

例如,下面的 Python 代码使用 lambda 表达式将转换映射(需要一个带有一个参数的函数)与函数 my_power(实际上有 2 个参数)一起应用。

def my_power(a, b):
    res = a ** b
    return res

def my_main(sc, n):
    inputRDD = sc.parallelize([1, 2, 3, 4])
    powerRDD = inputRDD.map(lambda x: my_power(x, n))
    resVAL = powerRDD.collect()
    for item in resVAL:
        print(item)

但是,在 Scala 中尝试等效实现时,我得到一个 Task not serializable 异常。

val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
  val res: Int = math.pow(a, b).toInt
  res
}

def myMain(sc: SparkContext, n: Int): Unit = {
  val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
  val squareRDD: RDD[Int] = inputRDD.map( (x: Int) => myPower(x, n) )
  val resVAL: Array[Int] = squareRDD.collect()
  for (item <- resVAL){
    println(item)
  }
}

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    以这种方式它对我有用。

    package examples
    
    import org.apache.log4j.Level
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object RDDTest extends App {
      val logger = org.apache.log4j.Logger.getLogger("org")
      logger.setLevel(Level.WARN)
      val spark = SparkSession.builder()
        .appName(this.getClass.getName)
        .config("spark.master", "local[*]").getOrCreate()
    
    
      val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
        val res: Int = math.pow(a, b).toInt
        res
      }
      val scontext = spark.sparkContext
      myMain(scontext, 10);
    
      def myMain(sc: SparkContext, n: Int): Unit = {
        val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
        val squareRDD: RDD[Int] = inputRDD.map((x: Int) => myPower(x, n))
        val resVAL: Array[Int] = squareRDD.collect()
        for ( item <- resVAL ) {
          println(item)
        }
      }
    }
    
    
    

    结果:

    1024
    59049
    1048576
    
    

    还有另一个选项可以使用sc.broadcast 广播 n,并且也可以像 map 一样在闭包中访问...

    【讨论】:

      【解决方案2】:

      只需添加一个局部变量作为函数别名就可以了:

      val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
        val res: Int = math.pow(a, b).toInt
        res
      }
      
      def myMain(sc: SparkContext, n: Int): Unit = {
        val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
      
        val myPowerAlias = myPower
        val squareRDD: RDD[Int] = inputRDD.map( (x: Int) => myPowerAlias(x, n) )
      
        val resVAL: Array[Int] = squareRDD.collect()
        for (item <- resVAL){
          println(item)
        }
      }
      

      【讨论】:

      • 上述我提到的方法不适合你?
      猜你喜欢
      • 1970-01-01
      • 2016-01-06
      • 1970-01-01
      • 1970-01-01
      • 2019-12-07
      • 1970-01-01
      • 2022-12-17
      • 2018-12-06
      • 1970-01-01
      相关资源
      最近更新 更多