【问题标题】:Connecting an input stream to an outputstream将输入流连接到输出流
【发布时间】:2010-12-07 04:27:57
【问题描述】:

在 java9 中更新:https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

我看到了一些类似的,但不是我需要的线程。

我有一个服务器,它基本上会从客户端客户端 A 接收输入,并将其逐字节转发到另一个客户端客户端 B。

我想将客户端 A 的输入流与客户端 B 的输出流连接起来。这可能吗?有什么方法可以做到这一点?

此外,这些客户端正在相互发送消息,这些消息对时间有些敏感,因此无法进行缓冲。我不想要一个 500 的缓冲区,而客户端发送 499 个字节,然后我的服务器推迟转发 500 个字节,因为它没有收到最后一个字节来填充缓冲区。

现在,我正在解析每条消息以查找其长度,然后读取长度字节,然后转发它们。我认为(并测试)这比读取一个字节并一遍又一遍地转发一个字节要好,因为那会非常慢。出于我在上一段中所述的原因,我也不想使用缓冲区或计时器 - 我不希望仅仅因为缓冲区未满而等待很长时间才能通过的消息。

有什么好的方法可以做到这一点?

【问题讨论】:

    标签: java inputstream outputstream


    【解决方案1】:

    仅仅因为您使用缓冲区并不意味着流必须填充该缓冲区。换句话说,这应该没问题:

    public static void copyStream(InputStream input, OutputStream output)
        throws IOException
    {
        byte[] buffer = new byte[1024]; // Adjust if you want
        int bytesRead;
        while ((bytesRead = input.read(buffer)) != -1)
        {
            output.write(buffer, 0, bytesRead);
        }
    }
    

    这应该可以正常工作 - 基本上 read 调用将阻塞,直到有 一些 数据可用,但它不会等到 all 可用以填充缓冲区. (我想它可以,而且我相信FileInputStream 通常填充缓冲区,但附加到套接字的流更有可能立即为您提供数据。)

    我认为至少值得先尝试这个简单的解决方案。

    【讨论】:

    • 是的,我认为这可以解决问题。我想我对确实需要填充缓冲区的 readFully() 感到困惑。
    • 我已经尝试过您的代码,我还尝试通过读取消息的长度然后执行 byte[] buf = length; 来逐条读取消息inputstream.read(buf)....后一种方法更快,我不知道为什么。它似乎执行了更多的代码行,但速度更快。几乎快 2 倍。
    • @Zibbobz:任何数组大小都可以工作——它越大,需要的读取越少,但它在工作时占用的内存就越多。它不一定是流的实际长度。
    • @sgibly:好吧,鉴于close() 无论如何都会刷新它,我个人认为这不值得。当然,如果您采用这样的代码,您应该可以随意添加它:)
    • @sgibly:我会说它的文档记录很差,而不是 intent 是每个人都必须调用 flush...
    【解决方案2】:

    使用怎么样

    void feedInputToOutput(InputStream in, OutputStream out) {
       IOUtils.copy(in, out);
    }
    

    并完成它?

    来自 jakarta apache commons i/o 库,该库已被大量项目使用,因此您可能已经在类路径中拥有该 jar。

    【讨论】:

    • 或者只使用函数本身,因为不需要调用具有完全相同参数的另一个函数....
    • 是的,这就是我个人所做的。我想我只是输入了额外的方法名称作为文档,但它不是必需的。
    • 据我所知,此方法一直阻塞,直到通过 while 输入。因此,这应该在提问者的异步线程中完成。
    【解决方案3】:

    JDK 9 已为此功能添加了InputStream#transferTo(OutputStream out)

    【讨论】:

      【解决方案4】:

      为了完整起见,guava 也有一个 handy utility 用于此

      ByteStreams.copy(input, output);
      

      【讨论】:

        【解决方案5】:

        您可以使用循环缓冲区:

        代码

        // buffer all data in a circular buffer of infinite size
        CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
        class1.putDataOnOutputStream(cbb.getOutputStream());
        class2.processDataFromInputStream(cbb.getInputStream());
        


        Maven 依赖项

        <dependency>
            <groupId>org.ostermiller</groupId>
            <artifactId>utils</artifactId>
            <version>1.07.00</version>
        </dependency>
        


        模式详情

        http://ostermiller.org/utils/CircularBuffer.html

        【讨论】:

          【解决方案6】:

          异步方式来实现它。

          void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
              Thread t = new Thread(new Runnable() {
          
                  public void run() {
                      try {
                          int d;
                          while ((d = inputStream.read()) != -1) {
                              out.write(d);
                          }
                      } catch (IOException ex) {
                          //TODO make a callback on exception.
                      }
                  }
              });
              t.setDaemon(true);
              t.start();
          }
          

          【讨论】:

          • 这是在不阻塞当前线程的情况下将数据从一个流传输到另一个流。
          【解决方案7】:

          BUFFER_SIZE 是要读取的卡盘大小。应该 > 1kb 且

          private static final int BUFFER_SIZE = 2 * 1024 * 1024;
          private void copy(InputStream input, OutputStream output) throws IOException {
              try {
                  byte[] buffer = new byte[BUFFER_SIZE];
                  int bytesRead = input.read(buffer);
                  while (bytesRead != -1) {
                      output.write(buffer, 0, bytesRead);
                      bytesRead = input.read(buffer);
                  }
              //If needed, close streams.
              } finally {
                  input.close();
                  output.close();
              }
          }
          

          【讨论】:

          • 应该远小于 10MB。这就是我们所说的 TCP。任何大于套接字接收缓冲区的大小都是完全没有意义的,它们以千字节而不是兆字节为单位。
          【解决方案8】:

          使用 org.apache.commons.io.IOUtils

          InputStream inStream = new ...
          OutputStream outStream = new ...
          IOUtils.copy(inStream, outStream);
          

          copyLarge 大小 >2GB

          【讨论】:

            【解决方案9】:

            这是一个干净且快速的 Scala 版本(没有 stackoverflow):

              import scala.annotation.tailrec
              import java.io._
            
              implicit class InputStreamOps(in: InputStream) {
                def >(out: OutputStream): Unit = pipeTo(out)
            
                def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))
            
                @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
                  case n if n > 0 =>
                    out.write(buffer, 0, n)
                    pipeTo(out, buffer)
                  case _ =>
                    in.close()
                    out.close()
                }
              }
            

            这可以使用&gt; 符号,例如inputstream &gt; outputstream 并传入自定义缓冲区/大小。

            【讨论】:

            • 你能提供一些类似的Java实现吗?
            • @Luchostein:我正在回复下面 George Pligor 的错误 Scala 回答
            【解决方案10】:

            如果您喜欢函数式,这是一个用 Scala 编写的函数,展示了如何仅使用 val(而不是 var)将输入流复制到输出流。

            def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
              val buffer = new Array[Byte](bufferSize);
              def recurse() {
                val len = inputStream.read(buffer);
                if (len > 0) {
                  outputStream.write(buffer.take(len));
                  recurse();
                }
              }
              recurse();
            }
            

            请注意,不建议在可用内存很少的 java 应用程序中使用此方法,因为使用递归函数很容易出现堆栈溢出异常错误

            【讨论】:

            • -1:递归 Scala 解决方案如何与 Java 问题相关?
            • 方法recurse是尾递归的。如果您使用 @tailrec 对其进行注释,则不会出现堆栈溢出问题。
            • 这个答案验证了所有的纯java程序员都在承受着老板的压力,需要严肃的愤怒管理!
            猜你喜欢
            • 2012-09-06
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2012-08-15
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多