【问题标题】:Managing resource in Java multithreading在 Java 多线程中管理资源
【发布时间】:2021-06-26 06:47:00
【问题描述】:

假设:

  1. 有 10 个任务
  2. 有 3 个线程
  3. 有 3 个数组列表
  4. 每个线程都与 1 个 arrayList 相关

我的问题是:如何管理资源?例如,第一步 3 任务将使用 3 个线程和 3 个 arrayLists 执行。然后只要有可用的线程,第 4 个任务将与可用的 arrayList 一起执行。如何在 Java 中做到这一点?请给我参考或源代码。

public class NewClass {

    private Queue<Integer> availableCore = new LinkedList();

    public void manage() throws InterruptedException {
        ArrayList<ArrayList<Integer>> data = new ArrayList<>();
        data.add(new ArrayList<>());
        data.add(new ArrayList<>());
        data.add(new ArrayList<>());

        availableCore.add(0);
        availableCore.add(1);
        availableCore.add(2);

        ArrayList<Calculate> allThreads = new ArrayList<>();
        ExecutorService es = Executors.newWorkStealingPool(3);
        int threadCount = -1;

        int numOfTask = 10;

        while (numOfTask > 0) {
            if (!availableCore.isEmpty()) {
                System.out.println(availableCore.element());
                threadCount++;
                int core = availableCore.remove();
                allThreads.add(new Calculate(data.get(core), numOfTask, core));
                es.execute(allThreads.get(threadCount));
                numOfTask--;
            }
        }

        es.shutdown();
        boolean finished = es.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Done\n");

        for (int i = 0; i < data.size(); i++) {
            System.out.println(data.get(i));
        }
    }

    public class Calculate extends Thread {

        private ArrayList<Integer> data;
        private int start;
        private int core;

        public Calculate(ArrayList<Integer> data, int start, int core) {
            this.data = data;
            this.start = start;
            this.core = core;
        }

        @Override
        public void run() {
            this.data.add(start);
            availableCore.add(this.core);
        }

        public int getCore() {
            return core;
        }
    }
}

上面的这段代码代表了我的实际问题。当我尝试运行几次时,有时会在“availableCore.element()”中出现错误。它说availableCore是空的,但我做了一个条件,确保它不为空。

if (!availableCore.isEmpty()) {
            System.out.println(availableCore.element());

【问题讨论】:

    标签: java multithreading concurrency parallel-processing load-balancing


    【解决方案1】:

    我的问题是:如何管理资源?例如,第一步 将使用 4 个核心和 4 个 arrayList 执行 4 个任务。然后每当一个 core 可用,第 5 个任务将与 available 一起执行 数组列表。如何在 Java 中做到这一点?请给我参考或来源 代码。

    AFAIK,Java 本身没有将给定线程显式映射到给定核心的功能(线程亲和性)。您可以通过显式系统调用来完成此操作,例如在 Linux 操作系统中使用 taskset 命令。但这需要你做很多工作,并且可能会因此而失去便携性。

    您最好使用更高级别的 Java 抽象(例如,ExecutorService)来为您处理那些低级别的细节。使用ExecutorService 将任务发送到线程池,让池和最终操作系统线程调度程序处理线程/任务之间的映射。

    顺便说一句,即使使用 Java 线程池,也不能保证线程会很好地映射到单独的内核。

    其他语言(例如 C/C++)可能会为您提供库,允许您将内核显式映射到线程,例如使用 OpenMP thread affinity features


    根据您的评论:

    如果我只将任务发送到线程池而不管理 arrayList,会消耗大量内存。假设 1 有 100 个任务 那么它将需要100个arrayLists ..我想减少内存 每次执行任务时仅使用 4 个要重用的 arrayList 来消耗 完成。

    听起来你想实现producer-consumer pattern。一个线程将负责向队列添加工作(即ArrayLists),其他线程将从队列中请求工作并使用它。

    【讨论】:

    • 这是工作的生产者和消费者模式。非常感谢@dreamcrash。
    【解决方案2】:

    正如@dreamcrash 所说,我们无法将线程映射到核心。但是,您可以使用ExecutorService 来完成您的任务。在这种情况下,您将为每个任务使用线程,您的代码将如下所示:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Cores {
    
        private static ExecutorService service = Executors.newFixedThreadPool(4);
    
        public static void main(String[] args) {
            List<String> first = List.of("1", "2", "3");
            List<String> second = List.of("1", "2", "3");
            List<String> third = List.of("1", "2", "3");
            List<String> fourth = List.of("1", "2", "3");
            List<String> fifth = List.of("1", "2", "3");
    
            sendToExecutor(first, second, third, fourth, fifth);
    
            service.shutdown();
        }
    
        @SafeVarargs
        public static void sendToExecutor(List<String>... lists) {
            for (List<String> list : lists) {
                service.execute(() -> handleWholeList(list));
            }
        }
    
        public static void handleWholeList(List<String> strings) {
            strings.forEach(s -> System.out.printf("String %s handled in thread %s%n", s, Thread.currentThread().getName()));
        }
    }
    

    而不是硬编码的 4 个线程:

    private static ExecutorService service = Executors.newFixedThreadPool(4);
    

    您可以使用可用处理器的数量:

    private static ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-11
      • 2023-03-18
      • 2017-11-08
      • 2019-10-12
      • 2010-09-08
      相关资源
      最近更新 更多