【问题标题】:Java PipedInputStream PipedOutputStream Size limitationJava PipedInputStream PipedOutputStream 大小限制
【发布时间】:2012-07-09 18:06:57
【问题描述】:

我正在使用java管道将数据(outstream)从解压缩模块(JavaUncompress类)传递到解析模块(处理程序类),文件很大,我想先解压缩文件并直接解析而不是保存解压后的文件,然后解析。但是,它仅适用于小文件。当我输入一个 1G 文件时,似乎只有部分文件(比如 50000 行)从输出流到解析模块的输入流。

我尝试使用字符串来保存解压缩文件,同样的事情发生了,字符串只包含解压缩文件的一部分(与流水线版本相同的第 50000 行停止)。对发生的事情有任何想法吗?非常感谢。

这是我的管道代码:

   {
   PipedInputStream in = new PipedInputStream(); // to output
   final PipedOutputStream out = new PipedOutputStream(in); // out is something from other

   new Thread(
    new Runnable(){
        public void run(){ 
                JavaUncompress.putDataOnOutputStream(inFile,out); }
        }
        ).start();

   doc = handler.processDataFromInputStream(in);
   }

   public static void putDataOnOutputStream(String inZipFileName, PipedOutputStream out){

   try {
          FileInputStream fis = new FileInputStream(inZipFileName);
          //FilterInputStream ftis = new FilterInputStream;
          ZipInputStream zis = new ZipInputStream(new BufferedInputStream(fis));
          ZipEntry entry;

          while((entry = zis.getNextEntry()) != null) {
             System.out.println("Extracting: " +entry);
             byte data[] = new byte[BUFFER];

             long len = entry.getSize();
             long blk = len/BUFFER;
             int  rem = (int)(len - blk*BUFFER);
             System.out.println(len+" = "+blk +"*BUFFER + "+rem);

             for(long i=0; i!=blk; ++i){
                 if ((zis.read(data, 0, BUFFER)) != -1) {
                     out.write(data);
                 }
             }

             byte dataRem[] = new byte[rem];
             if ((zis.read(dataRem, 0, rem)) != -1) {
                 out.write(dataRem);
                 out.flush();
                 out.close();
             }

          }
          zis.close();

       } catch(Exception e) {
          e.printStackTrace();
       }
   }

【问题讨论】:

  • 为什么要使用 anything? 请看我的回答。在阅读文件时解析文件。
  • @EJP 的评论和回答很到位。不过,作为实际代码的旁注,您不能忽略InputStream.read() 的返回值——它非常很重要。另外,你需要展示你的阅读代码(你怎么知道文件之间的边界?)。

标签: java pipeline


【解决方案1】:

我同意不使用 JDK 中的 Pipes 实现,它们太混乱且完全同步,这是一个更快的 BlockingQueue 实现,它在一个小缓冲区的帮助下将对上下文切换的影响最小化,一个 Blocking队列非常适合单个生产者/消费者:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

【讨论】:

    【解决方案2】:

    PipedOutputStream.write() 将阻塞如果相应的PipedInputStream 超过 4096 或它后面的任何字节,但为什么要这样做呢?为什么不直接解压缩文件并在同一个线程中处理它?多线程并没有什么好处,这只是一个毫无意义的复杂化。

    我在 Java 中 15 年只使用过一次管道,我很快将其更改为队列。

    【讨论】:

      猜你喜欢
      • 2023-04-01
      • 2012-03-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-06
      • 2017-10-28
      • 1970-01-01
      • 2010-10-03
      相关资源
      最近更新 更多