【问题标题】:Thread implementation a for loop iteration with lists线程实现一个带有列表的for循环迭代
【发布时间】:2017-07-20 07:36:22
【问题描述】:

我有一个简单的代码如下。这将检查服务器列表的活动状态。 请告诉我如何使用线程或任何其他合适的解决方案并行完成。

        List<Host> hosts = this.getAllHosts();
        List<Host> aliveHosts = new ArrayList<>();
        if (hosts != null && hosts.size() > 0) {
            for (Host host : hosts) {
                try {
                    if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                        aliveHosts.add(host);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return aliveHosts;

如何在线程中调用每个getByName 并同时并行执行。目前,他们每个人的超时时间为 3 秒。如果有 10 个项目,则总时间为 30 秒。任何人都可以提供一个解决方案,以便总体上可以在 3-8 秒内完成。

【问题讨论】:

  • 为每个Host创建一个新的Runnable类,设置Host参数并启动它
  • 使用BlockingQueue作为共享资源,不为空时循环
  • 谁能举个例子。

标签: java multithreading list arraylist thread-safety


【解决方案1】:

使用 Java 8 流:

List<Host> aliveHosts = hosts.stream()
                             .parallel()
                             .filter(h -> {
                                            try {
                                              return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT)
                                            } catch(Exception e) {
                                              return false;
                                            }
                                          })
                             .collect(Collectors.toList());

【讨论】:

  • 感谢您的代码。在您知道的任何情况下,是否存在与此代码相关的任何缺点或问题。
  • 不,我看不出它有什么问题,除了你不能轻易控制并发级别。
  • @Jean-BaptisteYunès 实际上,您可以轻松控制并发级别。请参阅我对您的回答的补充。
  • 你能告诉我有 30 台主机总共需要多少时间
  • 未知变量太多...并行度、网络延迟等。做实验!
【解决方案2】:

让我们考虑这个线程示例:

public class SimpleThreads {

    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName =
            Thread.currentThread().getName();
        System.out.format("%s: %s%n",
                          threadName,
                          message);
    }

    private static class MessageLoop
        implements Runnable {
        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            try {
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[])
        throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience)
                  && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}

Source

本质上,您需要一个Runnable,它负责您的线程的工作方式。您将需要实例化一个线程,传递您拥有的Runnable 的实例,然后传递start 您的Thread。您将需要访问所有线程并Join 它们。你可以easily manage the timeout limits as well

【讨论】:

  • 您如何看待 Java8 流方法?
  • @ShamnadPS 这是一个现代解决方案,如果您使用的是 Java 8,它应该可以工作,但对于早期版本的 Java,它可能不兼容。我相信这是针对理想技术情况的特殊解决方案,但是如果您拥有 Java 7 并且出于正当理由无法对其进行更新,无论是什么原因,涉及 Java 8 的解决方案都将不起作用。我特意从教程中给你一个通用的解决方案,因为你可以通过阅读整个教程来获得更深入的理解,并且无论你的 Java 版本如何,它都可以工作。
【解决方案3】:

非 Java 8 方式看起来类似:

List<Host> hosts = this.getAllHosts();

    Queue<Host> q = new ArrayBlockingQueue<>(hosts.size(), true, hosts);
    ExecutorService ex = Executors.newFixedThreadPool(5);
    List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<>());

    while(!q.isEmpty()){
        ex.submit(new Runnable() {
            @Override
            public void run() {
                Host host = q.poll();
                try {
                    if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                        aliveHosts.add(host);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        });
    }
    ex.shutdown();
}

【讨论】:

  • 与 Java8 方式相比,这种方式有什么优势吗?
  • 我不确定 .parallel() 是如何在 Java 8 中实现的,但是正如你所看到的,我没有为每个主机创建新的 Runnable 类,因为如果你有例如 100000 个主机,这个将是劣势。
  • 但是如果你的代码运行在Java 8上。那我觉得用Java 8的方式比较合适
【解决方案4】:

Java 8 和ExecutorService

List<Host> hosts = this.getAllHosts();
List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<Host>());
ExecutorService executorService = Executors.newFixedThreadPool(10);
if (hosts != null && hosts.size() > 0) {
    for (Host host : hosts) {
        executorService.submit(() -> {
            try {
                if (InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                    aliveHosts.add(host);
                }
            } catch (IOException e) {
                // logger?
            }
        });
    }
}
executorService.shutdown();
return aliveHosts;

【讨论】:

  • 与 Java 8 并行流相比,使用 ExecutorService 有什么优势。请告诉我。
  • 除了控制并发级别之外,本身没有任何优势。 Java8 流只是为了让您编写基本逻辑而忘记运行时技巧的细节。
【解决方案5】:

除了公认的 Java8 答案之外,您实际上可以通过使用自定义 ForkJoinPool 轻松控制并发级别:

final Predicate<Host> isAlive = h -> {
    try {
        return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT);
    } catch (Exception e) {
        return false;
    }
};
final Callable<List<Host>> collectAliveHosts = () ->
    hosts.stream().parallel().filter(isAlive).collect(Collectors.toList());

final ForkJoinPool threadPool = new ForkJoinPool(4);
final List<Host> aliveHosts = threadPool.submit(collectAliveHosts).get();

如果您不使用自定义池,则将使用常见的ForkJoinPool,它的大小取决于您当前机器的内核/CPU 数量。然而,这个池被整个 JVM 使用。也就是说,如果您将长时间运行的任务提交到公共池,整个应用程序可能会遭受一些性能下降。

【讨论】:

  • 感谢您的回答。你能告诉我为什么你为 ForkJoinPool 指定了 4 号。可以是主机列表的大小吗?
  • 示例中池的大小 4 只是随机取的。您可以将大小设置为您想要的任何数字。但是更大的池并不总是意味着更快的执行时间。线程池的最佳大小很大程度上取决于实际用例(例如主机数量)、环境(CPU 数量),甚至线程中执行的代码(是否阻塞或挂起线程等)。基本上,您需要调整池大小来确定最适合您的值。但是,如果在另一个环境中执行,此值不一定会发挥最佳效果。
【解决方案6】:

我们可以使用 Future 接口并行执行。

package test.basics;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class TestFutureTask {
    private static final int TIMEOUT = 30000;

    public static void main(String[] args) {
        List<String> hosts = new ArrayList<String>();
        hosts.add("127.0.0.1");
        hosts.add("127.0.0.2");
        hosts.add("127.0.0.3");
        hosts.add("127.0.0.4");
        hosts.add("127.0.0.5");
        hosts.add("127.0.0.6");
        List<String> aliveHosts = new ArrayList<>();
        List<String> notAliveHosts = new ArrayList<>();

        long stTime = System.currentTimeMillis();
        System.out.println("Starting time " + stTime);
        Map<String, Future> jobList = new HashMap<>();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (String host : hosts) {

            Future f = newCachedThreadPool.submit(new Callable<Boolean>() {

                private String host;

                @Override
                public Boolean call() throws Exception {
                    return InetAddress.getByName(host).isReachable(TIMEOUT);
                }

                public Callable<Boolean> init(String host) {
                    this.host = host;
                    return this;
                }
            }.init(host));

            jobList.put(host, f);

        }

        for (String host : jobList.keySet()) {
            try {
                if ((boolean) jobList.get(host).get()) {
                    aliveHosts.add(host);
                } else {
                    notAliveHosts.add(host);
                }
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Ending time : " + endTime);
        System.out.println("Time taken :" + (endTime - stTime));
        System.out.println("Alive hosts: " + aliveHosts);
        System.out.println("Not alive hosts: " + notAliveHosts);
    }
}

示例输出:

开始时间1500570979858

结束时间:1500571009872

所用时间:30014

活动主机:[127.0.0.1]

不活跃的主机:[127.0.0.6, 127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2]

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-28
    • 2015-11-11
    • 2021-07-12
    • 1970-01-01
    • 2020-04-08
    • 2021-12-07
    • 2023-03-25
    • 1970-01-01
    相关资源
    最近更新 更多