【发布时间】:2015-03-21 15:21:40
【问题描述】:
我有一个 Iterable[String],我想将它流式传输到外部进程并返回一个 Iterable[String] 用于输出。
我觉得这应该可以在编译时工作
import scala.sys.process._
object PipeUtils {
implicit class IteratorStream(s: TraversableOnce[String]) {
def pipe(cmd: String) = s.toStream.#>(cmd).lines
def run(cmd: String) = s.toStream.#>(cmd).!
}
}
但是,Scala 尝试执行 s 的内容,而不是将它们传递给标准输入。谁能告诉我我做错了什么?
更新:
我认为我最初的问题是 s.toStream 被隐式转换为 ProcessBuilder 然后执行。这是不正确的,因为它是流程的输入。
我想出了以下解决方案。这感觉非常hacky和错误,但它现在似乎有效。我不会写这个作为答案,因为我觉得答案应该是一行而不是这个巨大的东西。
object PipeUtils {
/**
* This class feels wrong. I think that for the pipe command it actually loads all of the output
* into memory. This could blow up the machine if used wrong, however, I cannot figure out how to get it to
* work properly. Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala
* will get some good responses.
* @param s
*/
implicit class IteratorStream(s: TraversableOnce[String]) {
val in = (in: OutputStream) => {
s.foreach(x => in.write((x + "\n").getBytes))
in.close
}
def pipe(cmd: String) = {
val output = ListBuffer[String]()
val io = new ProcessIO(in,
out => {Source.fromInputStream(out).getLines.foreach(output += _)},
err => {Source.fromInputStream(err).getLines.foreach(println)})
cmd.run(io).exitValue
output.toIterable
}
def run(cmd: String) = {
cmd.run(BasicIO.standard(in)).exitValue
}
}
}
编辑
这样做的动机来自于在 RDD 上使用 Spark 的 .pipe 函数。我希望在我的本地代码上具有完全相同的功能。
【问题讨论】:
-
关于从
s.toStream到 ProcessBuilder 的隐式转换是正确的。无论如何,def pipe(cmd: String): Stream[String] = (cmd +: s.toSeq).lineStream不也可以工作还是我错过了什么? -
如何将无限流作为输入?或者在现实世界的情况下,一个非常大的流太大而无法放入 Seq?
-
好的,我不清楚您的输入可能非常大。
-
您提供的解决方案也会导致本机程序失败。我认为您所说的将输入作为参数。输入不是参数,它是从标准输入读取的。想想'cat file.txt | cmd'
-
是的,你是对的,所以我显然错过了一些东西。感谢您的澄清。