【问题标题】:Java - Processing file with three threadsJava - 使用三个线程处理文件
【发布时间】:2021-12-19 08:37:24
【问题描述】:
ExecutorService executor1 = Executors.newSingleThreadExecutor();
        ExecutorService executor2 = Executors.newSingleThreadExecutor();
        ExecutorService executor3 = Executors.newSingleThreadExecutor();
        ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(1000);
        try {

             String line;
             InputStream is = file.getInputStream();
             br = new BufferedReader(new InputStreamReader(is));
             while ((line = br.readLine()) != null) {
                 String[] values = line.split(",");
                 List<String> valuesList = Arrays.asList(values);
                 for(String valueList : valuesList) {
                     abq.put(valueList);
                     executor2.execute(new Runnable () {
                         public void run() {
                             System.out.println(valueList + Thread.currentThread().getName());
                         }
                     });       

嗨 我正在尝试执行以下操作:

  1. 从主线程读取文件
  2. 将读取的值存储到阻塞队列中,另一个线程将从该队列中访问和处理。
  3. 让另一个线程写入另一个文件。
    但我对如何做到这一点感到困惑。 如果我声明一个固定线程池,我将无法控制哪个线程做什么,但是在这种方法中,这不是类似于顺序处理,因为线程属于不同的池吗?
    如果有人可以指导我如何做到这一点,那将对我有很大帮助。

【问题讨论】:

    标签: java spring file concurrency java.util.concurrent


    【解决方案1】:

    我不知道我是否理解你想要做什么,但我会这样做: 我创建了 3 个线程并以不同的方式命名它们,根据我让它们运行 3 种不同方法的名称: 第一个读取文件并将结果放入所有 trhead 和静态共享的数组列表中 第二个有一个无限循环(或者如果你想做一些优化,一个任务每隔一定时间运行一次),它总是获取数组列表的第一个元素并处理它并将它放入另一个也是共享和静态的数组列表中 第三个线程还有一个循环(或任务),它总是获取第一个元素并将其打印到文件中。

    我会这样做,但也许我不明白,如果你需要给我写信

    (谷歌翻译从:意大利语到:英语)

    【讨论】:

    • 是的,这就是我现在正在做的事情,但是如果线程来自不同的池,它们不会按顺序执行吗?
    • mhm,你不能只用一个设置名称的 main 方法和 3 个方法(我写的那些)来做一个类 ThreadOperation 扩展 Thread 然后你在 main 方法或任何地方创建它们您喜欢并且您以比读取文件的优先级更高的优先级启动它们。 (共享数组你应该把它们放在你创建线程的类中(而不是你扩展到线程的类)
    【解决方案2】:

    我们正在执行以下两项任务:

    1. 读取 + 反序列化输入(从文件中读取字节并解析为 Java 对象“消息”)
    2. 序列化 + 写入输出(转换为目标格式并写入)

    请注意,只有在查看某些特定的高吞吐量场景时,才可能需要使用 2 个不同线程顺序工作并同步(阻塞)消息切换的方法。与每个线程的每个任务相关的工作通常必须平衡(即占用相似数量的 CPU 周期),这甚至是有益的。或者,该输出被写入例如云存储或可能会间歇性停止的东西。

    您是正确的,您必须使用线程安全队列进行线程之间的切换!最简单的方法是直接实例化和设置 2 个线程之间的交互(而不是使用 ExecutorService)。

    有关代码示例,请参见下文。这里我们假设表示消息的 Java 对象称为MyMsg,并且在输入和输出文件中序列化形式都是基于String

    public class ProcessorExample {
    
      private static final MyMsg END = new MyMsg(); // used to signal file read finished
    
      public void processFile(File inFile, File outFile) {
        BlockingQueue<MyMsg> queue = new ArrayBlockingQueue<>(4096);
        Thread reader = new Thread(() -> read(queue, inFile), "reader");
        Thread writer = new Thread(() -> write(queue, outFile), "writer");
        reader.start();
        writer.start();
      }
    
      private void read(BlockingQueue<MyMsg> queue, File inFile) {
        try (BufferedReader reader = new BufferedReader(new FileReader(inFile))) {
          String line;
          while ((line = reader.readLine()) != null) {
            queue.put(deserialize(line)); //surround with try/catch to drop failed messages
          }
          queue.put(END);
        } catch (IOException|InterruptedException e) {
            // TODO: Graceful handling of exceptions at file level
        }
      }
    
      private void write(BlockingQueue<MyMsg> queue, File outFile) {
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(outFile))) {
          MyMsg msg;
          while ((msg = queue.take()) != END) {
            writer.write(serialize(msg)); //surround with try/catch to drop failed messages
          }
        } catch (IOException|InterruptedException e) {
          // TODO: Graceful handling of exceptions at file level
        }
      }
    
      private MyMsg deserialize(String str) {
        return null; //TODO implement
      }
    
      private String serialize(MyMsg msg) {
        return null; //TODO implement
      }
    
    }
    

    一些最后的想法:

    • 为了获得最高吞吐量,请考虑使用来自JCTools 的无锁单生产者单消费者队列,而不是ArrayBlockingQueue
    • 尽可能在实际工作负载上测试您的代码,以获得适用于您的用例的真实性能数据。
    • 如果您要处理大量文件 (>10k),最好开始重用线程。
    • 如果这实际上不是“高吞吐量”场景,而是反序列化/序列化速度慢的问题,那么我建议先看看如何改进这些方法。那里有很棒的算法和库来执行可能需要很长时间的事情(读取巨大的 XML 等)

    【讨论】:

      【解决方案3】:

      我看到发布了各种解决方案,但我想知道您是否想将读取和写入拆分到一个文件中。

      如果您要并行处理许多文件,那么我将从在单个线程上顺序读取和写入开始,这样就不需要同步,因此您不会浪费 CPU 的潜力。

      您可以通过多个线程并行处理多个文件。

      我将从创建一些基准开始,例如在进入多线程解决方案之前,使用 JMH 并查看实际瓶颈是什么。如果使用得当,单个 CPU 可以完成大量工作。

      如果您使用的是 Linux,那么对于简单的顺序 I/O,您不需要等待磁盘访问(因此它不是同步 I/O)。 Linux 依赖于一种称为预读的功能,它将您的进程将要读取的数据预取到页面缓存中。在顺序读取的情况下,预取窗口将被最大化。因此,当您的本地读取缓冲区耗尽时,它只需要查看页面缓存以将下一个数据加载到用户空间缓冲区中,因为预读可能已经将数据加载到页面缓存中。并且对于缓冲 I/O,写入也首先在用户空间缓冲区中结束;一旦缓冲区填满,它就会被写入页面缓存。只有在未来的某个时刻,脏页才会被写入磁盘。现代 NVMe SSD 的顺序读/写速度非常快(每秒数 GB)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-12-21
        • 1970-01-01
        • 2013-09-11
        • 2013-04-29
        • 1970-01-01
        • 1970-01-01
        • 2016-04-27
        • 1970-01-01
        相关资源
        最近更新 更多