【问题标题】:How to transform Scala nested map operation to Scala Spark operation?如何将 Scala 嵌套映射操作转换为 Scala Spark 操作?
【发布时间】:2014-11-07 23:13:57
【问题描述】:

以下代码计算数据集中两个 List 之间的欧氏距离:

 val user1 = List("a", "1", "3", "2", "6", "9")  //> user1  : List[String] = List(a, 1, 3, 2, 6, 9)
  val user2 = List("b", "1", "2", "2", "5", "9")  //> user2  : List[String] = List(b, 1, 2, 2, 5, 9)

  val all = List(user1, user2)                    //> all  : List[List[String]] = List(List(a, 1, 3, 2, 6, 9), List(b, 1, 2, 2, 5,
                                                  //|  9))



  def euclDistance(userA: List[String], userB: List[String]) = {
    println("comparing "+userA(0) +" and "+userB(0))
    val zipped = userA.zip(userB)
    val lastElements = zipped match {
      case (h :: t) => t
    }
    val subElements = lastElements.map(m => ((m._1.toDouble - m._2.toDouble) * (m._1.toDouble - m._2.toDouble)))
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    sqRoot
  }                                               //> euclDistance: (userA: List[String], userB: List[String])Double

  all.map(m => (all.map(m2 => euclDistance(m,m2))))
                                                  //> comparing a and a
                                                  //| comparing a and b
                                                  //| comparing b and a
                                                  //| comparing b and b
                                                  //| res0: List[List[Double]] = List(List(0.0, 1.4142135623730951), List(1.414213
                                                  //| 5623730951, 0.0))

但是如何将其转化为并行的 Spark Scala 操作呢?

当我打印 distAll 的内容时:

scala> distAll.foreach(p => p.foreach(println))
14/10/24 23:09:42 INFO SparkContext: Starting job: foreach at <console>:21
14/10/24 23:09:42 INFO DAGScheduler: Got job 2 (foreach at <console>:21) with 4
output partitions (allowLocal=false)
14/10/24 23:09:42 INFO DAGScheduler: Final stage: Stage 2(foreach at <console>:2
1)
14/10/24 23:09:42 INFO DAGScheduler: Parents of final stage: List()
14/10/24 23:09:42 INFO DAGScheduler: Missing parents: List()
14/10/24 23:09:42 INFO DAGScheduler: Submitting Stage 2 (ParallelCollectionRDD[1
] at parallelize at <console>:18), which has no missing parents
14/10/24 23:09:42 INFO MemoryStore: ensureFreeSpace(1152) called with curMem=115
2, maxMem=278019440
14/10/24 23:09:42 INFO MemoryStore: Block broadcast_2 stored as values in memory
 (estimated size 1152.0 B, free 265.1 MB)
14/10/24 23:09:42 INFO DAGScheduler: Submitting 4 missing tasks from Stage 2 (Pa
rallelCollectionRDD[1] at parallelize at <console>:18)
14/10/24 23:09:42 INFO TaskSchedulerImpl: Adding task set 2.0 with 4 tasks
14/10/24 23:09:42 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 8, lo
calhost, PROCESS_LOCAL, 1169 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 9, lo
calhost, PROCESS_LOCAL, 1419 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 10, l
ocalhost, PROCESS_LOCAL, 1169 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 11, l
ocalhost, PROCESS_LOCAL, 1420 bytes)
14/10/24 23:09:42 INFO Executor: Running task 0.0 in stage 2.0 (TID 8)
14/10/24 23:09:42 INFO Executor: Running task 1.0 in stage 2.0 (TID 9)
14/10/24 23:09:42 INFO Executor: Running task 3.0 in stage 2.0 (TID 11)
a14/10/24 23:09:42 INFO Executor: Running task 2.0 in stage 2.0 (TID 10)

14/10/24 23:09:42 INFO Executor: Finished task 2.0 in stage 2.0 (TID 10). 585 by
tes result sent to driver
114/10/24 23:09:42 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 10)
in 16 ms on localhost (1/4)

314/10/24 23:09:42 INFO Executor: Finished task 0.0 in stage 2.0 (TID 8). 585 by
tes result sent to driver

214/10/24 23:09:42 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 8) i
n 16 ms on localhost (2/4)

6
9
14/10/24 23:09:42 INFO Executor: Finished task 1.0 in stage 2.0 (TID 9). 585 byt
es result sent to driver
b14/10/24 23:09:42 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 9) i
n 16 ms on localhost (3/4)

1
2
2
5
9
14/10/24 23:09:42 INFO Executor: Finished task 3.0 in stage 2.0 (TID 11). 585 by
tes result sent to driver
14/10/24 23:09:42 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 11) i
n 31 ms on localhost (4/4)
14/10/24 23:09:42 INFO DAGScheduler: Stage 2 (foreach at <console>:21) finished
in 0.031 s
14/10/24 23:09:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have
all completed, from pool
14/10/24 23:09:42 INFO SparkContext: Job finished: foreach at <console>:21, took
 0.037641021 s

距离没有填充?

更新:

为了让 Eu​​gene Zhulenev 下面的答案为我工作,我需要进行以下更改:

用 java.io.Serializable 扩展 UserObject

还将 User 重命名为 UserObject。

这里是更新的代码:

val user1 = List("a", "1", "3", "2", "6", "9")    //> user1  : List[String] = List(a, 1, 3, 2, 6, 9)
  val user2 = List("b", "1", "2", "2", "5", "9")  //> user2  : List[String] = List(b, 1, 2, 2, 5, 9)

  case class User(name: String, features: Vector[Double])

object UserObject extends java.io.Serializable {
    def fromList(list: List[String]): User = list match {
      case h :: tail => User(h, tail.map(_.toDouble).toVector)
    }
  }

 val all = List(UserObject.fromList(user1), UserObject.fromList(user2))


    val users= sc.parallelize(all.combinations(2).toSeq.map {
    case l :: r :: Nil => (l, r)
  })

   def euclDistance(userA: User, userB: User) = {
    println(s"comparing ${userA.name} and ${userB.name}")
    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

println("value is"+sqRoot)
    sqRoot
  }

  users.foreach(t => euclDistance(t._1, t._2))

更新 2:

我在 maasg 答案中尝试过代码但收到错误:

scala> val userDistanceRdd = usersRdd.map { case (user1, user2) => {
     |         val data = sc.broadcast.value
     |         val distance = euclidDistance(data(user1), data(user2))
     |         ((user1, user2),distance)
     |     }
     |     }
<console>:27: error: missing arguments for method broadcast in class SparkContex
t;
follow this method with `_' if you want to treat it as a partially applied funct
ion
               val data = sc.broadcast.value

这是我修改后的完整代码:

type UserId = String
type UserData = Array[Double]

val users: List[UserId]= List("a" , "b")
val data: Map[UserId,UserData] = Map( ("a" , Array(3.0,4.0)),
("b" , Array(3.0,4.0)) )

def combinations[T](l: List[T]): List[(T,T)] = l match {
    case Nil => Nil
    case h::Nil => Nil
    case h::t => t.map(x=>(h,x)) ++ combinations(t)
}
val broadcastData = sc.broadcast(data)

val usersRdd = sc.parallelize(combinations(users))

val euclidDistance: (UserData, UserData) => Double = (x,y) => 
    math.sqrt((x zip y).map{case (a,b) => math.pow(a-b,2)}.sum)
val userDistanceRdd = usersRdd.map { case (user1, user2) => {
        val data = sc.broadcast.value
        val distance = euclidDistance(data(user1), data(user2))
        ((user1, user2),distance)
    }
    }

为了使 maasg 代码正常工作,我需要将 } 添加到 userDistanceRdd 函数。

代码:

type UserId = String
type UserData = Array[Double]

val users: List[UserId] = List("a" , "b")

val data: Map[UserId,UserData] = Map( ("a" , Array(3.0,4.0)),
("b" , Array(3.0,3.0)) )

def combinations[T](l: List[T]): List[(T,T)] = l match {
    case Nil => Nil
    case h::Nil => Nil
    case h::t => t.map(x=>(h,x)) ++ combinations(t)
}

val broadcastData = sc.broadcast(data)
val usersRdd = sc.parallelize(combinations(users))
val euclidDistance: (UserData, UserData) => Double = (x,y) => 
    math.sqrt((x zip y).map{case (a,b) => math.pow(a-b,2)}.sum)
val userDistanceRdd = usersRdd.map{ case (user1, user2) => {
        val data = broadcastData.value
        val distance = euclidDistance(data(user1), data(user2))
        ((user1, user2),distance)
    }
    }

userDistanceRdd.foreach(println)

【问题讨论】:

  • 我不明白为什么这需要在 spark 上运行。您想扩大哪个维度? # 个功能/用户或 # 个用户
  • @maasg 也可能是,但更可能是用户数量
  • @maasg 是不是用于向外扩展的火花?即不扩大“你想扩大什么维度?” ?
  • 'scale up' 是指尺寸增长超出单机内存限制。
  • @maasg 好的,你是说这段代码不适合放大吗?

标签: scala apache-spark


【解决方案1】:

首先,我建议您从将用户模型存储在列表中转移到类型良好的类。然后我认为您不需要像 (a-a) 和 (b-b) 这样计算相同用户之间的距离,也没有理由计算两次 (a-b) (b-a) 之间的距离。

  val user1 = List("a", "1", "3", "2", "6", "9")
  val user2 = List("b", "1", "2", "2", "5", "9")

  case class User(name: String, features: Vector[Double])

  object User {
    def fromList(list: List[String]): User = list match {
      case h :: tail => User(h, tail.map(_.toDouble).toVector)
    }
  }

  def euclDistance(userA: User, userB: User) = {
    println(s"comparing ${userA.name} and ${userB.name}")
    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    sqRoot
  }

  val all = List(User.fromList(user1), User.fromList(user2))


  val users: RDD[(User, User)] = sc.parallelize(all.combinations(2).toSeq.map {
    case l :: r :: Nil => (l, r)
  })

  users.foreach(t => euclDistance(t._1, t._2))

【讨论】:

  • 谢谢,请查看更新后的问题,我需要进行一些调整才能正常工作。我正在使用 Spark v1.1.0 并通过命令行运行代码。你用的一样吗?
  • 我使用带有本地 spark 上下文的 Spark 1.1.0 位运行它,因此不会出现序列化错误。看起来我第一次没有得到你需要的东西,会尝试更新 mo 答案。 'all.combinations' 在驱动节点上运行,从 RDD 获取组合会很酷。但我现在不知道如何有效地做到这一点。不过你的问题可能没问题
  • “看起来我第一次没有得到你需要的东西,将尝试更新 mo 答案”第一次你指的是通过次数吗?您的代码按预期工作?
  • 我在想如何解决这个问题:stackoverflow.com/questions/26557873/…
  • @EugeneZhulenev 在这里使用 RDD 没有多大意义,因为 foreach 将在驱动程序上运行,如果你用普通的 Scala 计算它实际上没有区别。
【解决方案2】:

实际的解决方案将取决于数据集的维度。假设原始数据集适合内存并且您希望并行计算欧几里德距离,我会这样进行:

假设 users 是按某个 id 列出的用户列表,userData 是按 id 索引的每个用户要处理的数据。

// sc is the Spark Context
type UserId = String
type UserData = Array[Double]

val users: List[UserId]= ???
val data: Map[UserId,UserData] = ???
// combination generates the unique pairs of users for which distance makes sense
// given that euclidDistance (a,b) = eclidDistance(b,a) only (a,b) is in this set
def combinations[T](l: List[T]): List[(T,T)] = l match {
    case Nil => Nil
    case h::Nil => Nil
    case h::t => t.map(x=>(h,x)) ++ comb(t)
}

// broadcasts the data to all workers
val broadcastData = sc.broadcast(data)
val usersRdd = sc.parallelize(combinations(users))
val euclidDistance: (UserData, UserData) => Double = (x,y) => 
    math.sqrt((x zip y).map{case (a,b) => math.pow(a-b,2)}.sum)
val userDistanceRdd = usersRdd.map{ case (user1, user2) => {
        val data = broadcastData.value
        val distance = euclidDistance(data(user1), data(user2))
        ((user1, user2),distance)
    }

如果用户数据太大,您可以从外部存储中加载,而不是使用广播变量。

【讨论】:

  • 请查看我的问题更新。我试过你的代码但收到错误?
  • @blue-sky 哎呀。应该是broadcastData iso broadcast 我会修复答案。 - 完成
猜你喜欢
  • 1970-01-01
  • 2016-06-06
  • 1970-01-01
  • 1970-01-01
  • 2019-11-24
  • 1970-01-01
  • 2013-05-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多