【问题标题】:How do you merge two input streams in Java?如何在 Java 中合并两个输入流?
【发布时间】:2010-10-20 02:26:11
【问题描述】:

在 Java 中有两个 InputStream,有没有一种方法可以合并它们,从而以一个 InputStream 结束,从而为您提供两个流的输出?怎么样?

【问题讨论】:

  • 究竟以什么方式合并?在读取第一个流后无缝继续读取第二个流?我对 Java 不是很熟悉,但在 C# 中,您可以通过实现一个继承自 Stream 的类(包含对两个基本流的引用)然后重写 Read 方法来轻松做到这一点。

标签: java io inputstream


【解决方案1】:

正如评论,不清楚你所说的合并是什么意思。

InputStream.available 并不一定会给您一个有用的答案和阻止流的行为,从而使“随机”从任何一个中获取可用输入变得复杂。您需要两个线程从流中读取数据,然后通过 java.io.Piped(In|Out)putStream 传回数据(尽管这些类有问题)。或者,对于某些类型的流,可以使用不同的接口,例如 java.nio 非阻塞通道。

如果您想要第一个输入流的完整内容,然后是第二个:new java.io.SequenceInputStream(s1, s2)

【讨论】:

  • 哦,太好了,我刚刚学到了一些新东西。 SequenceInputStream 本质上与我的 CatInputStream 相同,但使用传统的枚举而不是 LinkedList。 :-)
  • 作为对答案第一部分的破解,在一般情况下很难解决,但对于 FileInputStream 的特定情况(也许还有套接字?),您可以使用 instanceof/cast 并创建一个通道其中。 (其他流可以使用 Channels.newChannel 创建一致的接口,但不具备所需的非阻塞特性。)
  • Collections.enumeration 是你的朋友。我忘记了第一部分的一部分 - 将编辑。
  • 实际上 FileChannel 不是 SelectableChannel,尽管 SocketChannel 是。所以我相信你有点被文件困住了。直到 JDK7 中的“更多 NIO 功能”(可能)。
  • NIO2 英尺;它还将具有异步 I/O,这将使实现这个合并通道业务变得更加有趣(任何输入通道上的回调都会导致对合并通道的回调)。 :-)
【解决方案2】:

java.io.SequenceInputStream 可能是您需要的。它接受流的枚举,并会输出第一个流的内容,然后是第二个,依此类推,直到所有流都为空。

【讨论】:

    【解决方案3】:

    您可以编写一个自定义的InputStream 实现来执行此操作。示例:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.LinkedList;
    
    public class CatInputStream extends InputStream {
        private final Deque<InputStream> streams;
    
        public CatInputStream(InputStream... streams) {
            this.streams = new LinkedList<InputStream>();
            Collections.addAll(this.streams, streams);
        }
    
        private void nextStream() throws IOException {
            streams.removeFirst().close();
        }
    
        @Override
        public int read() throws IOException {
            int result = -1;
            while (!streams.isEmpty()
                    && (result = streams.getFirst().read()) == -1) {
                nextStream();
            }
            return result;
        }
    
        @Override
        public int read(byte b[], int off, int len) throws IOException {
            int result = -1;
            while (!streams.isEmpty()
                    && (result = streams.getFirst().read(b, off, len)) == -1) {
                nextStream();
            }
            return result;
        }
    
        @Override
        public long skip(long n) throws IOException {
            long skipped = 0L;
            while (skipped < n && !streams.isEmpty()) {
                int thisSkip = streams.getFirst().skip(n - skipped);
                if (thisSkip > 0)
                    skipped += thisSkip;
                else
                    nextStream();
            }
            return skipped;
        }
    
        @Override
        public int available() throws IOException {
            return streams.isEmpty() ? 0 : streams.getFirst().available();
        }
    
        @Override
        public void close() throws IOException {
            while (!streams.isEmpty())
                nextStream();
        }
    }
    

    此代码未经测试,因此您的里程可能会有所不同。

    【讨论】:

    • 这不是和 Merzbow 建议的 SequenceInputStream 做同样的事情吗?
    • 抱歉,tackline 首先建议使用 SequenceInputStream(为此我为他 +1 了)。在 SO 中,最早的好答案获胜;你永远不知道后来的答案是不是抄袭。另外,请阅读我对 tackline 的评论,以比较 SequenceInputStream 和 CatInputStream (我确实同意他关于使用 Collections.enumeration 的观点)。
    • 如果因为我上次的评论而否决了我的答案的人,我很抱歉;我应该更好地解释一下:如果我写的答案结果是之前发布的答案的重复(或子集),我通常会删除它,因为我知道我永远不会得到任何分数。在 SO 上,它确实是“西方最快的枪”(搜索那个问题标题)。
    【解决方案4】:

    我想不到。您可能必须将两个流的内容读入一个 byte[],然后从中创建一个 ByteArrayInputStream。

    【讨论】:

    • 赞成一个简单、易于理解、可行的解决方案。如果阻塞很重要(或者它们很大),可能没有所需的行为。
    【解决方案5】:

    这是一个特定于字节数组的 MVar 实现(确保添加您自己的包定义)。从这里开始,在合并流上编写输入流是微不足道的。如果需要,我也可以发布。

    import java.nio.ByteBuffer;
    
    public final class MVar {
    
      private static enum State {
        EMPTY, ONE, MANY
      }
    
      private final Object lock;
    
      private State state;
    
      private byte b;
    
      private ByteBuffer bytes;
      private int length;
    
      public MVar() {
        lock = new Object();
        state = State.EMPTY;
      }
    
      public final void put(byte b) {
        synchronized (lock) {
          while (state != State.EMPTY) {
            try {
              lock.wait();
            } catch (InterruptedException e) {}
          }
          this.b = b;
          state = State.ONE;
          lock.notifyAll();
        }
      }
    
      public final void put(byte[] bytes, int offset, int length) {
        if (length == 0) {
          return;
        }
        synchronized (lock) {
          while (state != State.EMPTY) {
            try {
              lock.wait();
            } catch (InterruptedException e) {}
          }
          this.bytes = ByteBuffer.allocateDirect(length);
          this.bytes.put(bytes, offset, length);
          this.bytes.position(0);
          this.length = length;
          state = State.MANY;
          lock.notifyAll();
        }
      }
    
      public final byte take() {
        synchronized (lock) {
          while (state == State.EMPTY) {
            try {
              lock.wait();
            } catch (InterruptedException e) {}
          }
          switch (state) {
          case ONE: {
            state = State.EMPTY;
            byte b = this.b;
            lock.notifyAll();
            return b;
          }
          case MANY: {
            byte b = bytes.get();
            state = --length <= 0 ? State.EMPTY : State.MANY;
            lock.notifyAll();
            return b;
          }
          default:
            throw new AssertionError();
          }
        }
      }
    
      public final int take(byte[] bytes, int offset, int length) {
        if (length == 0) {
          return 0;
        }
        synchronized (lock) {
          while (state == State.EMPTY) {
            try {
              lock.wait();
            } catch (InterruptedException e) {}
          }
          switch (state) {
          case ONE:
            bytes[offset] = b;
            state = State.EMPTY;
            lock.notifyAll();
            return 1;
          case MANY:
            if (this.length > length) {
              this.bytes.get(bytes, offset, length);
              this.length = this.length - length;
              synchronized (lock) {
                lock.notifyAll();
              }
              return length;
            }
            this.bytes.get(bytes, offset, this.length);
            this.bytes = null;
            state = State.EMPTY;
            length = this.length;
            lock.notifyAll();
            return length;
          default:
            throw new AssertionError();
          }
        }
      }
    }
    

    【讨论】:

      猜你喜欢
      • 2017-01-17
      • 2019-03-05
      • 2011-04-25
      • 2019-04-17
      • 2021-04-24
      • 1970-01-01
      • 2011-11-03
      • 2011-08-06
      • 2020-09-20
      相关资源
      最近更新 更多