【问题标题】:How to check a channel is closed or not without reading it?如何在不阅读的情况下检查频道是否关闭?
【发布时间】:2013-04-12 21:19:25
【问题描述】:

这是由@Jimt 编写的 Go 中工作人员和控制器模式的一个很好的示例,以回答 "Is there some elegant way to pause & resume any other goroutine in golang?"

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }

    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }

    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }

    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}

但是这段代码也有一个问题:如果你想在worker()退出时删除workers中的一个工作通道,就会发生死锁。

如果你close(workers[i]),下次控制器写入它会导致恐慌,因为 go 无法写入关闭的通道。如果您使用一些互斥锁来保护它,那么它将卡在workers[i] &lt;- Running 上,因为worker 没有从通道读取任何内容并且写入将被阻塞,互斥锁将导致死锁。您也可以为通道提供更大的缓冲区作为解决方法,但这还不够好。

所以我认为解决这个问题的最好方法是worker()退出时关闭通道,如果控制器发现通道关闭,它将跳过它并且什么都不做。但是在这种情况下,我找不到如何检查通道是否已关闭。如果我尝试读取控制器中的通道,控制器可能被阻塞。所以我现在很困惑。

PS:恢复引发的恐慌是我尝试过的,但它会关闭引发恐慌的 goroutine。在这种情况下,它将是控制器,所以没有用。

不过,我认为 Go 团队在下一版本的 Go 中实现此功能很有用。

【问题讨论】:

  • 处理你的工人的状态!如果您关闭频道,则无需再次写入。
  • 在这里,我做了这个:github.com/atedja/go-tunnel.

标签: go channel


【解决方案1】:

如果您收听此频道,您总能发现该频道已关闭。

case state, opened := <-ws:
    if !opened {
         // channel was closed 
         // return or made some final work
    }
    switch state {
        case Stopped:

但请记住,您不能两次关闭一个频道。这会引起恐慌。

【讨论】:

  • 我说的是“没有仔细阅读”,-1 表示没有仔细阅读问题。
  • >PS:恢复引发的恐慌是我尝试过的,但它会关闭引发恐慌的 goroutine。在这种情况下,它将是控制器,所以没有用。你总是可以去 func(chan z){ defer func(){ //handle recover} close(z)}
  • 但是我要预留controller,close(z)会被worker而不是controller调用。
【解决方案2】:

通过恢复引发的恐慌,可以通过一种 hacky 方式为尝试写入的通道完成。但是如果不读取读取通道,则无法检查它是否已关闭。

你会的

  • 最终从中读取“真”值 (v &lt;- c)
  • 读取“真”值和“未关闭”指示符 (v, ok &lt;- c)
  • 读取零值和“关闭”指示器 (v, ok &lt;- c) (example)
  • 将在永久读取的频道中阻塞 (v &lt;- c)

从技术上讲,只有最后一个不会从频道中读取,但这没什么用。

【讨论】:

  • 恢复引发的恐慌是我尝试过的,但它会关闭引发恐慌的 goroutine。在这种情况下,它将是controller,所以没有用:)
  • 您也可以使用 unsafe 和反射包编写 hack 查看我的答案
  • v, ok &lt;- c这不起作用,除非c上有数据
【解决方案3】:

没有办法编写一个安全的应用程序,你需要知道一个通​​道是否打开而不与它交互。

完成您想做的事情的最佳方式是使用两个渠道 - 一个用于工作,一个用于表示更改状态的愿望(以及该状态更改的完成,如果这很重要)。

频道很便宜。复杂的设计重载语义不是。

[也]

<-time.After(1e9)

是一种非常令人困惑且不明显的写作方式

time.Sleep(time.Second)

保持简单,每个人(包括您)都能理解。

【讨论】:

  • 嗯,我是 GO 新手,但我认为 time.Sleep 会阻塞主线程,而
【解决方案4】:

首先检查频道是否有元素更容易,这将确保频道处于活动状态。

func isChanClosed(ch chan interface{}) bool {
    if len(ch) == 0 {
        select {
        case _, ok := <-ch:
            return !ok
        }
    }
    return false 
}

【讨论】:

  • 作为mentioned by Dustin,没有办法安全地做到这一点。当你进入你的if 身体时,len(ch) 可能是任何东西。 (例如,在您的选择尝试读取之前,另一个核心上的 goroutine 会向通道发送一个值)。
【解决方案5】:

我知道这个答案来晚了,我已经写了这个解决方案,Hacking Go run-time,它不安全,它可能会崩溃:

import (
    "unsafe"
    "reflect"
)


func isChanClosed(ch interface{}) bool {
    if reflect.TypeOf(ch).Kind() != reflect.Chan {
        panic("only channels!")
    }
    
    // get interface value pointer, from cgo_export 
    // typedef struct { void *t; void *v; } GoInterface;
    // then get channel real pointer
    cptr := *(*uintptr)(unsafe.Pointer(
        unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
    ))
    
    // this function will return true if chan.closed > 0
    // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
    // type hchan struct {
    // qcount   uint           // total data in the queue
    // dataqsiz uint           // size of the circular queue
    // buf      unsafe.Pointer // points to an array of dataqsiz elements
    // elemsize uint16
    // closed   uint32
    // **
    
    cptr += unsafe.Sizeof(uint(0))*2
    cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
    cptr += unsafe.Sizeof(uint16(0))
    return *(*uint32)(unsafe.Pointer(cptr)) > 0
}

【讨论】:

  • go vet 在最后一行 return *(*uint32)(unsafe.Pointer(cptr)) &gt; 0cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0))) 上返回“可能误用 unsafe.Pointer”,有没有在这些行中没有 unsafe.Pointer 的选项?
  • 您需要在一个表达式中完成所有指针运算才能让 go vet 满意。这个解决方案是一个数据竞争而不是有效的 Go,你还必须至少读取用 atomic.LoadUint32 关闭。不过,无论哪种方式,这都是一个非常脆弱的 hack,如果 hchan 在 Go 版本之间发生变化,这将会中断。
  • 这可能很聪明,但感觉就像在另一个问题之上添加一个问题
  • 这会导致 go 1.16.6 出现恐慌在 return *(*uint32)(unsafe.Pointer(cptr)) &gt; 0 行获取错误消息 Panic: fatal error: checkptr: pointer arithmetic result points to invalid allocation
  • hchan 的关闭属性通过锁保护。在没有获得锁的情况下读取这个值是没有意义的。就像知道通道是否打开一样,除非您在保留此信息的同时保护任何关闭操作,否则这反过来会首先为您提供有关通道是否打开的知识。不过,我很欣赏黑客攻击!
【解决方案6】:

来自文档:

可以使用内置函数关闭通道。接收操作符的多值赋值形式报告在通道关闭之前是否发送了接收值。

https://golang.org/ref/spec#Receive_operator

Golang in Action 的示例显示了这种情况:

// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

func init() {
    rand.Seed(time.Now().UnixNano())
}

// main is the entry point for all Go programs.
func main() {
    // Create an unbuffered channel.
    court := make(chan int)
    // Add a count of two, one for each goroutine.
    wg.Add(2)
    // Launch two players.
    go player("Nadal", court)
    go player("Djokovic", court)
    // Start the set.
    court <- 1
    // Wait for the game to finish.
    wg.Wait()
}

// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()
    for {
        // Wait for the ball to be hit back to us.
        ball, ok := <-court
        fmt.Printf("ok %t\n", ok)
        if !ok {
            // If the channel was closed we won.
            fmt.Printf("Player %s Won\n", name)
            return
        }
        // Pick a random number and see if we miss the ball.
        n := rand.Intn(100)
        if n%13 == 0 {
            fmt.Printf("Player %s Missed\n", name)
            // Close the channel to signal we lost.
            close(court)
            return
        }

        // Display and then increment the hit count by one.
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++
        // Hit the ball back to the opposing player.
        court <- ball
    }
}

【讨论】:

  • 问题是如何在不读取通道的情况下检查关闭状态,即在写入之前。
【解决方案7】:

好吧,你可以用default分支来检测它,因为会选择一个关闭的通道,例如:下面的代码会选择default,channel,channel,第一个选择不被阻塞.

func main() {
    ch := make(chan int)

    go func() {
        select {
        case <-ch:
            log.Printf("1.channel")
        default:
            log.Printf("1.default")
        }
        select {
        case <-ch:
            log.Printf("2.channel")
        }
        close(ch)
        select {
        case <-ch:
            log.Printf("3.channel")
        default:
            log.Printf("3.default")
        }
    }()
    time.Sleep(time.Second)
    ch <- 1
    time.Sleep(time.Second)
}

打印

2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel

注意,请参阅@Angad 在此答案下的评论:

如果您使用的是缓冲通道并且它包含 未读数据

【讨论】:

  • 这个解决方案存在一个问题(以及写得很好的go101.org/article/channel-closing.html,它提出了一个类似的解决方案) - 如果您使用缓冲通道并且它包含未读,则它不起作用数据
  • @Angad 确实,这不是检测封闭通道的完美解决方案。是检测读取通道是否阻塞的完美解决方案。 (即如果读取通道会阻塞,则我们知道它没有关闭;如果读取通道不会阻塞,我们知道它可能已关闭)。
  • 1/ 如其他 cmets 所述,此答案是正确的。 2/ 最后的编辑是有问题的,当然It doesn't work if you're using a Buffered Channel and it contains unread data,如果channel里面还有数据,还没有关闭读端,只有写端。
【解决方案8】:

除了关闭频道之外,您还可以将频道设置为 nil。这样你就可以检查它是否为零。

操场上的例子: https://play.golang.org/p/v0f3d4DisCz

编辑: 这实际上是一个糟糕的解决方案,如下一个示例所示, 因为在函数中将通道设置为 nil 会破坏它: https://play.golang.org/p/YVE2-LV9TOp

【讨论】:

  • 按地址传递通道(或在按地址传递的结构中)
【解决方案9】:

我在多个并发 goroutine 中经常遇到这个问题。

这可能是也可能不是一个好的模式,但我为我的工人定义了一个结构,其中包含工人状态的退出通道和字段:

type Worker struct {
    data chan struct
    quit chan bool
    stopped bool
}

然后你可以让控制器为工人调用停止函数:

func (w *Worker) Stop() {
    w.quit <- true
    w.stopped = true
}

func (w *Worker) eventloop() {
    for {
        if w.Stopped {
            return
        }
        select {
            case d := <-w.data:
                //DO something
                if w.Stopped {
                    return
                }
            case <-w.quit:
                return
        }
    }
}

这为您提供了一种很好的方法来让您的工作人员完全停止,而不会出现任何挂起或产生错误,这在容器中运行时尤其有用。

【讨论】:

  • 这不会引入竞争条件吗?我的意思是,如果你写信给w.quit,但在这种情况下不从那个频道读取,它会阻塞,stopped 也不会变成true
【解决方案10】:
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
    for i:=0; i<10; i++{
        ch1 <- i
    }
    close(ch1)
}()
go func(){
    for i:=10; i<15; i++{
        ch2 <- i
    }
    close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
    ok1, ok2 = true, true
    select{
        case v,ok1 = <-ch1:
        if ok1 {fmt.Println(v)}
        default:
    }
    select{
        case v,ok2 = <-ch2:
        if ok2 {fmt.Println(v)}
        default:
    }
    if !ok1 && !ok2{return}
    
}

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-04-10
    • 2010-12-04
    • 2016-05-18
    • 1970-01-01
    • 1970-01-01
    • 2018-09-22
    • 1970-01-01
    相关资源
    最近更新 更多