【问题标题】:Wrapping a ByteBuffer with an InputStream用 InputStream 包装 ByteBuffer
【发布时间】:2011-05-18 22:49:14
【问题描述】:

我有一个接受 InputStream 并从中读取数据的方法。我也想将此方法与 ByteBuffer 一起使用。有没有办法包装一个 ByteBuffer 以便它可以作为流访问?

【问题讨论】:

  • 是原生 ByteBuffer,还是由字节数组支持?
  • 在这种情况下由字节数组支持
  • 我发现杰克逊有它:Jackson ByteBufferBackedInputStreamcom.fasterxml.jackson.databind.util

标签: java nio inputstream bytebuffer


【解决方案1】:

JDK 中什么都没有,但是那里有很多实现,谷歌 ByteBufferInputStream。基本上,它们包装一个或多个 ByteBuffer 并跟踪其中的索引,该索引记录了已读取的内容。 like this 出现了很多,但显然是错误的,请参阅 @Mike Houston's answer for an improved version)。

【讨论】:

    【解决方案2】:

    如果它由字节数组支持,您可以使用ByteArrayInputStream 并通过ByteBuffer.array() 获取字节数组。如果您在本机 ByteBuffer 上尝试,这将引发异常。

    【讨论】:

    • “本机 ByteBuffer”是指通过 ByteBuffer.allocateDirect() 创建的 ByteBuffer 对象吗?
    • 此方法仅在您确定要读取支持字节数组的全部内容时才有效。对于缓冲区部分已满的情况,您最终会读取超出限制的内容。
    • 这种做法是错误的,因为缓冲区内容可能只是数组的一部分,而数组的开头和结尾会包含其他数据。参见 get() 方法实现。
    【解决方案3】:

    Thilo 提到的实现似乎存在一些错误,并且还逐字复制并粘贴到其他网站上:

    1. ByteBufferBackedInputStream.read() 返回它读取的字节的符号扩展 int 表示,这是错误的(值应在 [-1..255] 范围内)
    2. 根据 API 规范,当缓冲区中没有剩余字节时,ByteBufferBackedInputStream.read(byte[], int, int) 不会返回 -1

    ByteBufferBackedOutputStream 似乎比较健全。

    我在下面展示了一个“固定”版本。如果我发现更多错误(或有人指出),我会在这里更新。

    更新:从读/写方法中删除了synchronized关键字

    输入流

    public class ByteBufferBackedInputStream extends InputStream {
    
        ByteBuffer buf;
    
        public ByteBufferBackedInputStream(ByteBuffer buf) {
            this.buf = buf;
        }
    
        public int read() throws IOException {
            if (!buf.hasRemaining()) {
                return -1;
            }
            return buf.get() & 0xFF;
        }
    
        public int read(byte[] bytes, int off, int len)
                throws IOException {
            if (!buf.hasRemaining()) {
                return -1;
            }
    
            len = Math.min(len, buf.remaining());
            buf.get(bytes, off, len);
            return len;
        }
    }
    

    输出流

    public class ByteBufferBackedOutputStream extends OutputStream {
        ByteBuffer buf;
    
        public ByteBufferBackedOutputStream(ByteBuffer buf) {
            this.buf = buf;
        }
    
        public void write(int b) throws IOException {
            buf.put((byte) b);
        }
    
        public void write(byte[] bytes, int off, int len)
                throws IOException {
            buf.put(bytes, off, len);
        }
    
    }
    

    【讨论】:

    • 为什么要让它同步?您是否希望多个线程读取相同的输入流?
    • @denys,抱歉,刚刚注意到您的评论 - 为什么您希望 flush 产生这种效果?似乎flip 会令人困惑,因为它会覆盖早期的数据,而flush() 通常不会这样做。我假设您正在尝试使用包装在输入和输出流中的单个缓冲区作为缓冲区?
    • @jaco0646 虽然您确实只需要实现单个抽象方法,但另一个方法的默认实现是根据 read(int) 和 write(int ) 所以它包含一个循环:for (int i = 0 ; i < len ; i++) { write(b[off + i]); } 为了提高效率,我们可以将字节数组直接传递到缓冲区,避免与 int 值相互转换并为每个字节调用一个函数。
    • 也许您可以从方法签名中删除throws IOException,因为实际实现从不抛出这些异常。
    • 也应该实现int available()
    【解决方案4】:

    如果可用,直接使用堆缓冲区(字节数组),否则使用包装的字节缓冲区(见答案 Mike Houston)

    public static InputStream asInputStream(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            // use heap buffer; no array is created; only the reference is used
            return new ByteArrayInputStream(buffer.array());
        }
        return new ByteBufferInputStream(buffer);
    }
    

    还要注意,包装的缓冲区可以有效地支持标记/重置和跳过操作。

    【讨论】:

    • 请注意,.array() 是可选操作。它可能未实现(例如MappedByteBuffer),即使已实现,也会为只读缓冲区抛出异常。
    • 确实,这就是buffer.hasArray() 存在的原因:)
    • 如果您总是希望 InputStream 基于整个数组,这将是可以的,但不会为具有偏移量的流提供所需的结果。与您早 4 年提供的 this answer 相同的问题...
    • @Chris 首先,OP 没有要求支持带有偏移量的流。其次,我的回答是对 Mike Houston 回答的补充(文中已明确说明)
    • OP 要求包装 ByteBuffer 以作为流访问。 ByteBuffer 使用偏移量来控制调用者通常可以访问底层数组的哪个部分。这就是首先使用ByteBuffer 而不仅仅是byte[] 的部分原因。
    【解决方案5】:

    这是我的InputStream & OutputStream 实现版本:

    ByteBufferBackedInputStream:

    public class ByteBufferBackedInputStream extends InputStream
    {
      private ByteBuffer backendBuffer;
    
      public ByteBufferBackedInputStream(ByteBuffer backendBuffer) {
          Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
          this.backendBuffer = backendBuffer;
      }
    
      public void close() throws IOException {
          this.backendBuffer = null;
      }
    
      private void ensureStreamAvailable() throws IOException {
          if (this.backendBuffer == null) {
              throw new IOException("read on a closed InputStream!");
          }
      }
    
      @Override
      public int read() throws IOException {
          this.ensureStreamAvailable();
          return this.backendBuffer.hasRemaining() ? this.backendBuffer.get() & 0xFF : -1;
      }
    
      @Override
      public int read(@Nonnull byte[] buffer) throws IOException {
          return this.read(buffer, 0, buffer.length);
      }
    
      @Override
      public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
          this.ensureStreamAvailable();
          Objects.requireNonNull(buffer, "Given buffer can not be null!");
          if (offset >= 0 && length >= 0 && length <= buffer.length - offset) {
              if (length == 0) {
                  return 0;
              }
              else {
                  int remainingSize = Math.min(this.backendBuffer.remaining(), length);
                  if (remainingSize == 0) {
                      return -1;
                  }
                  else {
                      this.backendBuffer.get(buffer, offset, remainingSize);
                      return remainingSize;
                  }
              }
          }
          else {
              throw new IndexOutOfBoundsException();
          }
      }
    
      public long skip(long n) throws IOException {
          this.ensureStreamAvailable();
          if (n <= 0L) {
              return 0L;
          }
          int length = (int) n;
          int remainingSize = Math.min(this.backendBuffer.remaining(), length);
          this.backendBuffer.position(this.backendBuffer.position() + remainingSize);
          return (long) length;
      }
    
      public int available() throws IOException {
          this.ensureStreamAvailable();
          return this.backendBuffer.remaining();
      }
    
      public synchronized void mark(int var1) {
      }
    
      public synchronized void reset() throws IOException {
          throw new IOException("mark/reset not supported");
      }
    
      public boolean markSupported() {
          return false;
      }
    }
    

    ByteBufferBackedOutputStream:

    public class ByteBufferBackedOutputStream extends OutputStream
    {
        private ByteBuffer backendBuffer;
    
        public ByteBufferBackedOutputStream(ByteBuffer backendBuffer) {
            Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
            this.backendBuffer = backendBuffer;
        }
    
        public void close() throws IOException {
            this.backendBuffer = null;
        }
    
        private void ensureStreamAvailable() throws IOException {
            if (this.backendBuffer == null) {
                throw new IOException("write on a closed OutputStream");
            }
        }
    
        @Override
        public void write(int b) throws IOException {
            this.ensureStreamAvailable();
            backendBuffer.put((byte) b);
        }
    
        @Override
        public void write(@Nonnull byte[] bytes) throws IOException {
            this.write(bytes, 0, bytes.length);
        }
    
        @Override
        public void write(@Nonnull byte[] bytes, int off, int len) throws IOException {
            this.ensureStreamAvailable();
            Objects.requireNonNull(bytes, "Given buffer can not be null!");
            if ((off < 0) || (off > bytes.length) || (len < 0) ||
                ((off + len) > bytes.length) || ((off + len) < 0))
            {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return;
            }
    
            backendBuffer.put(bytes, off, len);
        }
    }
    

    【讨论】:

      【解决方案6】:

      基于 ByteArrayInputStream 代码的派生... 要求提供的 ByteBuffer 提前正确设置位置和限制。

          public class ByteBufferInputStream extends InputStream
          {
              /**
               * The input ByteBuffer that was provided.
               * The ByteBuffer should be supplied with position and limit correctly set as appropriate
               */
              protected ByteBuffer buf;
      
              public ByteBufferInputStream(ByteBuffer buf)
              {
                  this.buf = buf;
                  buf.mark(); // to prevent java.nio.InvalidMarkException on InputStream.reset() if mark had not been set
              }
      
              /**
               * Reads the next byte of data from this ByteBuffer. The value byte is returned as an int in the range 0-255.
               * If no byte is available because the end of the buffer has been reached, the value -1 is returned.
               * @return  the next byte of data, or -1 if the limit/end of the buffer has been reached.
               */
              public int read()
              {
                  return buf.hasRemaining()
                      ? (buf.get() & 0xff)
                      : -1;
              }
      
              /**
               * Reads up to len bytes of data into an array of bytes from this ByteBuffer.
               * If the buffer has no remaining bytes, then -1 is returned to indicate end of file.
               * Otherwise, the number k of bytes read is equal to the smaller of len and buffer remaining.
               * @param   b     the buffer into which the data is read.
               * @param   off   the start offset in the destination array b
               * @param   len   the maximum number of bytes read.
               * @return  the total number of bytes read into the buffer, or -1 if there is no more data because the limit/end of
               *          the ByteBuffer has been reached.
               * @exception  NullPointerException If b is null.
               * @exception  IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off
               */
              public int read(byte b[], int off, int len)
              {
                  if (b == null)
                  {
                      throw new NullPointerException();
                  }
                  else if (off < 0 || len < 0 || len > b.length - off)
                  {
                      throw new IndexOutOfBoundsException();
                  }
      
                  if (!buf.hasRemaining())
                  {
                      return -1;
                  }
      
                  int remaining = buf.remaining();
                  if (len > remaining)
                  {
                      len = remaining;
                  }
      
                  if (len <= 0)
                  {
                      return 0;
                  }
      
                  buf.get(b, off, len);
      
                  return len;
              }
      
              /**
               * Skips n bytes of input from this ByteBuffer. Fewer bytes might be skipped if the limit is reached.
               *
               * @param   n   the number of bytes to be skipped.
               * @return  the actual number of bytes skipped.
               */
              public long skip(long n)
              {
                  int skipAmount = (n < 0)
                      ? 0
                      : ((n > Integer.MAX_VALUE)
                      ? Integer.MAX_VALUE
                      : (int) n);
      
                  if (skipAmount > buf.remaining())
                  {
                      skipAmount = buf.remaining();
                  }
      
                  int newPos = buf.position() + skipAmount;
      
                  buf.position(newPos);
      
                  return skipAmount;
              }
      
              /**
               * Returns remaining bytes available in this ByteBuffer
               * @return the number of remaining bytes that can be read (or skipped over) from this ByteBuffer.
               */
              public int available()
              {
                  return buf.remaining();
              }
      
              public boolean markSupported()
              {
                  return true;
              }
      
              /**
               * Set the current marked position in the ByteBuffer.
               * <p> Note: The readAheadLimit for this class has no meaning.
               */
              public void mark(int readAheadLimit)
              {
                  buf.mark();
              }
      
              /**
               * Resets the ByteBuffer to the marked position.
               */
              public void reset()
              {
                  buf.reset();
              }
      
              /**
               * Closing a ByteBuffer has no effect.
               * The methods in this class can be called after the stream has been closed without generating an IOException.
               */
              public void close() throws IOException
              {
              }
          }
      

      【讨论】:

        猜你喜欢
        • 2015-07-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-02-12
        • 2010-11-23
        • 1970-01-01
        相关资源
        最近更新 更多