【问题标题】:Tee the InputStream from a launched process in Java/Kotlin从 Java/Kotlin 中启动的进程中打开 InputStream
【发布时间】:2018-08-06 15:58:50
【问题描述】:

我正在使用 ProcessBuilder 启动一个进程,如下所示:

val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()

我想用进程的标准输出做两件事:

  1. 持续监控其最近的输出线
  2. 将所有行记录到文件中

据我所知,为了完成这两件事,我需要“拆分”从proc.inputStream 获得的 InputStream,以便每一行都镜像到另外两个 InputStream:一个可以使用记录到一个文件,另一个用于解析和监视进程的状态。

一种选择是让一个从 InputStream 读取的线程触发一个事件,每行读取到“订阅者”,我认为这应该可以正常工作,但我希望提出一个更通用的“Tee”类型功能将公开 InputStreams 以供任何人使用。基本上是这样的:

val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
val originalInputStream = proc.inputStream

val tee = Tee(originalInputStream)
// Every line read from originalInputStream would be 
// mirrored to all branches (not necessarily every line 
// from the beginning of the originalInputStream, but 
// since the start of the lifetime of the created branch)
val branchOne: InputStream = tee.addBranch()
val branchTwo: InputStream = tee.addBranch()

我对@9​​87654324@ 类进行了拍摄,但我不确定在addBranch 方法中要做什么:

class Tee(inputStream: InputStream) {
    val reader = BufferedReader(InputStreamReader(inputStream))
    val branches = mutableListOf<OutputStream>()

    fun readLine() {
        val line = reader.readLine()
        branches.forEach {
            it.write(line.toByteArray())
        }
    }

    fun addBranch(): InputStream {
        // What to do here?  Need to create an OutputStream
        // which readLine can write to, but return an InputStream
        // which will be updated with each future write to that
        // OutputStream
    }
}

编辑:我最终得到的Tee 的实现如下:

/**
 * Reads from the given [InputStream] and mirrors the read
 * data to all of the created 'branches' off of it.
 * All branches will 'receive' all data from the original
 * [InputStream] starting at the the point of
 * the branch's creation.
 * NOTE: This class will not read from the given [InputStream]
 * automatically, its [read] must be invoked
 * to read the data from the original stream and write it to
 * the branches
 */
class Tee(inputStream: InputStream) {
    val reader = BufferedReader(InputStreamReader(inputStream))
    var branches = CopyOnWriteArrayList<OutputStream>()

    fun read() {
        val c = reader.read()

        branches.forEach {
            // Recreate the carriage return so that readLine on the
            // branched InputStreams works
            it.write(c)
        }
    }

    fun addBranch(): InputStream {
        val outputStream = PipedOutputStream()
        branches.add(outputStream)
        return PipedInputStream(outputStream)
    }
}

【问题讨论】:

    标签: java kotlin


    【解决方案1】:

    看看来自 Apache Commons 的 org.apache.commons.io.output.TeeInputStream,那么您就不必费心自己编写了。

    val pb = ProcessBuilder("/path/to/process")
    pb.redirectErrorStream(true)
    val proc = pb.start()
    val original = proc.inputStream
    
    val out = new PipedOutputStream()
    val in = new PipedInputStream()
    out.connect(in)
    
    val tee = new TeeInputStream(in, out)
    

    然后从tee而不是original读取,任何读取的字节也将被写出。通过使用管道流,写入的数据将可通过in 读取,因此现在您可以有两个线程分别从intee 读取。一个线程写入日志,一个线程监控行。

    【讨论】:

    • 我确实从 commons 中查看了 TeeInputStream,但实际上认为它不符合我的用例,因为它似乎只是原始流的单个代理,但我没有意识到其意图是继续阅读原版,所以很高兴知道!但这需要一个预先创建的 OutputStream,我不确定如何创建它以最终获得另一个 InputStream。我查看了 PipedInput/OutputStream,但我的理解是 out.connect(in) 只给你方向输出 -> 输入,而不是输入 -> 输出
    • 我仔细查看了 PipedInputStream/PipedOutputStream,这条路径最终将我带到了我需要的地方。谢谢@monkjack!
    【解决方案2】:

    看起来简单的装饰器对你来说就足够了:

    class Tee(private vararg val branches: OutputStream) : OutputStream() {
        override fun write(b: Int) {
            for (branch in branches) {
                branch.write(b)
            }
        }
    
        override fun write(b: ByteArray?) {
            for (branch in branches) {
                branch.write(b)
            }
        }
    
        override fun write(b: ByteArray?, off: Int, len: Int) {
            for (branch in branches) {
                branch.write(b,off, len)
            }
        }
    
        override fun flush() {
            for (branch in branches) {
                branch.flush()
            }
        }
    
        override fun close() {
            for (branch in branches) {
                branch.close()
            }
        }
    }
    

    然后你可以将你的输入流复制到 Tee,它在下面可以做任何事情——写入文件、解析输入等等。

    如果我理解正确,您需要逐行解析数据,因此您可以添加一个输出流的 else 实现,实际上,它将解析输入数据并执行您需要的操作。

    另外,请看一下这个answer。如果您不想处理多个输出流,这可能就是您所需要的。


    此外,我认为您可以将这两种技术结合起来获得更大的功能——例如,同时写入多个输出流并解析数据。

    【讨论】:

    • 您链接到的答案是我在问题中提到的解决方案类型......可行但我希望围绕InputStream 构建一些东西。不过,我不太听从你的回答,装饰师在哪里?它接受一个 OutputStream,但我不确定如何创建一个 OutputStream 以最终获得一个 InputStream 以供其他读者使用。
    • @bbaldino Tee 装饰其他输出流。这是正确的,如果你想解析输入 - 你必须在输入流上创建装饰器并创建监听器接口
    • 好的,我明白你说的装饰器是什么意思了。我知道如何使用侦听器接口来实现这一点,但我想知道是否有一种方法可以让消费者以InputStream 类型对其进行操作,就像原始流一样(与侦听器相反)界面)。
    • @bbaldino 对你来说打开simulationeuos输入流(即在线程中)并在那里做所有脏活不是更容易吗?然后只是加入线程?当然,OOTB 是不可能的——从流中读取的内容无法再次读取 :) 总有一个选择——使用 Channels。将部分读入 ByteBuffer,用它做你想做的事并阅读更多。它几乎是完美的,因为几乎为零开销,但需要一些自定义代码。
    • @asmodey 不幸的是我不认为我可以...... InputStream 是预先创建的(它来自Process 实例)。我查看了 Channels,但还没有找到获得所需内容的方法。部分问题是,由于 InputStream 是“无限的”,任何基于字节数组的东西都有一些问题,因为据我所知,来自字节数组的读者会复制并期望所有数据已经在那里。
    猜你喜欢
    • 2013-03-25
    • 2021-07-17
    • 1970-01-01
    • 2014-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-07
    • 1970-01-01
    相关资源
    最近更新 更多