【问题标题】:How to multi-thread my sequential Java code如何多线程我的顺序 Java 代码
【发布时间】:2019-03-13 23:25:22
【问题描述】:

我有一个 Java 程序,它给出一个列表,对每个列表项执行一些 独立 处理(包括从一些 HTTP 资源中检索文本并将它们插入到一个独立的 HashMap 中),最后计算一些这些 HashMap 上的数字。主要的sn-p是这样的:

    for (int i = 0; i < mylist.size(); i++) {
        long startepoch = getTime(mylist.get(i).time);
        MyItem m = mylist.get(i);
        String index=(i+1)+"";

        process1(index, m.name, startepoch, m.duration);
        //adds to hashmap1

        if(m.name.equals("TEST")) {
            process2(index, m.name, startepoch, m.duration);
        //adds to hashmap2

        } else {
            process3(index, m.name, startepoch, m.duration);
        //adds to hashmap3
            process4(index, m.name, startepoch, m.duration);
        //adds to hashmap4
            process5(index, m.name, startepoch, m.duration);
        //adds to hashmap5
            process6(index, m.name, startepoch, m.duration);
        //adds to hashmap6
        }
    }

    // then start calculation on all hashmaps
    calculate_all();

由于目前此 sn-p 是按顺序执行的,因此对于 500 个项目的列表,这可能需要 30 分钟左右。如何多线程我的代码以使其更快?并以线程安全的方式?

我试图使用ExecutorService executorService = Executors.newFixedThreadPool(10);,然后将每个单独的进程提交到executorService,如下所示,但问题是我不知道他们什么时候结束,所以打电话给calculate_all()。所以我没有继续。

            executorService.submit(new Runnable() {
                public void run() {
                    process2(index, m.name, startepoch, m.duration);
                }
            });

有更好的工作想法吗?

【问题讨论】:

    标签: java multithreading thread-safety executorservice


    【解决方案1】:

    但问题是我不知道他们什么时候完成

    当您向 Executor 提交内容时,您会收到带有结果(如果有)的 Future

    然后您可以从您的主线程调用Future::get 来等待这些结果(或者在您的情况下只是完成)。

    List<Future<?>> completions = executor.invokeAll(tasks);
    
    // later, when you need to wait for completion
    for(Future<?> c: completions) c.get();
    

    您需要注意的另一件事是如何存储结果。如果您打算让您的任务将它们放入某个共享数据结构中,请确保使该线程安全。从Runnable 更改为Callable 可能更容易,以便任务可以返回结果(稍后您可以在主线程上以单线程方式合并)。

    【讨论】:

    • 谢谢。你能在你的回答中补充我的 sn-p 吗? tasks 是什么? ? 是什么?
    • 顺便说一句,没有共享数据存储。每个进程都有独立的hashmap。
    • tasks 是您的 Runnables(在 List 或其他集合中)。在您的示例中,您循环提交它们,invokeAll 可用于一次提交它们。结果是一样的。无论哪种方式,您都会从执行者那里为每个提交的任务返回一个Future&lt;?&gt;,并且在您将它们全部踢掉之后,您希望等待它们全部完成(? 将是Future 的结果类型,但是带有 Runnable,没有返回结果。这不是伪代码,Future&lt;?&gt; 是有效的 Java 语法)。
    • 好的,谢谢。你只需用你的修改更新我上面的 sn-p 就好了。对所有这些都非常陌生。
    【解决方案2】:

    请注意,多线程不一定会提高速度。多线程主要用于通过防止不必要的睡眠等来减少空闲 CPU 周期。

    对于您提供的内容,我无能为力,但是,我认为您可以从以下操作开始:

    1. 使用线程安全的数据结构。这是必须的。如果你错过了这个 一步,你的软件最终会崩溃。你会有一个 很难查明原因。 (例如,如果你有一个 ArrayList, 使用线程安全的)
    2. 您可以通过删除 for 循环开始尝试多线程 而是为每次执行使用一个线程。如果你的 for 循环 大小超过线程的数量,你将拥有 将它们排入队列。
    3. 您的最终计算需要所有其他线程 结束。您可以使用 CountDownLatch、wait()/notifyAll() 或 synchronized() 取决于您的实现。
    4. 执行最终计算。

    编辑

    响应(2):

    你当前的执行是这样的:

    for (int i = 0; i < mylist.size(); i++) {
        some_processes();
    }
    
    // then start calculation on all hashmaps
    calculate_all();
    

    现在,要删除“for”循环,您可以首先从增加“for”循环开始。例如:

    // Assuming mylist.size() is around 500 and you want, say 5, hardcoded multi-thrads
    Thread_1:
    for (int i = 0; i < 100; i++) {
        some_processes();
    }
    Thread_2:
    for (int i = 100; i < 200; i++) {
        some_processes();
    }
    Thread_3:
    for (int i = 200; i < 300; i++) {
        some_processes();
    }
    Thread_4:
    for (int i = 300; i < 400; i++) {
        some_processes();
    }
    Thread_5:
    for (int i = 400; i < mylist.size(); i++) {
        some_processes();
    }
    // Now you can use these threads as such:
    CountDownLatch latch = new CountDownLatch(5);
    ExecutorService executor = Executors.newFixedThreadPool(5);
    executor.submit(new Thread1(latch));
    executor.submit(new Thread2(latch));
    executor.submit(new Thread3(latch));
    executor.submit(new Thread4(latch));
    executor.submit(new Thread5(latch));
    try {
        latch.await();  // wait until latch counted down to 0
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // then start calculation on all hashmaps
    calculate_all();
    

    如您所见,这种方法有几个缺点。例如,如果列表大小变为 380 怎么办?然后你有一个空闲线程。另外,如果你想要超过 5 个线程怎么办?

    所以此时,您可以通过使“for”循环越来越少来进一步增加“for”循环的数量。最多,“for loop count”==“thread count”,有效地删除你的for循环。所以从技术上讲,你需要“mylist.size()”数量的线程。您可以这样实现:

    // Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
    BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
    CountDownLatch latch = new CountDownLatch(mylist.size());
    
    new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
    new Thread(new take_finished_processes_from_queue(queue)).start();
    try {
        latch.await();  // wait until latch counted down to 0
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // then start calculation on all hashmaps
    calculate_all();
    

    请注意,通过这种安排,我们已经删除了您最初的“for”循环,而是创建了另一个仅在队列清空时提交新线程的循环。您可以使用生产者和消费者应用程序检查 BlockingQueue 示例。例如参见:BlockingQueue examples

    编辑 2

    Future 的简单实现可能如下所示:

    ExecutorService executorService = Executors.newCachedThreadPool();  
    Future future1, future2, future3, future4, future5, future6;  
    
    for (int i = 0; i < mylist.size(); i++) {
        long startepoch = getTime(mylist.get(i).time);
        MyItem m = mylist.get(i);
        String index=(i+1)+"";
    
        future1 = executorService.submit(new Callable() {...})
        //adds to hashmap1
    
        future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
    
        if(m.name.equals("TEST")) {
            future2 = executorService.submit(new Callable() {...})
        //adds to hashmap2
    
            future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
    
        } else {
            future3 = executorService.submit(new Callable() {...})
        //adds to hashmap3
            future4 = executorService.submit(new Callable() {...})
        //adds to hashmap4
            future5 = executorService.submit(new Callable() {...})
        //adds to hashmap5
            future6 = executorService.submit(new Callable() {...})
        //adds to hashmap6
    
             // Add extra future.get here as above...
        }
    }
    
    // then start calculation on all hashmaps
    calculate_all();
    

    不要忘记添加 try-catch 块,否则您可能无法从异常和崩溃中恢复。

    // Example try-catch block surrounding a Future.get().
    try {
        Object result = future.get();       
    } catch (ExecutionException e) {
        //Do something
    } catch (InterruptedException e) {
        //Do something
    }
    

    但是,您可以有一个更复杂的,如here 所示。该链接也解释了 Thilo 的回答。

    【讨论】:

    • 并行化 HTTP 请求是多线程有意义的完美示例。
    • 我同意这一点。
    • 就我的 HTTP 请求而言,它绝对有帮助。而且我没有所说的共享存储。我喜欢你的#2。您能详细说明如何删除 for 循环吗?请补充我的 sn-p。
    • 总体上看起来像是一个可行的解决方案。但它增加了很多复杂性。如果您使用Futute 发布答案会很好吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-18
    相关资源
    最近更新 更多