【问题标题】:How to stop goroutine blocked by external I/O started for process?如何停止为进程启动的外部 I/O 阻塞的 goroutine?
【发布时间】:2017-09-12 04:50:38
【问题描述】:

我在这里遇到一个问题,我无法安全退出 goroutine。

我正在使用 exec.Command 创建一个外部进程(存储进程的 cmd、stdin 管道和 stdout 管道):

exec.Command(args[0], args[1]...) // args[0] is a base command

每当需要启动我正在调用的那个过程时:

cmd.Start()

然后在启动和附加时,我正在运行 2 个 goroutine:

shutdown := make(chan struct{})
// Run the routine which will read from process and send the data to CmdIn channel
go pr.cmdInRoutine()
// Run the routine which will read from CmdOut and write to process
go pr.cmdOutRoutine()

cmdInRoutine:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    for {
        println("CMDINROUTINE")
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        default:
            println("Inside the for loop of the CmdInRoutine")
            if pr.stdOutPipe == nil {
                println("!!! Standard output pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            buf := make([]byte, 2048)

            size, err := pr.stdOutPipe.Read(buf)
            if err != nil {
                println("!!! Sending exit request from cmdInRoutine !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            println("--- Received data for sending to CmdIn:", string(buf[:size]))
            pr.CmdIn <- buf[:size]
        }

    }
}

cmdOutRoutine:

func (pr *ExternalProcess) cmdOutRoutine() {
    app.At(te, "cmdOutRoutine")

    for {
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdOutRoutine down !!!")
            return
        case data := <-pr.CmdOut:
            println("Received data for sending to Process: ", data)
            if pr.stdInPipe == nil {
                println("!!! Standard input pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                return
            }

            println("--- Received input to write to external process:", string(data))
            _, err := pr.stdInPipe.Write(append(data, '\n'))
            if err != nil {
                println("!!! Couldn't Write To the std in pipe of the process !!!")
                pr.ProcessExit <- true
                return
            }
        }
    }
}

这里有有趣的案例:

1) 当进程在 cmdInRoutine 中发送 EOF 时(不要介意 pr.ProcessExit 我使用通道通知父处理程序以停止并退出进程) 我也关闭了关闭通道,这让 cmdOutRoutine 退出,因为在 select 语句中没有默认情况,因此它阻塞并等待退出或然后写入其中的数据正在运行的进程使用存储的 stdInPipe

2) 当我只想停止 goroutines 但让进程继续运行时,即暂停读写时,我正在关闭关闭通道,希望这两个 goroutines 将结束。
- cmdOutRoutine 打印 !!!关闭 cmdOutRoutine !!! 因为 select 没有默认情况并关闭关闭通道导致几乎立即返回
- cmdOutRoutine 不打印任何东西,我有一种奇怪的感觉,它甚至没有返回,我认为是因为 在从 stdInPipe 读取的默认情况下它被阻止.

我正在考虑在 for 循环之前在 cmdOutRoutine 内运行另一个 goroutine,并将进程的 stdIn 数据转换为通道,然后我就可以消除默认值cmdInRoutine 中的情况,但这会产生另一个问题,新的 goroutine 也必须停止,它仍然会被正在运行的进程的 stdIn 读取阻塞。

任何想法我该如何解决这个问题(修改逻辑)以满足随时关闭和启动 goroutines(进程 I/O)而不是正在运行的进程本身的需求?或者有没有办法避免阻塞读取和写入的调用,我还不知道?

非常感谢。

【问题讨论】:

  • 您有两个独立的例程正在运行,您应该使用两个独立的关闭通道。像你一样只有一个,你不知道哪个会先从通道收集关闭信号,让另一个阻塞。
  • 还要注意命令运行后不能重复使用golang.org/pkg/os/exec/#Cmd
  • @RayfenWindspear 可以使用单个关闭通道。从关闭的通道读取总是会立即返回零值,因此两个 goroutine 都会收集关闭信号。
  • 啊,是的,他正在使用close。我把pr.ProcessExitpr.shutdown 混淆了。不过,如果不使用 close,这一点仍然有效,但这里的情况并非如此。
  • 这是您建议的解决方案示例,但是是的,您仍然会遇到从 io 读取的 go 例程仍然不知道何时停止的情况。 groups.google.com/forum/#!topic/golang-nuts/lRA8S3bv9BM

标签: go pipe exec goroutine


【解决方案1】:

它可能在pr.stdOutPipe.Read(buf) 被阻止。您可以尝试关闭 pr.stdOutPipe,这应该会中断读取。

您也可以关闭pr.stdInPipe,以确保写入不会阻塞。

编辑:这将不允许您重新附加,但没有其他方法可以中断该读取。最好只让这两个 goroutine 在整个进程中运行,并在堆栈中的其他位置暂停(例如,如果您不想在暂停状态下接收命令的输出,请不要将 buf 写入 @987654326 @ - 但要小心避免竞争条件)。

在当前版本的 go 中关闭可能有问题:issue 6817

编辑结束

另外,请注意pr.CmdIn。如果关闭 stdOutPipe 不会导致 Read 返回错误,cmdInRoutine 将尝试写入通道。如果没有任何内容,cmdInRoutine 将永远阻塞。我会将pr.stdOutPipe.Read(buf) 移出选择,然后将pr.CmdIn &lt;- buf[:size] 作为另一种情况添加到选择中:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    // this check should probably only happen once.
    // if stdOutPipe can change during the loop,
    // then that's a race condition.
    if pr.stdOutPipe == nil {
        println("!!! Standard output pipe is nil. Sending Exit Request !!!")
        pr.ProcessExit <- true
        close(pr.shutdown)
        return
    }

    for {
        println("CMDINROUTINE")
        // we need to allocate a new buffer in each iteration,
        // because when we pass it through the channel,
        // we can no longer safely overwrite the data in it,
        // since the other goroutine might still be using it.
        buf := make([]byte, 2048)
        size, err := pr.stdOutPipe.Read(buf)
        if err != nil {
            println("!!! Sending exit request from cmdInRoutine !!!")
            // Be careful with this, if you also closed pr.shutdown when you closed stdOutPipe, then this is going to panic (closing a closed channel).
            pr.ProcessExit <- true
            close(pr.shutdown)
            return
        }

        // now that we have some data, try to send it,
        // unless we're done.
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        case pr.CmdIn <- buf[:size]:
            println("--- Received data for sending to CmdIn:", string(buf[:size]))
        }
    }
}

【讨论】:

  • 我以为他不想关闭管道。他提到通过关闭 goroutine 来暂停 cmd,所以我认为关闭它们不是一种选择。我的两分钱。我喜欢放入通道的情况,好主意。
  • @user1431317 感谢您的回答。起初我在 for 循环之外有缓冲区,但后来数据(字节)与我认为是连续读取的混合。我不确定 buf[:] 的行为如何。我要试试这个,我会告诉你进展如何。
  • @user1431317 循环外的缓冲区导致连续读取的数据混合。对此我能做些什么吗?我不想在 for 循环中分配。谢谢。
  • 问题是你正在通过通道传递原始缓冲区,因此从通道读取的 goroutine 将使用相同的缓冲区,这可以被 cmdInRoutine 中的下一个读取覆盖。所以不幸的是你不能避免分配。
  • @user1431317 另一种方法是逐字节复制数据吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-22
相关资源
最近更新 更多