【问题标题】:How to implement parameters with unknown types and unknown number in Scala如何在Scala中实现未知类型和未知数量的参数
【发布时间】:2016-03-10 11:21:19
【问题描述】:

我想在 Scala/Spark 中实现一个函数,它可以采用多个减速器/聚合器并一次执行它们。所以基本上我给出了 reduce 函数和初始值,它应该在一次传递中创建一个复合 reduce 操作。

以下是 Python 中的逻辑

from functools import reduce

def reduce_at_once(data, reducer_funcs_inits):
    reducer_funcs, inits = zip(*reducer_funcs_inits)

    complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs))

    return list(reduce(complete_reducer_func, data, inits))

data = list(range(1, 20))
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum
                       (lambda acc, y: acc * y, 1)  # product
                       ]
print(list(reduce_at_once(data, reducer_funcs_inits)))
# [190, 121645100408832000]

我怎样才能在 Scala (Spark) 中做这样的事情?问题似乎是我有一个列表,它的长度我只在调用时才知道,而且列表的元素可能有不同的类型(减少初始累加器),具体取决于我想要包含的减速器(不一定只有这里的数字)。

【问题讨论】:

  • 您还应该添加python 标签

标签: scala apache-spark reduce


【解决方案1】:

您可以随时使用

def reduce_at_once(data: Any, reducer_funcs_inits: Any*)

但这很少是你想要的。特别是,这里你实际上需要

case class ReducerInit[A, B](f: (B, A) => B, init: B)

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_]

不幸的是,实现reduce_at_once 将会非常难看:

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = {
  val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any])
  val inits = rfis.map(_.init.asInstanceOf[Any])

  val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) }

  data.foldLeft(inits)(crf)
}

检查:

val data = 1 to 20

val rf1 = ReducerInit[Int, Int](_ + _, 0)
val rf2 = ReducerInit[Int, Int](_ * _, 1)

println(reduce_at_once(data, rf1, rf2))

givesArrayBuffer(210, -2102132736)(注意溢出)。

【讨论】:

    猜你喜欢
    • 2011-05-20
    • 2022-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-29
    • 2012-07-23
    • 1970-01-01
    • 2020-08-01
    相关资源
    最近更新 更多