【问题标题】:Java PipedInputStream available() method return valueJava PipedInputStream available() 方法返回值
【发布时间】:2011-09-23 21:21:03
【问题描述】:

我正在尝试编写一段解锁代码来读取PipedInputStream。它基本上会在调用阻塞读取 API 之前检查是否有要读取的内容:

int n = 0;
if ((n = pipedInputStream_.available()) > 0) {
     pipedInputStream_.read(...)
}

阅读java API doc 我无法确定该检查应该是什么,因为可能的值为零(意味着没有数据,或关闭/中断的流)或大于零。那么调用者如何知道是否有任何要读取的内容呢?

“返回可以从这个输入流中读取而不阻塞的字节数,如果这个输入流已经通过调用它的 close() 方法关闭,或者如果管道未连接,或损坏,则返回 0。”

查看源代码似乎唯一的值是零或大于零。

public synchronized int available() throws IOException {
    if(in < 0)
        return 0;
    else if(in == out)
        return buffer.length;
    else if (in > out)
        return in - out;
    else
        return in + buffer.length - out;
}

【问题讨论】:

  • 零并不意味着关闭或中断流。零或大于零是每个规范唯一允许的结果。
  • 你能解释一下为什么你说零并不意味着关闭或中断流吗?请看我同意的 seh 的解释。 Java 文档还说:“返回:可以从该输入流中读取而不会阻塞的字节数,或者如果该输入流已通过调用其 close() 方法关闭,或者如果管道未连接,则为 0,或者坏了。”
  • 你误会了。零表示要么没有数据可以读取而不阻塞流已关闭管道未连接或 i> 管道坏了。这并不意味着这些条件中的一个
  • 当然。附录消除了您之前回复中第一个陈述引起的混乱。
  • 这里的混淆包含在您的陈述中。我已经为你编辑了。

标签: java inputstream pipe


【解决方案1】:

如果available() 返回零,则目前没有可读取的字节。根据您引用的文档,可能有几个原因:

  • 管道已关闭。
  • 管道坏了。
  • 所有以前可用的输入(如果有)都已被消耗。

来自available() 的零返回值可能意味着发生了错误,这意味着您将永远无法通过管道读取更多数据,但您不能在这里确定一下,因为零可能表示上面的第三个条件,在InputStream#read() 上阻塞可能最终会产生更多数据,相应的OutputStream 侧将通过管道推送。

在有更多数据可用之前,我认为无法用available() 轮询PipedInputStream,因为您将永远无法将上述终端案例(第一个和第二个)与读者区分开来比作者还饿。像许多流接口一样,您也必须尝试使用并准备失败。这就是陷阱; InputStream#read() 会阻止,但直到您承诺阻止尝试读取时,您才能发现没有更多的输入。

将消费行为基于available() 是不可行的。如果它返回一个正数,则有 something 可供阅读,但当然,即使现在可用的内容也可能“不足以”满足您的消费者。如果您提交一个线程以阻塞方式使用InputStream 并跳过使用available() 的轮询,您会发现您的应用程序更易于管理。让InputStream#read() 成为您在这里的唯一神谕。

【讨论】:

  • 谢谢。是的,在查看了 PipedInputStream 的其余代码,特别是循环缓冲区的实现/使用之后,我发现第三个终止条件也是零的原因之一。
【解决方案2】:

我需要一个过滤器来拦截慢速连接,我需要尽快关闭数据库连接,所以我最初使用 Java 管道,但是当仔细观察它们的实现时,它都是同步的,所以我最终使用一个小缓冲区创建了自己的 QueueInputStream 和阻塞队列将缓冲区放入队列中一次已满,它是无锁的,除非 LinkedBlockingQueue 使用的锁定条件在小缓冲区的帮助下应该很便宜,此类仅用于单个每个实例的生产者和消费者:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{-1};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

}

【讨论】:

    猜你喜欢
    • 2021-05-10
    • 2012-12-18
    • 2013-08-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-30
    • 1970-01-01
    • 2012-05-10
    相关资源
    最近更新 更多