【问题标题】:Parallel operations on Kotlin collections?Kotlin 集合的并行操作?
【发布时间】:2016-04-14 08:42:11
【问题描述】:

在 Scala 中,可以轻松地做一个并行映射、forEach 等,使用:

collection.par.map(..)

在 Kotlin 中是否有等价物?

【问题讨论】:

标签: parallel-processing kotlin


【解决方案1】:

Kotlin 标准库不支持并行操作。不过,由于 Kotlin 使用标准的 Java 集合类,您也可以使用 Java 8 流 API 对 Kotlin 集合执行并行操作。

例如

myCollection.parallelStream()
        .map { ... }
        .filter { ... }

【讨论】:

  • 如何在 Kotlin 中使用 Java 8 流 API?
  • @LordScone 与您在 Java 中执行此操作的方式相同。例如:myCollection.parallelStream().map { ... }. filter { ... }
  • 仅供参考,仅在长列表的情况下建议使用并行流,对于较小的列表,在 java 世界中在不同线程中运行 2 个作业将是开销。尽可能坚持协程
  • parallelStream 是 Java,而不是 kotlin,这就是它需要 Java 8 的原因
【解决方案2】:

从 Kotlin 1.1 开始,并行操作也可以用 coroutines 非常优雅地表示。这是列表中的pmap

fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}

请注意,协程仍是一个实验性功能。

【讨论】:

  • 随着 Kotlin 1.3 的推出,这仍然是最好的答案吗?我注意到下面@OlivierTerrien 的 Stream 答案,但我更愿意坚持使用 Kotlin 序列和 Iterables。
  • @BenjaminH 谢谢;我已将 yole 的答案标记为已接受,因为它还引用了流 API 并在 OlivierTerrien 的答案之前发布。
  • 相当优雅?相反,我会说代码很难阅读。
  • @DzmitryLazerka 我想我知道你来自哪里,但这个确切的代码并不是优雅的部分。这段代码的使用很优雅。如果将上述方法放在某处,则可以仅与foo.pmap { v -&gt; ... } 一起使用。我认为这相当优雅。
  • 目前“CommonPool”无法访问 - 它在“kotlinx.coroutines”内部!
【解决方案3】:

在 Kotlin 的标准库中还没有官方支持,但是你可以定义一个 extension function 来模仿 par.map

fun <T, R> Iterable<T>.pmap(
          numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, 
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}

(github source)

这是一个简单的使用示例

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

如果需要,它允许通过提供线程数甚至特定的java.util.concurrent.Executor 来调整线程。例如

listOf("foo", "bar").pmap(4, transform = { it + "!" })

请注意,这种方法只允许并行化map 操作,不会影响任何下游位。例如。第一个示例中的filter 将运行单线程。然而,在许多情况下,只有数据转换(即map)需要并行化。此外,将上述方法扩展到 Kotlin 集合 API 的其他元素也很简单。

【讨论】:

  • 我看不出“destination.add(transform(item))”是如何线程安全的。什么是防止两个线程同时调用“destination.add”,从而破坏东西,因为 ArrayList.add() 不是线程安全操作?
  • 感谢您的提示。相当some 人们认为,当只添加元素时,不同步应该没问题。但是,我已将其更改为使用同步列表来提高线程安全性。
  • destination 中的顺序可能与原始列表中的不同
  • 我认为许多并行集合实现(如scala)并不关心保留顺序。不过,通过将上面的 for-each 循环更改为索引循环以及下游处理,可以轻松地保留顺序。
  • 我对返回Sequence&lt;R&gt;(或Flow&lt;R&gt;)的版本感兴趣。不幸的是,我不能简单地让整个代码在= execute{ 块中执行并调用yield 而不是destination.add,因为yield 只能在原始块中执行,所以在exec.submit { } 内不是一个选项。 (无需保留订单。)
【解决方案4】:

你可以使用这个扩展方法:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

更多信息请见Parallel Map in Kotlin

【讨论】:

    【解决方案5】:

    从 1.2 版本开始,kotlin 添加了一个符合 JRE8 的stream feature

    因此,异步迭代列表可以像下面这样完成:

    fun main(args: Array<String>) {
      val c = listOf("toto", "tata", "tutu")
      c.parallelStream().forEach { println(it) }
    }
    

    【讨论】:

    • 我没有密切关注 kotlin;这和 Yole 的答案不一样吗?我很欣赏你的答案有示例代码。也许我们可以编辑 Yole 的答案以添加示例代码。
    • @HRJ,不完全是。 Yole 说 Kotlin 不支持流,直到 1.2 版才成立。从这个版本开始,Kotlin 提供了一种像 Java8 一样流式传输集合的方法。
    • Yole 说“Kotlin 不支持并行操作”。请再次检查。
    • 是的,你是对的。写得太快了。并行操作不流式传输。
    • 可能值得指出的是,由于这需要 JRE8,它仅适用于 Android 24 及更高版本。
    【解决方案6】:

    Kotlin 想要是惯用的,但又不至于过于合成,乍一看难以理解。

    通过协程进行并行计算也不例外。他们希望通过一些预先构建的方法使其简单但不隐含,允许在需要时分支计算。

    在你的情况下:

    collection.map { 
            async{ produceWith(it) } 
        }
        .forEach { 
            consume(it.await()) 
        }
    

    请注意,要调用asyncawait,您需要位于所谓的Context 中,您不能在非协程上下文中进行挂起调用或启动协程。要输入一个,您可以:

    • runBlocking { /* your code here */ }:它将暂停当前线程,直到 lambda 返回。
    • GlobalScope.launch { }:它将并行执行 lambda;如果您的main 执行完毕,而您的协程还没有发生坏事,那么最好使用runBlocking

    希望对你有帮助:)

    【讨论】:

    • 虽然我很欣赏 Kotlin 不想变得不透明,但肯定这是一个足够普遍的要求来保证扩展方法吗? forEachParallel 或类似的东西
    【解决方案7】:

    我发现另一种非常优雅的方法是这样的,使用kotlinx.coroutines 库:

    import kotlinx.coroutines.flow.asFlow
    
    suspend fun process(myCollection: Iterable<Foo>) {
        myCollection.asFlow()
            .map { /* ... */ }
            .filter { /* ... */ }
            .collect { /* ... perform some side effect ... */ }
    }
    

    然而,它确实需要额外的依赖; kotlinx.coroutines 不在标准库中。

    【讨论】:

    【解决方案8】:

    目前没有。 Kotlin 与 Scala 的官方比较提到:

    以后可能会添加到 Kotlin 中的东西:

    • 并行集合

    【讨论】:

      【解决方案9】:

      此解决方案假定您的项目正在使用协程:

      implementation( "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
      

      名为parallelTransform 的函数不保留元素的顺序并返回Flow&lt;R&gt;,而函数parallelMap 保留元素的顺序并返回List&lt;R&gt;

      为多个调用创建一个线程池:

      val numberOfCores = Runtime.getRuntime().availableProcessors()
      val executorDispatcher: ExecutorCoroutineDispatcher =
          Executors.newFixedThreadPool(numberOfCores ).asCoroutineDispatcher()
      

      使用该调度程序(并在不再需要时调用close()):

      inline fun <T, R> Iterable<T>.parallelTransform(
          dispatcher: ExecutorDispatcher,
          crossinline transform: (T) -> R
      ): Flow<R> = channelFlow {
      
          val items: Iterable<T> = this@parallelTransform
          val channelFlowScope: ProducerScope<R> = this@channelFlow
      
          launch(dispatcher) {
              items.forEach {item ->
                  launch {
                      channelFlowScope.send(transform(item))
                  }
              }
          }
      }
      

      如果线程池重用不重要(线程池并不便宜),您可以使用这个版本:

      inline fun <T, R> Iterable<T>.parallelTransform(
          numberOfThreads: Int,
          crossinline transform: (T) -> R
      ): Flow<R> = channelFlow {
      
          val items: Iterable<T> = this@parallelTransform
          val channelFlowScope: ProducerScope<R> = this@channelFlow
      
          Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
              launch( dispatcher ) {
                  items.forEach { item ->
                      launch {
                          channelFlowScope.send(transform(item))
                      }
                  }
              }
          }
      }
      

      如果您需要保留元素顺序的版本:

      inline fun <T, R> Iterable<T>.parallelMap(
          dispatcher: ExecutorDispatcher,
          crossinline transform: (T) -> R
      ): List<R> = runBlocking {
      
          val items: Iterable<T> = this@parallelMap
          val result = ConcurrentSkipListMap<Int, R>()
      
          launch(dispatcher) {
              items.withIndex().forEach {(index, item) ->
                  launch {
                      result[index] = transform(item)
                  }
              }
          }
      
          // ConcurrentSkipListMap is a SortedMap
          // so the values will be in the right order
          result.values.toList()
      }
      

      【讨论】:

        【解决方案10】:

        我发现了这个:

        实现'com.github.cvb941:kotlin-parallel-operations:1.3'

        详情:

        https://github.com/cvb941/kotlin-parallel-operations

        【讨论】:

          【解决方案11】:

          我想出了几个扩展函数:

          1. Iterable&lt;T&gt; 类型上的 suspend 扩展函数,它对项目进行并行处理并返回处理每个项目的一些结果。默认情况下,它使用Dispatchers.IO 调度程序将阻塞任务卸载到共享线程池。必须从协程(包括带有 Dispatchers.Main 调度程序的协程)或另一个 suspend 函数调用。

            suspend fun <T, R> Iterable<T>.processInParallel(
                dispatcher: CoroutineDispatcher = Dispatchers.IO,
                processBlock: suspend (v: T) -> R,
            ): List<R> = coroutineScope { // or supervisorScope
                map {
                    async(dispatcher) { processBlock(it) }
                }.awaitAll()
            }
            

            协程调用示例:

            val collection = listOf("A", "B", "C", "D", "E")
            
            someCoroutineScope.launch {
                val results = collection.processInParallel {
                    process(it)
                }
                // use processing results
            }
            

          其中someCoroutineScopeCoroutineScope 的一个实例。

          1. CoroutineScope 上启动并忘记扩展功能,它不会返回任何结果。它还默认使用Dispatchers.IO 调度程序。可以使用CoroutineScope 或从另一个协程调用。

            fun <T> CoroutineScope.processInParallelAndForget(
                iterable: Iterable<T>,
                dispatcher: CoroutineDispatcher = Dispatchers.IO,
                processBlock: suspend (v: T) -> Unit
            ) = iterable.forEach {
                launch(dispatcher) { processBlock(it) }
            }
            

            调用示例:

            someoroutineScope.processInParallelAndForget(collection) {
                process(it)
            }
            
            // OR from another coroutine:
            
            someCoroutineScope.launch {
                processInParallelAndForget(collection) {
                    process(it)
                }
            }
            

          2a。在Iterable&lt;T&gt; 上启动并忘记扩展功能。和之前的差不多,只是扩展类型不同。 CoroutineScope 必须作为参数传递给函数。

          fun <T> Iterable<T>.processInParallelAndForget(
              scope: CoroutineScope,
              dispatcher: CoroutineDispatcher = Dispatchers.IO,
              processBlock: suspend (v: T) -> Unit
          ) = forEach {
              scope.launch(dispatcher) { processBlock(it) }
          }
          

          调用:

          collection.processInParallelAndForget(someCoroutineScope) {
              process(it)
          }
          
          // OR from another coroutine:
          
          someScope.launch {
              collection.processInParallelAndForget(this) {
                  process(it)
              }
          }
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2017-02-17
            • 1970-01-01
            • 2011-12-01
            • 2020-07-08
            • 2023-02-08
            • 2019-01-27
            • 1970-01-01
            相关资源
            最近更新 更多