【问题标题】:Run while loop in parallel并行运行while循环
【发布时间】:2020-05-11 12:10:45
【问题描述】:

我有一个大集合(+90 000 个对象),我想在其上并行运行 while 循环,我的函数源如下

val context = newSingleThreadAsyncContext()
        return KtxAsync.async(context)  {
            val fields = regularMazeService.generateFields(colsNo, rowsNo)

        val time = measureTimeMillis {
            withContext(newAsyncContext(10)) {
                while (availableFieldsWrappers.isNotEmpty()) {

                    val wrapper = getFirstShuffled(availableFieldsWrappers.lastIndex)
                            .let { availableFieldsWrappers[it] }

                    if (wrapper.neighborsIndexes.isEmpty()) {
                        availableFieldsWrappers.remove(wrapper)
                        continue
                    }

                    val nextFieldIndex = getFirstShuffled(wrapper.neighborsIndexes.lastIndex)
                            .let {
                                val fieldIndex = wrapper.neighborsIndexes[it]
                                wrapper.neighborsIndexes.removeAt(it)
                                fieldIndex
                            }

                    if (visitedFieldsIndexes.contains(nextFieldIndex)) {
                        wrapper.neighborsIndexes.remove(nextFieldIndex)
                        fields[nextFieldIndex].neighborFieldsIndexes.remove(wrapper.index)
                        continue
                    }

                    val nextField = fields[nextFieldIndex]
                    availableFieldsWrappers.add(FieldWrapper(nextField, nextFieldIndex))
                    visitedFieldsIndexes.add(nextFieldIndex)

                    wrapper.field.removeNeighborWall(nextFieldIndex)
                    nextField.removeNeighborWall(wrapper.index)
                }
            }
        }
        Gdx.app.log("maze-time", "$time")

一流的

private val availableFieldsWrappers = Collections.synchronizedList(mutableListOf<FieldWrapper>())
private val visitedFieldsIndexes = Collections.synchronizedList(mutableListOf<Int>())

我测试了几次,结果如下:

  • 1 个线程 - 21213 毫秒
  • 5 个线程 - 27894 毫秒
  • 10 个线程 - 21494 毫秒
  • 15 个线程 - 20986 毫秒

我做错了什么?

【问题讨论】:

    标签: libktx


    【解决方案1】:
    1. 您正在使用 Java 标准库中的 Collections.synchronizedList,它返回一个列表包装器,该包装器利用阻塞 synchronized 机制来确保线程安全。这种机制与协程不兼容,因为它会阻止其他线程访问集合,直到操作完成。从多个协程访问数据或使用non-blocking mutex 保护共享数据时,您通常应该使用非阻塞并发集合。

    2. List.contains 将随着越来越多的元素添加而变得越来越慢 (O(n))。您应该使用visitedFieldsIndexes 的集合而不是列表。只需确保使用互斥锁保护它或使用并发变体即可。同样,从availableFieldsWrappers 中删除具有随机索引的值的成本也相当高 - 相反,您可以对列表进行一次洗牌并使用简单的迭代。

    3. 您没有重用协程上下文。通常,您可以创建一次异步上下文并重用其实例,而不是每次需要协程时都创建新的线程池。您应该只调用并分配一次 newAsyncContext(10) 的结果,然后在整个应用程序中重复使用它。

    4. 您当前编写的代码没有很好地利用协程。不要将协程调度程序视为一个线程池,您可以在其中并行启动 N 个大任务(即您的 while availableFieldsWrappers.isNotEmpty 循环),您应该将其视为数百或数千个小任务的执行器,并相应地调整您的代码。我认为您可以通过引入例如重写代码来完全避免 available/visited 集合。 Kotlin flows 或只是处理较小部分逻辑的多个 KtxAsync.async/KtxAsync.launch 调用。

    5. 除非某些函数处于挂起状态或在下面使用协程,否则您根本没有真正利用异步上下文的多个线程。 withContext(newAsyncContext(10)) 启动一个协程,它按顺序处理整个逻辑,只利用一个线程。有关如何重写代码的一些想法,请参见 4。尝试收集(或仅打印)线程哈希和名称,看看您是否很好地使用了所有线程。

    【讨论】:

      猜你喜欢
      • 2017-01-12
      • 2014-12-18
      • 2021-12-23
      • 2015-07-26
      • 2021-05-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多