【问题标题】:Java Multithreading make threads end in same order they started but run at same timeJava多线程使线程以它们开始的顺序结束但同时运行
【发布时间】:2013-09-20 21:35:58
【问题描述】:

我必须编写一个程序,从文件中搜索一堆行并尝试找到给定的子字符串。如果找到它,它会打印出该行。我读取的每一行都创建为一个线程,每个线程搜索文件的一行。到目前为止,这还不是问题。我需要程序做的是按照创建线程的顺序打印最终结果(文本行)。 IE。线程6不应该在线程2之前打印。线程同时运行很好,只需要维护打印顺序。我不能使用 join 方法,因为我不希望下一个在开始之前等待另一个完全完成,我希望它们同时运行。有什么建议吗?此外,该文件可以有任意数量的行,所以我不能硬编码线程的数量。

线程应该自己打印。 主体不进行打印。

【问题讨论】:

  • “加入”并不意味着“等待其他人完成后再开始”,除非你故意这样编程......
  • 创建一个完整的线程来处理文件的一行是非常低效的,如果文件太长,可能会导致应用程序崩溃。使用某些答案中描述的方法之一将行调度到线程池。

标签: java multithreading search concurrency


【解决方案1】:

首先,线程应用程序的顺序很难定义。见这里:unwanted output in multithreading

如果您希望按特定顺序输出,那么您可能应该使用ExecutorService,它将返回Future。您将Callable<String> 类提交给每个返回结果的服务。提交返回Future<String>。然后,您可以按照将作业提交到服务的相同顺序从Future 调用get()

// create a thread pool with 10 workers
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// or you can create an open-ended thread pool
// ExecutorService threadPool = Executors.newCachedThreadPool();
// define your jobs somehow as a Callable that returns the line (prolly a string)
List<Future<String>> futures = threadPool.invokeAll(jobsToDo);
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
// now we can go through the futures and get the results in order
for (Future<String> future : futures) {
    // this returns the results from the `call()` in order or it throws
    String resultFromCall = future.get();
}

你的工作 Callable 类看起来像:

public class MyCallable implements Callable<String> {
    private String input;
    public MyCallable(String input) {
        this.input = input;
    }
    public String call() {
        // search the input string
        String result = search(input);
        return result;
    }
}

【讨论】:

  • 是的,我总是忘记这一点。谢谢@SamiKorhonen。
  • 所以我应该实现 Callable 而不是 Runnable?
  • @TeddyBlack Callable 和 Runnable 之间的唯一区别(除了 throws 签名)是 Callable 可以返回一个值而 Runnable 不能。 Executor 可以并且会将 Callable 的返回值交还给您,但是如果您不需要该功能,那么 Runnables 将以几乎相同的方式工作。 Executor 可以通过任一方式接受工作。
  • 在这种情况下@TeddyBlack,由于您需要从对行的处理中返回一个值,那么是的,您应该实现 Callable
  • 那么究竟是谁在打印呢?线程本身应该打印,而不是向 main 发送一行以便 main 打印它。
【解决方案2】:

实际上,您可以使用join()。但是您确实需要根据您的要求在适当的时间和地点调用它。更新了您的新要求“主线程不应打印结果,工作线程应打印结果”。

注意:这种类型的程序实际上最好使用java.util.concurrent 中的实用程序类编写。但是,我使用线程“基类”编写了这个示例来帮助学习。请注意 reggert 的评论(上面)——盲目地在文件中的每行文本创建一个线程可能会导致线程过多,以至于您的程序崩溃或操作系统不堪重负。这种工作委托最好使用线程池来完成(例如java.util.concurrent.Executors)。

public class MultiThreadedLineSearcher {

    public static void main(String[] args) throws Exception {
        Thread previousThread = null;
        for (int i = 0; i < LINES.length; i++) {
            JobRunnable job = new JobRunnable(i, LINES[i], previousThread);
            Thread thread = new Thread(job, "T-" + i);
            thread.start();
            previousThread = thread;
        }
        if (previousThread != null) {
            previousThread.join();
        }
        System.out.println("Program done.");
    }

    public static class JobRunnable implements Runnable {
        private final int _lineIdx;
        private final String _lineText;
        private final Thread _threadToWaitForBeforePrinting;

        public JobRunnable(int lineIdx, String lineText,
                Thread threadToWaitForBeforePrinting) {
            _lineIdx = lineIdx;
            _lineText = lineText;
            _threadToWaitForBeforePrinting = threadToWaitForBeforePrinting;
        }

        public void run() {
            try {
                boolean matched = FIND_ME.matcher(_lineText).find();
                String currentThreadName = Thread.currentThread().getName();
                System.out.println("Thread " + currentThreadName
                        + " is done with its work.");
                if (_threadToWaitForBeforePrinting != null) {
                    System.out.println("Thread " + currentThreadName
                            + " will wait for thread "
                            + _threadToWaitForBeforePrinting.getName()
                            + " before printing its results.");
                    _threadToWaitForBeforePrinting.join();
                }
                System.out.println("RESULT: " + _lineIdx + " matched? "
                        + matched + " (Printed on Thread "
                        + currentThreadName + ")");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    final static String[] LINES = new String[] {
        "Sed ut perspiciatis unde omnis iste natus error sit voluptatem",
        "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa",
        "quae ab illo inventore veritatis et quasi architecto beatae vitae",
        "dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas",
        "sit aspernatur aut odit aut fugit, sed quia consequuntur magni",
        "dolores eos qui ratione voluptatem sequi nesciunt. Neque porro",
        "quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur,",
        "adipisci velit, sed quia non numquam eius modi tempora incidunt",
        "ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad",
        "minima veniam, quis nostrum exercitationem ullam corporis",
        "suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur?",
        "Quis autem vel eum iure reprehenderit qui in ea voluptate velit",
        "esse quam nihil molestiae consequatur, vel illum qui dolorem eum",
        "fugiat quo voluptas nulla pariatur?" };

    // Match only words that are 11 characters or longer
    final static java.util.regex.Pattern FIND_ME = 
            java.util.regex.Pattern.compile("\\w{11,}");
}

下面是如何使用 Executor 来完成的:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MultiThreadedLineSearcherExecutor {

    static final int MAX_THREADS = 10;

    public static void main(String[] args) throws Exception {
        // Create as many threads as there are lines of text,
        // but do not exceed 10 threads
        int lineCount = LINES.length;
        int threadPoolSize = Math.min(MAX_THREADS, lineCount);
        System.out.println("Number of lines = " + lineCount);
        System.out.println("Thread pool size = " + threadPoolSize);
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        Future previousFuture = null;
        for (int i = 0; i < lineCount; i++) {
            JobRunnable job = new JobRunnable(i, LINES[i], previousFuture);
            previousFuture = executor.submit(job);
        }
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        System.out.println("Program done.");
    }

    public static class JobRunnable implements Runnable {
        private final int _lineIdx;
        private final String _lineText;
        private final Future _futureToWaitForBeforePrinting;

        public JobRunnable(int lineIdx, String lineText,
                Future previousFuture) {
            _lineIdx = lineIdx;
            _lineText = lineText;
            _futureToWaitForBeforePrinting = previousFuture;
        }

        public void run() {
            try {
                boolean matched = FIND_ME.matcher(_lineText).find();
                String currentThreadName = Thread.currentThread().getName();
                System.out.println("Thread " + currentThreadName
                        + " is done with its work on line " + _lineIdx);
                if (_futureToWaitForBeforePrinting != null) {
                    System.out.println("Thread " + currentThreadName
                            + " will wait for future "
                            + _futureToWaitForBeforePrinting
                            + " before printing its results.");
                    _futureToWaitForBeforePrinting.get();
                }
                System.out.println("RESULT: " + _lineIdx + " matched? "
                        + matched + " (Printed on Thread "
                        + currentThreadName + ")");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    final static String[] LINES = new String[] {
        "Sed ut perspiciatis unde omnis iste natus error sit voluptatem",
        "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa",
        "quae ab illo inventore veritatis et quasi architecto beatae vitae",
        "dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas",
        "sit aspernatur aut odit aut fugit, sed quia consequuntur magni",
        "dolores eos qui ratione voluptatem sequi nesciunt. Neque porro",
        "quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur,",
        "adipisci velit, sed quia non numquam eius modi tempora incidunt",
        "ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad",
        "minima veniam, quis nostrum exercitationem ullam corporis",
        "suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur?",
        "Quis autem vel eum iure reprehenderit qui in ea voluptate velit",
        "esse quam nihil molestiae consequatur, vel illum qui dolorem eum",
        "fugiat quo voluptas nulla pariatur?" };

    // Match only words that are 11 characters or longer
    final static java.util.regex.Pattern FIND_ME = 
            java.util.regex.Pattern.compile("\\w{11,}");
}

【讨论】:

  • 线程必须打印自己的输出。有人特别告诉我不要编写程序,以便主程序完成所有打印。
  • 我更改了程序以满足您新指定的要求。
  • 谢谢!!我不敢相信我忽略了这一点。现在看来很明显了。
【解决方案3】:

我不能使用 join 方法,因为我不希望下一个在开始之前等待另一个完全完成,我希望它们同时运行。

您可以让它们填充一个数组(其中每个“知道”要填充的元素作为其初始状态的一部分),然后加入所有线程,以便等待它们全部完成,并且然后按顺序打印出数组的内容。

【讨论】:

    【解决方案4】:

    我会使用某种集合来对结果进行排序。每个线程将启动一个具有计数器的Runnable。像这样的:

    private Map<Integer, String> map = new HashMap<Integer, String>();
    int counter = 1;
    while (line = getNextLine()) {
        Runnable runnable = new PrinterRunnable(line, counter);
        ++counter;
    }
    

    Runnable 看起来像:

    public PrinterRunnable implements Runnable {
    
       private String line;
       private Integer counter;
    
       public PrinterRunnable(String line, Integer counter) {
           this.line = line;
           this.counter = counter;
       }
    
    
       public run () {
          // Do some work on line and keep in the map the result where counter is the key
       }
    }
    

    重要提示:在此示例中,PrinterRunnable 应该是运行代码的类的内部类,以便它能够访问地图(或者您应该传递对以其他方式映射)。

    最后,您可以对地图的keySet() 进行排序,然后相应地打印值。

    【讨论】:

      猜你喜欢
      • 2018-10-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-05-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多