【问题标题】:Threadpool unable to create native thread线程池无法创建本机线程
【发布时间】:2020-07-19 15:47:27
【问题描述】:

我正在尝试以各种方式实现归并排序。我确实用fork/join 实现了它,现在我想用ExecutorServiceThreadpool 实现它。

而我目前的情况如下:

public class Test<T> implements Comparable<T> {

private final ThreadPoolExecutor executor;

    public Test() {
        //having this kind of threadpool prevent the program from generating any output whatsoever.
        //executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
   }


private void divide(LinkedList<T> dataToBeSorted) {
    if (dataToBeSorted.size() < 2) {
        return;
    }

    int mid = dataToBeSorted.size() / 2;
    LinkedList<T> left = new LinkedList<>(dataToBeSorted.subList(0, mid));
    LinkedList<T> right = new LinkedList<>(dataToBeSorted.subList(mid, dataToBeSorted.size()));

    Future<?> f1= (executor.submit(() -> divide(left)));
    Future<?> f2= (executor.submit(() -> divide(right)));

    try {

        f1.get();
        f2.get();

    } catch (InterruptedException | ExecutionException e) {
        System.out.println("something went wrong went to sequential merge sort!");
        new SeqMergeSorter<T>().sort(dataToBeSorted);
        return;
    }

    merge(left, right, dataToBeSorted);
}



public static<T> void sort(LinkedList<T> dataToBeSorted){
    if (dataToBeSorted.size() < 2)
        return;
    var temp=new Test<T>();
    temp.divide(dataToBeSorted);
    temp.executor.shutdownNow();
}

编辑部分:

以上代码已更新。

  • 我添加了future,并尝试以单独的方法将其关闭。 现在我得到一个排序的元素,但只要我的设备可以产生线程。

  • 由于newCachedThreadPool 产生无限线程并在60s 之后取消它们,我得到以下输出:

    [1.427s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

  • 当将newCachedThreadPool 更改为newFixedThreadPool(Runtime.getRuntime().availableProcessors())i 时将没有任何输出!

【问题讨论】:

  • executor.execute() 不等待给定任务完成。它将在未来“某个时间”执行,甚至可能在不同的线程上执行。所以你的 merge() 方法可能还没有正确的值。
  • edit您的问题将您拥有的源代码包含为minimal reproducible example,其他人可以编译和测试。

标签: java executorservice threadpoolexecutor


【解决方案1】:

您的解决方案是多线程并创建大量中间数据结构,请理解它很可能比更优化的单线程实现慢得多。特别是最终访问的是内存,并且由于合并排序的处理方式,事物被移动到任何地方。基本上,除非比较成本巨大,否则该算法将受到主内存性能/缓存而不是 CPU 的限制。

但从技术上讲,您的实现不是线程安全的,并且不等待子任务完成。

因此,通常 var left 和 right 稍后可能会被另一个线程排序,但会立即合并而不等待单个排序结果。

你需要merge来等待left和right首先被排序,然后才合并结果。

如果您不等待,将在非线程安全的数据结构上同时执行,并且程序将出现错误的行为。

【讨论】:

  • 可能的实现是无穷无尽的,真的。不少意味着重新实现 fork/join 框架。基本上,合并可以在未来调用 2 get 并在可用时合并。那将是最简单的一个。结果是您想要的任何内容,例如排序后的链接或显示“我完成”的布尔值。这可能只是一个标志。
  • 你不应该关闭你的执行器,因为“排序”函数是递归调用的,你应该让任务按预期完成。
  • 对于内存,问题是如果你 Executor.newCachedThreadPool() 你会根据需要创建线程,所以最多 n * Log(n) 线程,这样就超出了系统可以处理和是的,你得到的错误是正常的。这意味着当您急切地启动线程时,它不会与 get() 一起正常工作。更好的方法是安排每层的任务,例如在时间 2 乘以 2 排序 2 个连续值,然后上层合并 2x2 以获得排序的 4 个连续值,依此类推。
  • 这样你可以用你的池中可用线程的数量来调度一个层的所有任务,等待所有的任务完成,然后继续下一层......但这变成了越来越复杂。 Get 是每个线程等待其依赖项准备就绪的简单解决方案。以这种方式(可能是 fork/join 的方式),您首先执行依赖项,然后只有当它们都可用时才执行下一步。这允许您只使用几个线程。
  • 使用固定数量的线程,在您当前的实现中,线程正在等待永远不会运行的任务,因为没有更多线程可用于运行它们。因此问题
【解决方案2】:

我最终做了以下事情:

  • 我添加了未来,并尝试以单独的方法将其关闭。现在我得到一个排序的元素,但只要我的设备可以产生线程。

  • 我使用了newWorkStealingPool,通过使用工作窃取概念,我可以对大数据进行排序而不会遇到内存问题。

我的解决方案是这样的:

public class ExecutorMergeSorter<T> implements Comparable<T> {

private final ExecutorService excr;

public ExecutorMergeSorter() {
    excr = Executors.newWorkStealingPool();
}

private void divide(LinkedList<T> dataToBeSorted) {
    if (dataToBeSorted.size() < 2) {
        return;
    }

    int mid = dataToBeSorted.size() / 2;
    LinkedList<T> left = new LinkedList<>(dataToBeSorted.subList(0, mid));
    LinkedList<T> right = new LinkedList<>(dataToBeSorted.subList(mid, dataToBeSorted.size()));

    Future<?> f1= (excr.submit(() -> divide(left)));
    Future<?> f2= (excr.submit(() -> divide(right)));

    try {

        f1.get();
        f2.get();

    } catch (InterruptedException | ExecutionException e) {
        new SeqMergeSorter<T>().sort(dataToBeSorted);
    }

    merge(left, right, dataToBeSorted);
}



public static<T> void sort(LinkedList<T> dataToBeSorted){
    if (dataToBeSorted.size() < 2)
        return;
    var temp=new ExecutorMergeSorter<T>();
    temp.divide(dataToBeSorted);
    

【讨论】:

    猜你喜欢
    • 2023-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-06
    • 2023-03-04
    • 2015-03-13
    • 1970-01-01
    相关资源
    最近更新 更多