【问题标题】:How to send message to multiple channels in go [duplicate]如何在go中向多个频道发送消息[重复]
【发布时间】:2021-12-03 06:53:51
【问题描述】:

所以我的问题是如何将消息发送到broadcast 函数仅在通道未关闭且仅一次时才获得的通道。

发送消息后应增加sentNumber

我说只是提醒一下,向所有频道发送消息是有时间限制的!

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    sentNumber int
)

func broadcast(waitTime time.Duration, message string, ch ...chan string) (sentNumber int) {
    start := time.Now()
    for _, channel := range ch {
        if time.Since(start) >= waitTime {
            break
        }
        go send(channel, message)
    }
    return 0
}

func send(channel chan string, message string) {
    for {
        if _,open := <-channel; open{
            break
        }
    }
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        wg.Done()
        channel <- message
    }()
    wg.Wait()
}

func main() {
    a := make(chan string, 1)
    b := make(chan string, 1)
    broadcast(5, "secret message", a, b)
    fmt.Println(<-a)
    fmt.Println(<-b)
}

【问题讨论】:

    标签: go concurrency


    【解决方案1】:
    1. time.Since(start) &gt;= waitTime 不能破坏 send 函数
    2. go send(channel, message) 在这种情况下不应该比单线程队列更高效
    3. broadcast 没有责任检查频道是否已关闭,频道不是由broadcast 创建/关闭的
    package main
    
    import (
        "context"
        "fmt"
        "time"
    )
    
    func broadcast(waitTime time.Duration, message string, chs ...chan string) (sentNumber int) {
        ctx, cancel := context.WithTimeout(context.Background(), waitTime)
        defer cancel()
    
        jobQueue := make(chan chan string, len(chs))
        for _, c := range chs {
            jobQueue <- c
        }
    
    queue:
        for c := range jobQueue {
            select {
            case c <- message:
                // sent success
                sentNumber += 1
                if sentNumber == len(chs) {
                    cancel()
                }
            case <-ctx.Done():
                // timeout, break job queue
                break queue
            default:
                // if send failed, retry later
                jobQueue <- c
            }
        }
    
        return
    }
    
    func main() {
        a := make(chan string)
        b := make(chan string)
    
        go func() {
            time.Sleep(time.Second)
            fmt.Println("a:", <-a)
        }()
    
        go func() {
            time.Sleep(3 * time.Second)
            fmt.Println("b:", <-b)
        }()
    
        c := broadcast(2*time.Second, "secret message", a, b)
        fmt.Printf("sent count:%d\n", c)
    
        time.Sleep(3 * time.Second)
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-24
      • 2018-12-09
      • 2021-01-01
      • 2021-02-06
      • 2019-07-29
      相关资源
      最近更新 更多