【问题标题】:Stream input to external process in Scala在Scala中将输入流式传输到外部进程
【发布时间】:2015-03-21 15:21:40
【问题描述】:

我有一个 Iterable[String],我想将它流式传输到外部进程并返回一个 Iterable[String] 用于输出。

我觉得这应该可以在编译时工作

import scala.sys.process._

object PipeUtils {
  implicit class IteratorStream(s: TraversableOnce[String]) {
    def pipe(cmd: String) = s.toStream.#>(cmd).lines
    def run(cmd: String) = s.toStream.#>(cmd).!
  }
}

但是,Scala 尝试执行 s 的内容,而不是将它们传递给标准输入。谁能告诉我我做错了什么?

更新:

我认为我最初的问题是 s.toStream 被隐式转换为 ProcessBuilder 然后执行。这是不正确的,因为它是流程的输入。

我想出了以下解决方案。这感觉非常hacky和错误,但它现在似乎有效。我不会写这个作为答案,因为我觉得答案应该是一行而不是这个巨大的东西。

object PipeUtils {

  /**
   * This class feels wrong.  I think that for the pipe command it actually loads all of the output
   * into memory.  This could blow up the machine if used wrong, however, I cannot figure out how to get it to
   * work properly.  Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala
   * will get some good responses.
   * @param s
   */
  implicit class IteratorStream(s: TraversableOnce[String]) {

    val in = (in: OutputStream) => {
      s.foreach(x => in.write((x + "\n").getBytes))
      in.close
    }

    def pipe(cmd: String) = {
      val output = ListBuffer[String]()
      val io = new ProcessIO(in,
      out => {Source.fromInputStream(out).getLines.foreach(output += _)},
      err => {Source.fromInputStream(err).getLines.foreach(println)})

      cmd.run(io).exitValue
      output.toIterable
    }

    def run(cmd: String) = {
      cmd.run(BasicIO.standard(in)).exitValue
    }
  }
}

编辑

这样做的动机来自于在 RDD 上使用 Spark 的 .pipe 函数。我希望在我的本地代码上具有完全相同的功能。

【问题讨论】:

  • 关于从s.toStream 到 ProcessBuilder 的隐式转换是正确的。无论如何,def pipe(cmd: String): Stream[String] = (cmd +: s.toSeq).lineStream 不也可以工作还是我错过了什么?
  • 如何将无限流作为输入?或者在现实世界的情况下,一个非常大的流太大而无法放入 Seq?
  • 好的,我不清楚您的输入可能非常大。
  • 您提供的解决方案也会导致本机程序失败。我认为您所说的将输入作为参数。输入不是参数,它是从标准输入读取的。想想'cat file.txt | cmd'
  • 是的,你是对的,所以我显然错过了一些东西。感谢您的澄清。

标签: scala streaming


【解决方案1】:

假设 scala 2.11+,您应该按照 @edi 的建议使用 lineStream。原因是当它可用时,您会得到一个流式响应,而不是批处理响应。假设我有一个 shell 脚本 echo-sleep.sh:

#/usr/bin/env bash
# echo-sleep.sh
while read line; do echo $line; sleep 1; done

我们想使用如下代码从 scala 中调用它:

import scala.sys.process._
import scala.language.postfixOps
import java.io.ByteArrayInputStream

implicit class X(in: TraversableOnce[String]) {
  // Don't do the BAOS construction in real code.  Just for illustration.
  def pipe(cmd: String) = 
    cmd #< new ByteArrayInputStream(in.mkString("\n").getBytes) lineStream
}

那么如果我们做一个最终的调用,比如:

1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println

序列中的一个数字每 1 秒出现在 STDOUT 上。如果您像示例中那样缓冲并转换为Iterable,您将失去这种响应能力。

【讨论】:

  • 当输入非常大时这将如何工作?这是流式输入吗?
  • 另外,我知道你建议不要真正使用BOAS,那么你将如何真正做到这一点?
【解决方案2】:

这是一个演示如何编写流程代码的解决方案,以便它同时传输输入和输出。关键是产生一个java.io.PipedInputStream,它被传递给进程的输入。此流通过java.io.PipedOutputStream 从迭代器异步填充。显然,随意将隐式类的输入类型更改为Iterable

这是一个用来展示这个作品的迭代器。

/**
 * An iterator with pauses used to illustrate data streaming to the process to be run.
 */
class PausingIterator[A](zero: A, until: A, pauseMs: Int)(subsequent: A => A) 
extends Iterator[A] {
  private[this] var current = zero
  def hasNext = current != until
  def next(): A = {
    if (!hasNext) throw new NoSuchElementException
    val r = current
    current = subsequent(current)
    Thread.sleep(pauseMs)
    r
  }
}

这是你想要的实际代码

import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.PrintWriter

// For process stuff
import scala.sys.process._
import scala.language.postfixOps

// For asynchronous stream writing.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * A streaming version of the original class.  This does not block to wait for the entire 
 * input or output to be constructed.  This allows the process to get data ASAP and allows 
 * the process to return information back to the scala environment ASAP.  
 *
 * NOTE: Don't forget about error handling in the final production code.
 */
implicit class X(it: Iterator[String]) {
  def pipe(cmd: String) = cmd #< iter2is(it) lineStream

  /**
   * Convert an iterator to an InputStream for use in the pipe function.
   * @param it an iterator to convert
   */
  private[this] def iter2is[A](it: Iterator[A]): InputStream = {
    // What is written to the output stream will appear in the input stream.
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val w = new PrintWriter(pos, true)

    // Scala 2.11 (scala 2.10, use 'future').  Executes asynchrously.  
    // Fill the stream, then close.
    Future {
      it foreach w.println
      w.close
    }

    // Return possibly before pis is fully written to.
    pis
  }
}

最终调用将显​​示 0 到 9,并在显示每个数字之间暂停 3 秒(scala 端的第二个暂停,shell 脚本端的 1 秒暂停)。

// echo-sleep.sh is the same script as in my previous post
new PausingIterator(0, 10, 2000)(_ + 1)
  .map(_.toString)
  .pipe("echo-sleep.sh")
  .foreach(println)

输出

0          [ pause 3 secs ]
1          [ pause 3 secs ]
...
8          [ pause 3 secs ]
9          [ pause 3 secs ]

【讨论】:

  • 按照这个例子,我们成功地使流式 IO 变得非常高效。关键是从另一个线程读取,否则 lineStream 是一个阻塞调用
猜你喜欢
  • 2010-12-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-13
  • 2021-05-18
相关资源
最近更新 更多