【问题标题】:BufferedReader readline when reading in a thread在线程中读取时的 BufferedReader readline
【发布时间】:2012-09-30 20:45:47
【问题描述】:

我是 java 并发编程的新手。

我需要阅读、分析和处理一个增长极快的日志文件,所以我必须 快速地。 我的想法是读取文件(逐行)并匹配我想要的相关行 将这些行传递给可以对该行进行进一步处理的单独线程。 在下面的示例代码中,我将这些线程称为“IOThread”。

我的问题是 IOthread.run() 中的 BufferedReader readline 显然永远不会返回。 在线程内读取 Stream 的工作方式是什么? 还有比下面的方法更好的方法吗?

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class IOThread extends Thread {
    private InputStream is;
    private int t;

    public IOThread(InputStream is, int t)  {
        this.is = is;
        this.t = t;
        System.out.println("iothread<" + t + ">.init");
    }

    public void run() {
        try {
            System.out.println("iothread<" + t + ">.run");
            String line;

            BufferedReader streamReader = new BufferedReader(new InputStreamReader(is));
            while ((line = streamReader.readLine()) != null) {
                System.out.println("iothread<" + t + "> got line " + line);
            }
            System.out.println("iothread " + t + " end run");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class Stm {
    public Stm(String filePath) {
        System.out.println("start");

        try {
            BufferedReader reader = new BufferedReader(new FileReader(filePath));

            PipedOutputStream po1 = new PipedOutputStream();
            PipedOutputStream po2 = new PipedOutputStream();
            PipedInputStream pi1 = new PipedInputStream(po1);
            PipedInputStream pi2 = new PipedInputStream(po2);
            IOThread it1 = new IOThread(pi1,1);
            IOThread it2 = new IOThread(pi2,2);

            it1.start();
            it2.start();
//          it1.join();
//          it2.join();

            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println("got line " + line);

                if (line.contains("aaa")) {
                    System.out.println("passing to thread 1: " + line);  
                    po1.write(line.getBytes());
                } else if (line.contains("bbb")) {
                    System.out.println("passing to thread 2: " + line);  
                    po2.write(line.getBytes());
                }
            }
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Stm(args[0]);
    }

}

一个示例输入文件是:

line 1
line 2
line 3 aaa ...
line 4
line 5 bbb ...
line 6 aaa ...
line 7
line 8 bbb ...
line 9 bbb ...
line 10

以输入文件的文件名作为参数调用上述代码。

【问题讨论】:

  • 可能PipedInputStream 期望读取更多字节...您可以close() PipedOutputStream 进行测试吗?
  • 哦,无论如何我和 Sanjay 在一起......使用来自进程内通信的输入流和输出流很麻烦。将受影响的行传递给线程不是更好吗?
  • 我在 run() 方法中的 while ((line = streamReader.readLine()) != null) 行上遇到了 java.io.IOException: Write end dead 异常。

标签: java multithreading concurrency readline


【解决方案1】:

由于以下原因,您的 iothread 中的阅读器一直停留在 while 循环的第一次迭代的头部: 您从 STM 线程传递读取行的内容,但不附加换行符 (\n)。由于您的缓冲阅读器等待换行符(如在 .readLine() 中),它会永远等待。你可以像这样修改你的代码:

   if (line.contains("aaa")) {
                System.out.println("passing to thread 1: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po1.write(payload);
            } else if (line.contains("bbb")) {
                System.out.println("passing to thread 2: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po2.write(payload);
            }

但我不得不说这根本不是一个优雅的解决方案,您可以使用阻塞队列或类似的东西来为您的 IOThreads 提供内容。这样您就可以避免将输入到字符串转换为字节再转换回字符串(不是说摆脱所有流)。

【讨论】:

  • @Michael 我在 run() 方法中的while ((line = streamReader.readLine()) != null) 行上遇到了java.io.IOException: Write end dead 异常
  • @AK7 : 见techtavern.wordpress.com/2008/07/16/… 如前所述,我鼓励您摆脱流并改用阻塞队列和一些工作人员。
  • @Michael:感谢您指出这个愚蠢的错误。不过,我会采用 BlockingQueue 方法(就像你和 Sanjay 指出的那样)。
【解决方案2】:

恕我直言,你把它弄反了。创建多个线程来“处理”东西,而不是从文件中读取数据。从文件中读取数据时,无论如何您都会遇到瓶颈,因此拥有多个线程不会有任何区别。最简单的解决方案是在给定线程中尽可能快地读取行并将这些行存储在共享队列中。然后可以由任意数量的线程访问此队列以进行相关处理。

这样,您实际上可以在 I/O 或读取器线程忙于读取/等待数据时进行并发处理。如果可能,请在阅读器线程中将“逻辑”保持在最低限度。只需阅读这些行,让工作线程完成真正繁重的工作(匹配模式、进一步处理等)。只需使用线程安全队列,您就应该是洁净的。

编辑:使用BlockingQueue 的一些变体,基于数组或基于链表。

【讨论】:

  • 感谢您的意见。我的例子被简化了。在现实生活中,线程必须做更多的事情,所以我的瓶颈不是读取,而是处理。
  • @pitseeker:嗯,这就是我想要表达的观点。不要创建多个 IOThread。创建一组从共享队列中读取的工作线程。此队列将由您的单个 IOThread 填充。
  • 谢谢你桑杰 - 我会试试的。
猜你喜欢
  • 2014-09-26
  • 1970-01-01
  • 1970-01-01
  • 2023-03-21
  • 2021-09-30
  • 1970-01-01
  • 1970-01-01
  • 2015-07-22
相关资源
最近更新 更多