【问题标题】:Read input stream twice两次读取输入流
【发布时间】:2012-03-19 01:31:57
【问题描述】:

你如何两次读取相同的输入流?有办法复制吗?

我需要从网络获取图像,将其保存在本地,然后返回保存的图像。我只是认为使用相同的流而不是为下载的内容启动一个新流然后再次读取它会更快。

【问题讨论】:

  • 可能使用标记和重置

标签: java inputstream


【解决方案1】:

根据 InputStream 的来源,您可能无法重置它。您可以使用markSupported() 检查是否支持mark()reset()

如果是,可以在InputStream上调用reset()返回到开头。如果没有,则需要再次从源中读取 InputStream。

【讨论】:

  • InputStream 不支持 'mark' - 你可以在 IS 上调用 mark 但它什么也不做。同样,在 IS 上调用 reset 将引发异常。
  • @ayahuasca InputStream BufferedInputStream 等子类确实支持“标记”
【解决方案2】:

如果您正在使用InputStream 的实现,您可以检查InputStream#markSupported() 的结果,告诉您是否可以使用mark() / reset() 方法。

如果您可以在阅读时标记流,则调用reset() 以返回开始。

如果不能,则必须再次打开流。

另一种解决方案是将 InputStream 转换为字节数组,然后根据需要多次迭代数组。您可以在这篇文章Convert InputStream to byte array in Java 中找到几个解决方案,是否使用第三方库。注意,如果读取的内容太大,您可能会遇到一些内存问题。

最后,如果您需要读取图像,请使用:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

使用ImageIO#read(java.net.URL) 还允许您使用缓存。

【讨论】:

  • 使用ImageIO#read(java.net.URL) 时的警告:一些网络服务器和CDN 可能会拒绝ImageIO#read 发出的裸呼叫(即没有使服务器相信呼叫来自网络浏览器的用户代理) .在这种情况下,使用 URLConnection.openConnection() 将用户代理设置为该连接 + 使用 `ImageIO.read(InputStream) 在大多数情况下会成功。
  • InputStream 不是接口
【解决方案3】:

将输入流转换为字节,然后将其传递给保存文件函数,在该函数中将其组装到输入流中。 同样在原始函数中使用字节用于其他任务

【讨论】:

  • 我说这个是个坏主意,结果数组可能很大并且会抢走设备的内存。
【解决方案4】:

您可以使用org.apache.commons.io.IOUtils.copy 将 InputStream 的内容复制到字节数组中,然后使用 ByteArrayInputStream 从字节数组中重复读取。例如:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}

【讨论】:

  • @Paul Grime:IOUtils.toByeArray 内部也会从内部调用 copy 方法。
  • 正如@Ankit 所说,这个解决方案对我无效,因为输入是在内部读取的,不能重复使用。
  • 我知道这条评论已经过时了,但是,在第一个选项中,如果您将输入流作为字节数组读取,是否意味着您正在将所有数据加载到内存中?如果您要加载大文件之类的内容,这可能是个大问题?
  • @jaxkodex,是的,这是正确的。如果您作为开发人员更了解您正在处理的流的实际类型,那么您可以编写更合适的自定义行为。提供的答案是一般抽象。
  • 可以使用 IOUtils.toByteArray(InputStream) 在一次调用中获取字节数组。
【解决方案5】:

如果你的InputStream支持使用标记,那么你可以mark()你的inputStream然后reset()它。如果您的InputStrem 不支持标记,那么您可以使用java.io.BufferedInputStream 类,这样您就可以像这样将您的流嵌入到BufferedInputStream

    InputStream bufferdInputStream = new BufferedInputStream(yourInputStream);
    bufferdInputStream.mark(some_value);
    //read your bufferdInputStream 
    bufferdInputStream.reset();
    //read it again

【讨论】:

  • 一个缓冲的输入流只能标记回缓冲区大小,所以如果源不适合,你不能一直回到开头。
  • @L.Blanc 抱歉,但这似乎不正确。看看BufferedInputStream.fill(),有“增长缓冲区”部分,其中新缓冲区大小仅与marklimitMAX_BUFFER_SIZE 进行比较。
【解决方案6】:

您可以使用 PushbackInputStream 包装输入流。 PushbackInputStream 允许 unread ("write back") 已经读取的字节,所以你可以这样做:

public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that maximnum 10 characters can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next > 0) {
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

请注意,PushbackInputStream 存储字节的内部缓冲区,因此它确实在内存中创建了一个缓冲区来保存“写回”的字节。

了解了这种方法,我们可以更进一步,将其与 FilterInputStream 结合起来。 FilterInputStream 将原始输入流存储为委托。这允许创建允许自动“未读”原始数据的新类定义。该类的定义如下:

public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads from input stream the <code>length</code> of bytes to given buffer. The read bytes are still avilable
   * in the stream
   *
   * @param buffer the destination buffer to which read the data
   * @param offset  the start offset in the destination <code>buffer</code>
   * @aram length how many bytes to read from the stream to buff. Length needs to be less than
   *        <code>maxPushbackBufferSize</code> or IOException will be thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

这个类有两个方法。一种用于读取现有缓冲区(定义类似于调用 InputStream 类的public int read(byte b[], int off, int len))。第二个返回新缓冲区(如果要读取的缓冲区大小未知,这可能更有效)。

现在让我们看看我们的课程的实际效果:

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}

【讨论】:

    【解决方案7】:

    怎么样:

    if (stream.markSupported() == false) {
    
            // lets replace the stream object
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            IOUtils.copy(stream, baos);
            stream.close();
            stream = new ByteArrayInputStream(baos.toByteArray());
            // now the stream should support 'mark' and 'reset'
    
        }
    

    【讨论】:

    • 这是个糟糕的主意。你像这样把整个流的内容放在内存中。
    【解决方案8】:

    如果有人在 Spring Boot 应用程序中运行,并且您想读取 RestTemplate 的响应正文(这就是我想读取流两次的原因),那么有一种干净(er)的方法这个。

    首先需要使用Spring的StreamUtils将流复制到String中:

    String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()))
    

    但这还不是全部。您还需要使用可以为您缓冲流的请求工厂,如下所示:

    ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
    RestTemplate restTemplate = new RestTemplate(factory);
    

    或者,如果您使用的是工厂 bean,那么(尽管如此,这是 Kotlin):

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
      .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
      .additionalInterceptors(loggingInterceptor)
      .build()
    

    来源:https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/

    【讨论】:

      【解决方案9】:

      为了将InputStream一分为二,同时避免将所有数据加载到内存中,然后独立处理它们:

      1. 创建一对OutputStream,准确地说:PipedOutputStream
      2. 将每个 PipedOutputStream 与 PipedInputStream 连接起来,这些PipedInputStream 是返回的InputStream
      3. 将源 InputStream 与刚刚创建的 OutputStream 连接。因此,从源代码InputStream 中读取的所有内容都将写入OutputStream。不需要实现它,因为它已经在 TeeInputStream (commons.io) 中完成。
      4. 在一个单独的线程中读取整个源输入流,然后将输入数据隐式传输到目标输入流。

        public static final List<InputStream> splitInputStream(InputStream input) 
            throws IOException 
        { 
            Objects.requireNonNull(input);      
        
            PipedOutputStream pipedOut01 = new PipedOutputStream();
            PipedOutputStream pipedOut02 = new PipedOutputStream();
        
            List<InputStream> inputStreamList = new ArrayList<>();
            inputStreamList.add(new PipedInputStream(pipedOut01));
            inputStreamList.add(new PipedInputStream(pipedOut02));
        
            TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
        
            TeeInputStream tin = new TeeInputStream(input, tout, true);
        
            Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
        
            return Collections.unmodifiableList(inputStreamList);
        }
        

      注意消费完后关闭inputStreams,关闭运行的线程:TeeInputStream.readAllBytes()

      以防万一,您需要将其拆分为多个InputStream,而不仅仅是两个。将前面的代码片段中的TeeOutputStream 类替换为您自己的实现,这将封装List&lt;OutputStream&gt; 并覆盖OutputStream 接口:

      public final class TeeListOutputStream extends OutputStream {
          private final List<? extends OutputStream> branchList;
      
          public TeeListOutputStream(final List<? extends OutputStream> branchList) {
              Objects.requireNonNull(branchList);
              this.branchList = branchList;
          }
      
          @Override
          public synchronized void write(final int b) throws IOException {
              for (OutputStream branch : branchList) {
                  branch.write(b);
              }
          }
      
          @Override
          public void flush() throws IOException {
              for (OutputStream branch : branchList) {
                  branch.flush();
              }
          }
      
          @Override
          public void close() throws IOException {
              for (OutputStream branch : branchList) {
                  branch.close();
              }
          }
      }
      

      【讨论】:

      • 拜托,你能解释一下第4步吗?为什么我们必须手动触发阅读?为什么读取任何 pipedInputStream 都不会触发源 inputStream 的读取?为什么我们会异步调用?
      • 要关闭 TeeOutputStream 我在线程中添加了 tin.close:` Executors.newSingleThreadExecutor().submit(() -> { try { tin.readAllBytes(); tin.close(); } catch (IOException ioException) { ioException.printStackTrace(); } }); `
      【解决方案10】:

      如果您使用 RestTemplate 进行 http 调用,只需添加一个拦截器。 响应体由 ClientHttpResponse 的实现缓存。 现在可以根据需要多次从 respose 中检索输入流

      ClientHttpRequestInterceptor interceptor =  new ClientHttpRequestInterceptor() {
      
                  @Override
                  public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                          ClientHttpRequestExecution execution) throws IOException {
                      ClientHttpResponse  response = execution.execute(request, body);
      
                        // additional work before returning response
                        return response 
                  }
              };
      
          // Add the interceptor to RestTemplate Instance 
      
               restTemplate.getInterceptors().add(interceptor); 
      

      【讨论】:

        【解决方案11】:
        ByteArrayInputStream ins = new ByteArrayInputStream("Hello".getBytes());
        System.out.println("ins.available() at begining:: " + ins.available());
        ins.mark(0);
        // Read input stream for some operations
        System.out.println("ins.available() after reading :: " + ins.available());
            ins.reset();
            System.out.println("ins.available() after resetting :: " + ins.available());
            // ins is ready for reading once again.
        

        【讨论】:

        • 上述语句的输出为: ins.available() at begin:: :: 1028 ins.available() after reading :: 0 ins.available() after reset :: 1028
        猜你喜欢
        • 2019-07-15
        • 2016-05-02
        • 2013-07-31
        • 2017-09-12
        • 2023-04-04
        • 2015-10-02
        • 1970-01-01
        相关资源
        最近更新 更多