此解决方案假定您的项目正在使用协程:
implementation( "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
函数 parallelTransform 不保留元素的顺序,返回 Flow<R>;而函数 parallelMap 保留元素的顺序,返回 List<R>。
创建一个可供多次调用复用的线程池:
// Number of processors the JVM reports as available; used to size the pool below.
val numberOfCores = Runtime.getRuntime().availableProcessors()

// Coroutine dispatcher backed by a fixed-size thread pool with one thread per
// available processor. It owns the pool, so close() it once it is no longer needed.
val executorDispatcher: ExecutorCoroutineDispatcher =
    Executors.newFixedThreadPool(numberOfCores).asCoroutineDispatcher()
使用该调度程序(并在不再需要时调用close()):
/**
 * Applies [transform] to every element of this iterable concurrently on [dispatcher]
 * and exposes the results as a [Flow].
 *
 * Each element is processed in its own child coroutine and its result is sent as soon
 * as it is ready, so results arrive in completion order, not in source order.
 *
 * The [dispatcher] is not closed by this function; the caller remains responsible for
 * closing it.
 *
 * @param dispatcher executor-backed dispatcher on which the transformations run.
 * @param transform function applied to each element.
 * @return a flow emitting one transformed value per source element.
 */
inline fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {
    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow
    // The outer coroutine moves the work onto the dispatcher; the inner ones inherit it.
    launch(dispatcher) {
        items.forEach { item ->
            launch {
                channelFlowScope.send(transform(item))
            }
        }
    }
}
如果不需要复用线程池,可以使用下面这个版本(注意:每次调用都会新建线程池,而创建线程池的开销并不小):
/**
 * Applies [transform] to every element of this iterable concurrently on a dedicated
 * fixed-size thread pool and exposes the results as a [Flow].
 *
 * A new pool with [numberOfThreads] threads is created when the flow body runs and is
 * closed once all transformations have finished. Results are sent as soon as they are
 * ready, so they arrive in completion order, not in source order.
 *
 * @param numberOfThreads size of the thread pool created for this call.
 * @param transform function applied to each element.
 * @return a flow emitting one transformed value per source element.
 */
inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {
    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow
    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }.join() // launch returns immediately; wait here so use() does not close the pool while work is pending
    }
}
如果您需要保留元素顺序的版本:
/**
 * Applies [transform] to every element of this iterable concurrently on [dispatcher]
 * and returns the results in the order of the source elements.
 *
 * This call blocks the calling thread (via `runBlocking`) until every element has been
 * transformed. The [dispatcher] is not closed by this function.
 *
 * NOTE: results are collected in a [ConcurrentSkipListMap], which does not permit null
 * values, so [transform] must not return null.
 *
 * @param dispatcher executor-backed dispatcher on which the transformations run.
 * @param transform function applied to each element.
 * @return the transformed values, positioned like their source elements.
 */
inline fun <T, R> Iterable<T>.parallelMap(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): List<R> = runBlocking {
    val items: Iterable<T> = this@parallelMap
    val result = ConcurrentSkipListMap<Int, R>()
    launch(dispatcher) {
        items.withIndex().forEach { (index, item) ->
            launch {
                result[index] = transform(item)
            }
        }
    }.join() // wait for all workers; otherwise the snapshot below is taken before they finish
    // ConcurrentSkipListMap is a SortedMap keyed by the source index,
    // so the values come out in the original element order.
    result.values.toList()
}