【问题标题】:How to "try send" to a channel, and abort if channel is full?如何“尝试发送”到频道,如果频道已满则中止?
【发布时间】:2014-01-06 19:04:33
【问题描述】:

我有一个经典的“生产者-消费者”问题的变体。在我的程序中,有 10 个生产者并行工作,他们的目标是总共生产 N 个产品。

我考虑过使用缓冲通道:

products := make([]int, 100) // In total, produce 100 products

// The producers
for i := 0; i < 10; i++ {
    go func() {
        products <- 1 // !!
    }()
}

但是,它不起作用:

  • goroutine 没有意识到目标已经达到,channel 发送块,函数永远不会返回。
  • if len(products) &lt; 100 { products &lt;- 1 } 不是原子操作,因此没有帮助。

那么还有其他方法吗?

【问题讨论】:

  • 我可以礼貌地建议您的术语在这里有点误导。生产者-消费者模式通常是指一个生产者和多个消费者,也称为观察者模式。你没有在这个意义上使用它。相反,您描述的是一种任务耕作模式,其中许多工人为单个控制器产生工作结果。
  • @Rick-777 谢谢,你是对的。

标签: go


【解决方案1】:

products := make([]int, 100) 切片,而不是chan。你想要:

products := make(chan int, 100)

如果你真的想做一个非阻塞发送,你可以使用select:

select {
case products <- 1:
default:
}

这将首先尝试发送产品,如果已满,则运行默认代码(无操作)并继续。

【讨论】:

  • 使用默认有问题。如果消费者在发送每个单独的数据包时必须对其进行大量工作,则可能会导致默认错误触发(特别是如果生产者在不同的线程中或消费者被运行时抢占)。此方法还依赖于在生产例程之前启动的消费者例程。
  • 默认值永远不会错误触发。此外,消费者程序需要在生产后开始,仅返回 100。我同意你的答案是最好的,但我不同意你为什么认为我的行不通。
  • 你是对的,我错过了你使用缓冲通道。默认方法在通道被缓冲时起作用。
  • 默认方法也适用于通道无缓冲时。来自Spec:如果一个或多个通信可以继续,则通过统一的伪随机选择选择一个可以继续的通信。否则,如果有默认情况,则选择该情况(强调我的)。它也用于Go by example
【解决方案2】:

最好的方法可能是在生产者中使用退出通道和选择语句。 Go 保证关闭的通道将始终注册为读取而不会阻塞。

Here's a working version on the playground

package main

import "fmt"
import "time"

func main() {
    productionChan := make(chan int)
    quit := make(chan struct{})
    for i := 0; i < 110; i++ {
        go produce(productionChan, quit)
    }

    consume(productionChan, quit)
    time.Sleep(5 * time.Second) // Just so we can observe the excess production channels quitting correctly
}

func consume(productionChan <-chan int, quit chan<- struct{}) {
    payload := make([]int, 100)

    for i := range payload {
        payload[i] = <-productionChan
    }

    close(quit)
    fmt.Println("Complete payload received, length of payload slice: ", len(payload))
}

func produce(productionChan chan<- int, quit <-chan struct{}) {
    select {
    case <-quit:
        fmt.Println("No need to produce, quitting!")
    case productionChan <- 1:
    }
}

这个想法是,单个消费者 goroutine 迭代所需大小的有效负载切片,在该切片被填充后,循环终止并关闭退出通道。所有生产者都在一条关于是从退出通道发送还是接收的选择语句中被阻止。当退出通道关闭时,使用该退出通道启动的每个生产者都将立即退出。

如果您有较少数量的生产者,每个都发送多个值,那么这个习惯用法也应该很容易修改。

【讨论】:

    【解决方案3】:

    免责声明:这根本不是惯用的 Go,我不建议在实践中实际使用此代码。那就是……

    Go 最近推出了reflect.Value.TryRecvreflect.Value.TrySend。他们完全符合您的要求。

    products := make(chan int, 100)
    
    for i := 0; i < 10; i++ {
        go func() {
            for {
                if !reflect.ValueOf(products).TrySend(1) {
                    return
                }
            }
        }()
    }
    

    查看go playground上运行的代码。

    【讨论】:

    • 哇老兄,这很深。
    • 除非绝对必要,否则不应使用反射。这比使用 select 效率低得多(请参阅我的答案)。但是,您可能真的想使用@Jsor 的答案。
    【解决方案4】:

    您可以使用select 尝试发送到缓冲通道:如果您选择发送并且通道缓冲区已满,您将到达default 案例:

    //this goroutine sends to the channel until it can't
    func f(c chan int, wg *sync.WaitGroup) {
    
        for i := 0; ; i++ {
            select {
            case c <- i: //sent successfully
                continue
            default:
                fmt.Println("Can't send, aborting!")
                wg.Done()
                return
            }
        }
    }
    
    func main() {
    
        c := make(chan int, 100)
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go f(c, &wg)
        }
    
        wg.Wait()
        fmt.Println("Done!")
    
    }
    

    这种方法的缺点是,如果消费者在您完成生产之前开始消费,您将进入无限循环。

    您也可以从消费者端关闭通道,导致生产者恐慌,并捕捉这种恐慌:

    func f(c chan int) {
    
        defer func() {
            _ = recover()
            fmt.Println("Done!")
        }()
    
        for i := 0; ; i++ {
            c <- i
        }
    }
    
    func main() {
    
        c := make(chan int)
    
        for i := 0; i < 10; i++ {
    
            go f(c)
        }
    
        n := 0
        for {
            <-c
            n++
            if n > 100 {
                close(c)
                break
            }
        }
    

    但是,如果您只是不想产生超过给定数量的项目,为什么不生成 N 个 goroutine,每个 goroutine 产生 1 个项目?还是产生 K 个 goroutine,每个都产生 N/K 个项目?最好提前知道这一点。

    【讨论】:

      猜你喜欢
      • 2019-01-12
      • 1970-01-01
      • 2019-08-01
      • 2020-04-14
      • 2019-05-11
      • 1970-01-01
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多