【问题标题】:Parallelizing recursive function calls on each element of an array (with ForkJoinPool)对数组的每个元素进行并行递归函数调用(使用 ForkJoinPool)
【发布时间】:2014-06-18 21:42:16
【问题描述】:

考虑以下场景:

public void processArray(int a, SomeType array) {
    for (int i = 0; i < array.length; ++i) {
        recFunction(a, array[i]);
    }
}
private void recFunction(int a, SomeType element){
     ...
     recFunction(a + x, element);
     recFunction(a + y, element);
     ...
}

processArray 对数组的每个元素调用 recFunction。如何使用 Java 创建该程序的并行版本?请注意,要处理的数组可能非常大(最多 10,000 个元素)。我的第一个想法是使用 ForkJoinPool,但我绝对不能为每个数组元素创建一个 RecursiveTask,因为那样会创建 10,000 个新对象。

另外,我必须处理不同的数组,因此必须调用 processArray 几次。我想避免每次都创建新线程并使用现有线程。有什么想法可以使用 Executor 或 ForkJoinPool 来实现吗?

【问题讨论】:

  • 是否可以拆分您的阵列。例如创建 4 个线程,每个线程计算数组的一半?或者这些元素是相互依赖的?将这项任务并行化真的值得吗?此外,我认为 ForkJoinTask 与 ForkJoinPool 的规模相当大。一个任务会创建多少个新任务? (也许发布完整的问题?)
  • 10k 数组太小而无法并行化,10m 是一个候选。然后,按照 JavaDoc 中的示例分解数组。当达到阈值时,您在大数组的一部分上调用 ProccessArray() 并将结果连接到链上。
  • @edharned:如果一个元素的处理需要很长时间,那么并行化可能是值得的。
  • @OP 为真。然后按照 JavaDoc 中的分解示例或 StackOverflow 上的 Many, Many 示例进行操作。
  • 创建 10k RecursiveTask 对象的开销非常低,但是,其他 cmets 是有效的:您不会只使用单个元素(也就是说,无论如何您可能会以 1000 个任务结束,每个处理 10 个元素左右),以及函数是递归的事实是否是使用 fork join 池的原因,目前还不清楚。

标签: java arrays multithreading recursion concurrency


【解决方案1】:

这是一个快速排序示例,无论如何都不是最佳的,但应该为您提供与递归版本进行比较的基础。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelSort {
    // default, Creates a ForkJoinPool with parallelism equal to Runtime.availableProcessors()
    private ForkJoinPool pool = new ForkJoinPool();

    public void sort(int[] elements) {
        pool.invoke(new SortAction(elements, 0, elements.length-1));
    }

    public class SortAction extends RecursiveAction {
        private static final long serialVersionUID = 3060427288506544983L;

        final int[] elements;
        int low;
        int high;

        SortAction(int[] elements, int low, int high) {
            this.elements = elements;
            this.low = low;
            this.high = high;
        }

        protected void compute() {
            if (low < high) {
                int pivotIndex = (low + high) >>> 1;
                pivotIndex = partition(pivotIndex);

                invokeAll(new SortAction(elements, low, pivotIndex - 1),
                          new SortAction(elements, pivotIndex + 1, high));
            }
        }

        private int partition(int pivotIndex) {
            int pivotValue = elements[pivotIndex];

            swap(elements, pivotIndex, high);
            int storeIndex = low;

            for (int i = low; i < high; i++) {
                if (elements[i] < pivotValue) {
                    swap(elements, i, storeIndex);
                    storeIndex++;
                }
            }

            swap(elements, storeIndex, high);
            return storeIndex;
        }

        private void swap(int[] elements, int i, int j) {
            if (i != j) {
                int temp = elements[i];
                elements[i] = elements[j];
                elements[j] = temp;
            }
        }
    }
}

一些快速说明:

  • 您不需要在排序类中声明 ForkJoinPool,我只是在这里做了,所以调用者不必考虑池。

  • 如果分区大小很小,这种快速排序会回退到串行排序,效果会更好。我没有在这里打扰。

这在数百万个元素上运行没有问题。

【讨论】:

  • 你真的应该提到上面的 SOURCE 以及指向它的链接,它似乎是 ricardozuasti.com/2012/…
  • @javadba 快速排序在所有算法书籍中几乎相同(ForkJoinPool 示例也是如此)。另外请注意,该站点的示例在 List 上排序,如果您将 LinkedList 传递给它,这将非常糟糕。
  • 而且,如果你想学究气,这些同样具有影响力:docs.oracle.com/javase/tutorial/essential/concurrency/…en.wikipedia.org/wiki/Quicksort(你的链接的作者显然遵循后者的伪代码,顺便说一句,一直到“storeIndex”名称)
猜你喜欢
  • 2023-04-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-14
  • 2018-07-11
  • 2018-12-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多