【问题标题】:Is it possible to read from a InputStream with a timeout?是否可以从超时的 InputStream 中读取?
【发布时间】:2015-10-16 00:30:42
【问题描述】:

具体来说,问题是这样写一个方法:

int maybeRead(InputStream in, long timeout)

如果数据在 'timeout' 毫秒内可用,则返回值与 in.read() 相同,否则返回 -2。在方法返回之前,所有衍生的线程都必须退出。

为了避免争论,这里的主题是 java.io.InputStream,由 Sun 记录(任何 Java 版本)。请注意,这并不像看起来那么简单。以下是 Sun 文档直接支持的一些事实。

  1. in.read() 方法可能是不可中断的。

  2. 将 InputStream 包装在 Reader 或 InterruptibleChannel 中没有帮助,因为所有这些类都可以调用 InputStream 的方法。如果可以使用这些类,就可以编写一个解决方案,直接在 InputStream 上执行相同的逻辑。

  3. in.available() 返回 0 总是可以接受的。

  4. in.close() 方法可能会阻塞或什么都不做。

  5. 没有杀死另一个线程的通用方法。

【问题讨论】:

    标签: java timeout inputstream


    【解决方案1】:

    受到this answer 的启发,我想出了一个更面向对象的解决方案。

    这仅在您打算读取字符时有效

    您可以覆盖 BufferedReader 并实现如下内容:

    public class SafeBufferedReader extends BufferedReader{
    
        private long millisTimeout;
    
        ( . . . )
    
        @Override
        public int read(char[] cbuf, int off, int len) throws IOException {
            try {
                waitReady();
            } catch(IllegalThreadStateException e) {
                return 0;
            }
            return super.read(cbuf, off, len);
        }
    
        protected void waitReady() throws IllegalThreadStateException, IOException {
            if(ready()) return;
            long timeout = System.currentTimeMillis() + millisTimeout;
            while(System.currentTimeMillis() < timeout) {
                if(ready()) return;
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    break; // Should restore flag
                }
            }
            if(ready()) return; // Just in case.
            throw new IllegalThreadStateException("Read timed out");
        }
    }
    

    这是一个几乎完整的示例。

    我在某些方法上返回 0,您应该将其更改为 -2 以满足您的需求,但我认为 0 更适合 BufferedReader 合同。没有发生任何错误,它只是读取了 0 个字符。 readLine 方法是一个可怕的性能杀手。 如果你真的想使用 readLine,你应该创建一个全新的 BufferedReader。现在,它不是线程安全的。如果有人在 readLines 等待一行的时候调用一个操作,会产生意想不到的结果

    我不喜欢在我所在的位置返回 -2。我会抛出一个异常,因为有些人可能只是在检查 int

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.Reader;
    import java.nio.CharBuffer;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Stream;
    
    /**
     * 
     * readLine
     * 
     * @author Dario
     *
     */
    public class SafeBufferedReader extends BufferedReader{
    
        private long millisTimeout;
    
        private long millisInterval = 100;
    
        private int lookAheadLine;
    
        public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
            super(in, sz);
            this.millisTimeout = millisTimeout;
        }
    
        public SafeBufferedReader(Reader in, long millisTimeout) {
            super(in);
            this.millisTimeout = millisTimeout;
        }
    
    
    
        /**
         * This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
         * 
         * It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
         * 
         */
        @Override
        public String readLine() throws IOException {
            try {
                waitReadyLine();
            } catch(IllegalThreadStateException e) {
                //return null; //Null usually means EOS here, so we can't.
                throw e;
            }
            return super.readLine();
        }
    
        @Override
        public int read() throws IOException {
            try {
                waitReady();
            } catch(IllegalThreadStateException e) {
                return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
            }
            return super.read();
        }
    
        @Override
        public int read(char[] cbuf) throws IOException {
            try {
                waitReady();
            } catch(IllegalThreadStateException e) {
                return -2;  // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
            }
            return super.read(cbuf);
        }
    
        @Override
        public int read(char[] cbuf, int off, int len) throws IOException {
            try {
                waitReady();
            } catch(IllegalThreadStateException e) {
                return 0;
            }
            return super.read(cbuf, off, len);
        }
    
        @Override
        public int read(CharBuffer target) throws IOException {
            try {
                waitReady();
            } catch(IllegalThreadStateException e) {
                return 0;
            }
            return super.read(target);
        }
    
        @Override
        public void mark(int readAheadLimit) throws IOException {
            super.mark(readAheadLimit);
        }
    
        @Override
        public Stream<String> lines() {
            return super.lines();
        }
    
        @Override
        public void reset() throws IOException {
            super.reset();
        }
    
        @Override
        public long skip(long n) throws IOException {
            return super.skip(n);
        }
    
        public long getMillisTimeout() {
            return millisTimeout;
        }
    
        public void setMillisTimeout(long millisTimeout) {
            this.millisTimeout = millisTimeout;
        }
    
        public void setTimeout(long timeout, TimeUnit unit) {
            this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
        }
    
        public long getMillisInterval() {
            return millisInterval;
        }
    
        public void setMillisInterval(long millisInterval) {
            this.millisInterval = millisInterval;
        }
    
        public void setInterval(long time, TimeUnit unit) {
            this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
        }
    
        /**
         * This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
         * 
         * @throws IllegalThreadStateException
         * @throws IOException
         */
        protected void waitReadyLine() throws IllegalThreadStateException, IOException {
            long timeout = System.currentTimeMillis() + millisTimeout;
            waitReady();
    
            super.mark(lookAheadLine);
            try {
                while(System.currentTimeMillis() < timeout) {
                    while(ready()) {
                        int charInt = super.read();
                        if(charInt==-1) return; // EOS reached
                        char character = (char) charInt;
                        if(character == '\n' || character == '\r' ) return;
                    }
                    try {
                        Thread.sleep(millisInterval);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // Restore flag
                        break;
                    }
                }
            } finally {
                super.reset();
            }
            throw new IllegalThreadStateException("readLine timed out");
    
        }
    
        protected void waitReady() throws IllegalThreadStateException, IOException {
            if(ready()) return;
            long timeout = System.currentTimeMillis() + millisTimeout;
            while(System.currentTimeMillis() < timeout) {
                if(ready()) return;
                try {
                    Thread.sleep(millisInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore flag
                    break;
                }
            }
            if(ready()) return; // Just in case.
            throw new IllegalThreadStateException("read timed out");
        }
    
    }
    

    【讨论】:

      【解决方案2】:

      如果您的 InputStream 由 Socket 支持,您可以使用 setSoTimeout 设置 Socket 超时(以毫秒为单位)。如果 read() 调用没有在指定的超时时间内解除阻塞,它将抛出 SocketTimeoutException。

      只需确保在调用 read() 之前调用 Socket 上的 setSoTimeout。

      【讨论】:

        【解决方案3】:

        使用 inputStream.available()

        System.in.available() 返回 0 总是可以接受的。

        我发现相反的情况 - 它总是返回可用字节数的最佳值。 InputStream.available() 的 Javadoc:

        Returns an estimate of the number of bytes that can be read (or skipped over) 
        from this input stream without blocking by the next invocation of a method for 
        this input stream.
        

        由于时间/过时,估算是不可避免的。这个数字可能是一次性的低估,因为新数据不断出现。然而,它总是在下一次呼叫时“赶上”——它应该考虑所有到达的数据,除了在新呼叫时到达的数据。有数据不满足上述条件时永久返回0。

        第一个警告:InputStream 的具体子类负责 available()

        InputStream 是一个抽象类。它没有数据源。拥有可用数据对它来说毫无意义。因此,available() 的 javadoc 也指出:

        The available method for class InputStream always returns 0.
        
        This method should be overridden by subclasses.
        

        确实,具体的输入流类确实会覆盖 available(),提供有意义的值,而不是常量 0。

        第二个警告:确保在 Windows 中输入时使用回车。

        如果使用System.in,您的程序仅在您的命令外壳移交时接收输入。如果您使用文件重定向/管道(例如 somefile > java myJavaApp 或 somecommand | java myJavaApp ),则输入数据通常会立即移交。但是,如果您手动键入输入,则可能会延迟数据交接。例如。使用 windows cmd.exe shell,数据在 cmd.exe shell 中缓冲。数据仅在回车后(control-m 或&lt;enter&gt;)传递给正在执行的java 程序。这是执行环境的限制。当然,只要 shell 缓冲数据, InputStream.available() 就会返回 0 - 这是正确的行为;那时没有可用的数据。一旦数据从 shell 中可用,该方法就会返回一个 > 0 的值。注意:Cygwin 也使用 cmd.exe。

        最简单的解决方案(没有阻塞,所以不需要超时)

        就用这个吧:

            byte[] inputData = new byte[1024];
            int result = is.read(inputData, 0, is.available());  
            // result will indicate number of bytes read; -1 for EOF with no data read.
        

        或等价地,

            BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
            // ...
                 // inside some iteration / processing logic:
                 if (br.ready()) {
                     int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
                 }
        

        更丰富的解决方案(在超时时间内最大限度地填充缓冲区)

        声明:

        public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
             throws IOException  {
             int bufferOffset = 0;
             long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
             while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
                 int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
                 // can alternatively use bufferedReader, guarded by isReady():
                 int readResult = is.read(b, bufferOffset, readLength);
                 if (readResult == -1) break;
                 bufferOffset += readResult;
             }
             return bufferOffset;
         }
        

        然后使用这个:

            byte[] inputData = new byte[1024];
            int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
            // readCount will indicate number of bytes read; -1 for EOF with no data read.
        

        【讨论】:

        • 如果is.available() &gt; 1024 这个建议会失败。肯定有返回零的流。 SSLSockets 例如直到最近。你不能依赖这个。
        • 'is.available() > 1024'的情况通过readLength专门处理。
        • 评论 SSLSockets 不正确 - 如果缓冲区中没有数据,则返回 0 表示可用。根据我的回答。 Javadoc:“如果套接字上没有缓冲字节,并且套接字没有使用 close 关闭,那么 available 将返回 0。”
        • @GlenBest 我对 SSLSocket 的评论不正确。 直到最近 [我的重点] 它过去一直返回零。你说的是现在。我说的是 JSSE 的整个历史,在它首次包含在 Java 1.4 in 2002 之前我就一直在使用它。
        • 这其实不是一个好的答案。 1) 如前所述,available() 可能返回 0,具体取决于 JVM、版本、操作系统、实现。 2) 如果您试图访问错误的文件,任何 read() 调用可能永远不会返回(或者至少不会在合适的超时时间内,有些是 10 分钟)。所以使用这个解决方案是个坏主意。如果写得好,伊恩琼斯的答案会更好,可读性更好。
        【解决方案4】:

        假设您的流没有套接字支持(因此您不能使用Socket.setSoTimeout()),我认为解决此类问题的标准方法是使用 Future。

        假设我有以下执行器和流:

            ExecutorService executor = Executors.newFixedThreadPool(2);
            final PipedOutputStream outputStream = new PipedOutputStream();
            final PipedInputStream inputStream = new PipedInputStream(outputStream);
        

        我有写入器写入一些数据,然后等待 5 秒,然后再写入最后一条数据并关闭流:

            Runnable writeTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        outputStream.write(1);
                        outputStream.write(2);
                        Thread.sleep(5000);
                        outputStream.write(3);
                        outputStream.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.submit(writeTask);
        

        正常的阅读方式如下。读取将无限期阻塞数据,因此在 5 秒内完成:

            long start = currentTimeMillis();
            int readByte = 1;
            // Read data without timeout
            while (readByte >= 0) {
                readByte = inputStream.read();
                if (readByte >= 0)
                    System.out.println("Read: " + readByte);
            }
            System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");
        

        哪个输出:

        Read: 1
        Read: 2
        Read: 3
        Complete in 5001ms
        

        如果有更根本的问题,比如作者没有回应,读者会永远阻塞。 如果我在将来包装读取,则可以按如下方式控制超时:

            int readByte = 1;
            // Read data with timeout
            Callable<Integer> readTask = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return inputStream.read();
                }
            };
            while (readByte >= 0) {
                Future<Integer> future = executor.submit(readTask);
                readByte = future.get(1000, TimeUnit.MILLISECONDS);
                if (readByte >= 0)
                    System.out.println("Read: " + readByte);
            }
        

        哪个输出:

        Read: 1
        Read: 2
        Exception in thread "main" java.util.concurrent.TimeoutException
            at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
            at java.util.concurrent.FutureTask.get(FutureTask.java:91)
            at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)
        

        我可以捕获 TimeoutException 并进行任何我想要的清理工作。

        【讨论】:

        • 但是阻塞线程呢?!它会留在内存中直到应用程序终止吗?如果我是正确的,这可能会产生无尽的线程,应用程序负载很重,甚至更多,阻止更多线程使用你的线程池,它的线程被占用和阻塞。如果我错了,请纠正我。谢谢。
        • Muhammad Gelbana,你是对的:阻塞的 read() 线程一直在运行,这是不行的。我找到了一种方法来防止这种情况发生:当超时发生时,从调用线程关闭输入流(在我的情况下,我关闭输入流来自的 android 蓝牙套接字)。当您这样做时, read() 调用将立即返回。在我的情况下,我使用 int read(byte[]) 重载,并且该调用立即返回。也许 int read() 重载会抛出 IOException,因为我不知道它会返回什么......在我看来这是正确的解决方案。
        • -1 因为线程读取保持阻塞,直到应用程序终止。
        • @ortang 这就是我所说的“捕获 TimeoutException 并进行任何清理......”例如我可能想杀死阅读线程: ... catch (TimeoutException e) { executor .shutdownNow(); }
        • executer.shutdownNow 不会杀死线程。它会尝试打断它,但没有任何效果。无法进行清理,这是一个严重的问题。
        【解决方案5】:

        我会质疑问题陈述,而不是盲目地接受它。您只需要来自控制台或网络的超时。如果是后者,你有 Socket.setSoTimeout()HttpURLConnection.setReadTimeout() 两者都完全符合要求,只要你在构造/获取它们时正确设置它们。当您只有 InputStream 时,将其留在应用程序中的任意位置,这是糟糕的设计,导致实现非常尴尬。

        【讨论】:

        • 在其他情况下,读取可能会阻塞很长时间;例如从磁带驱动器、远程安装的网络驱动器或后端带有磁带机械手的 HFS 读取时。 (但你的答案的主旨是正确的。)
        • @StephenC +1 为您提供评论和示例。要添加更多示例,一个简单的情况可能是正确建立了套接字连接但读取尝试被阻止,因为要从 DB 获取数据但不知何故没有发生(假设 DB 没有响应并且查询去了处于锁定状态)。在这种情况下,您需要有一种方法来显式超时套接字上的读取操作。
        • InputStream 抽象的全部意义在于不考虑底层实现。争论发布答案的利弊是公平的。但是,质疑问题陈述对讨论没有帮助
        • InputStream 在流上工作并阻塞,但它不提供超时机制。所以 InputStream 抽象不是一个经过恰当设计的抽象。因此,要求一种在流上超时的方法并不需要太多。所以问题是寻求一个非常实际问题的解决方案。大多数底层实现都会阻塞。这就是流的本质。如果流的另一端没有准备好新数据,套接字、文件、管道将阻塞。
        • @EJP。我不知道你是怎么得到的。我不同意你的看法。问题陈述“如何在 InputStream 上超时”是有效的。由于框架没有提供超时的方法,所以问这样的问题是合适的。
        【解决方案6】:

        这是一种从 System.in 获取 NIO FileChannel 并使用超时检查数据可用性的方法,这是问题中描述的问题的一种特殊情况。在控制台运行它,不要输入任何输入,然后等待结果。在 Windows 和 Linux 上的 Java 6 下测试成功。

        import java.io.FileInputStream;
        import java.io.FilterInputStream;
        import java.io.IOException;
        import java.io.InputStream;
        import java.lang.reflect.Field;
        import java.nio.ByteBuffer;
        import java.nio.channels.ClosedByInterruptException;
        
        public class Main {
        
            static final ByteBuffer buf = ByteBuffer.allocate(4096);
        
            public static void main(String[] args) {
        
                long timeout = 1000 * 5;
        
                try {
                    InputStream in = extract(System.in);
                    if (! (in instanceof FileInputStream))
                        throw new RuntimeException(
                                "Could not extract a FileInputStream from STDIN.");
        
                    try {
                        int ret = maybeAvailable((FileInputStream)in, timeout);
                        System.out.println(
                                Integer.toString(ret) + " bytes were read.");
        
                    } finally {
                        in.close();
                    }
        
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
        
            }
        
            /* unravels all layers of FilterInputStream wrappers to get to the
             * core InputStream
             */
            public static InputStream extract(InputStream in)
                    throws NoSuchFieldException, IllegalAccessException {
        
                Field f = FilterInputStream.class.getDeclaredField("in");
                f.setAccessible(true);
        
                while( in instanceof FilterInputStream )
                    in = (InputStream)f.get((FilterInputStream)in);
        
                return in;
            }
        
            /* Returns the number of bytes which could be read from the stream,
             * timing out after the specified number of milliseconds.
             * Returns 0 on timeout (because no bytes could be read)
             * and -1 for end of stream.
             */
            public static int maybeAvailable(final FileInputStream in, long timeout)
                    throws IOException, InterruptedException {
        
                final int[] dataReady = {0};
                final IOException[] maybeException = {null};
                final Thread reader = new Thread() {
                    public void run() {                
                        try {
                            dataReady[0] = in.getChannel().read(buf);
                        } catch (ClosedByInterruptException e) {
                            System.err.println("Reader interrupted.");
                        } catch (IOException e) {
                            maybeException[0] = e;
                        }
                    }
                };
        
                Thread interruptor = new Thread() {
                    public void run() {
                        reader.interrupt();
                    }
                };
        
                reader.start();
                for(;;) {
        
                    reader.join(timeout);
                    if (!reader.isAlive())
                        break;
        
                    interruptor.start();
                    interruptor.join(1000);
                    reader.join(1000);
                    if (!reader.isAlive())
                        break;
        
                    System.err.println("We're hung");
                    System.exit(1);
                }
        
                if ( maybeException[0] != null )
                    throw maybeException[0];
        
                return dataReady[0];
            }
        }
        

        有趣的是,当在 NetBeans 6.5 中而不是在控制台上运行程序时,超时根本不起作用,实际上需要调用 System.exit() 来杀死僵尸线程。发生的情况是中断器线程在调用 reader.interrupt() 时阻塞 (!)。另一个测试程序(此处未显示)另外尝试关闭通道,但这也不起作用。

        【讨论】:

        • 不适用于 mac os,既不适用于 JDK 1.6,也不适用于 JDK 1.7。只有在读取过程中按回车后才能识别中断。
        【解决方案7】:

        正如 jt 所说,NIO 是最好的(也是正确的)解决方案。如果你真的被 InputStream 困住了,你也可以

        1. 派生一个线程,该线程的专有工作是从 InputStream 中读取数据并将结果放入缓冲区中,该缓冲区可以从原始线程中读取而不会阻塞。如果您只有一个流实例,这应该可以很好地工作。否则,您可以使用 Thread 类中已弃用的方法终止线程,但这可能会导致资源泄漏。

        2. 依靠 isAvailable 来表示可以无阻塞读取的数据。但是,在某些情况下(例如使用 Sockets),isAvailable 可能需要进行潜在的阻塞读取才能报告 0 以外的内容。

        【讨论】:

        • Socket.setSoTimeout() 是一个同样正确且简单得多的解决方案。或HttpURLConnection.setReadTimeout().
        • @EJP - 这些只是在某些情况下“同样正确”;例如如果输入流是套接字流/HTTP 连接流。
        • @Stephen C NIO 仅在相同情况下是非阻塞和可选的。例如,没有非阻塞文件 I/O。
        • @EJP 但是有非阻塞管道 IO(System.in),文件(本地磁盘上)的非阻塞 I/O 是无稽之谈
        • @EJP 在大多数(全部?)Unices System.in 实际上是一个管道(如果你没有告诉 shell 用文件替换它)并且作为一个管道它可以是非阻塞的。
        【解决方案8】:

        我没有使用 Java NIO 包中的类,但似乎它们在这里可能会有所帮助。具体来说,java.nio.channels.Channelsjava.nio.channels.InterruptibleChannel

        【讨论】:

        • +1:我不相信有一种可靠的方法可以单独使用 InputStream 来完成 OP 的要求。然而,nio 就是为此目的而创建的。
        • OP已经基本排除了这个。 InputStream 本质上是阻塞的,并且可能是不可中断的。
        猜你喜欢
        • 2010-10-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-01-02
        • 2012-03-03
        相关资源
        最近更新 更多