【问题标题】:Sending data from one goroutine to multiple other goroutines将数据从一个 goroutine 发送到多个其他 goroutine
【发布时间】:2021-08-15 10:47:39
【问题描述】:

在项目中,程序通过 websocket 接收数据。这些数据需要经过 n 个算法处理。算法的数量可以动态变化。

我的尝试是创建一些可以即时启动和取消订阅的发布/订阅模式。事实证明,这比预期的更具挑战性。

这是我想出的(基于https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):

package pubsub

import (
    "context"
    "sync"
    "time"
)

type Pubsub struct {
    sync.RWMutex
    subs   []*Subsciption
    closed bool
}

func New() *Pubsub {
    ps := &Pubsub{}
    ps.subs = []*Subsciption{}
    return ps
}

func (ps *Pubsub) Publish(msg interface{}) {
    ps.RLock()
    defer ps.RUnlock()

    if ps.closed {
        return
    }

    for _, sub := range ps.subs {
        // ISSUE1: These goroutines apparently do not exit properly... 
        go func(ch chan interface{}) {
            ch <- msg
        }(sub.Data)
    }
}

func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
    ps.Lock()
    defer ps.Unlock()

    // prep channel
    ctx, cancel := context.WithCancel(context.Background())
    sub := &Subsciption{
        Data:   make(chan interface{}, 1),
        cancel: cancel,
        ps:     ps,
    }

    // prep subsciption
    ps.subs = append(ps.subs, sub)
    return ctx, sub, nil
}

func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
    ps.Lock()
    defer ps.Unlock()

    found := false
    index := 0
    for i, sub := range ps.subs {
        if sub == s {
            index = i
            found = true
        }
    }
    if found {
        s.cancel()
        ps.subs[index] = ps.subs[len(ps.subs)-1]
        ps.subs = ps.subs[:len(ps.subs)-1]

        // ISSUE2: close the channel async with a delay to ensure
        // nothing will be written to the channel anymore
        // via a pending goroutine from Publish()
        go func(ch chan interface{}) {
            time.Sleep(500 * time.Millisecond)
            close(ch)
        }(s.Data)
    }
    return found
}

func (ps *Pubsub) Close() {
    ps.Lock()
    defer ps.Unlock()

    if !ps.closed {
        ps.closed = true
        for _, sub := range ps.subs {
            sub.cancel()

            // ISSUE2: close the channel async with a delay to ensure
            // nothing will be written to the channel anymore
            // via a pending goroutine from Publish()
            go func(ch chan interface{}) {
                time.Sleep(500 * time.Millisecond)
                close(ch)
            }(sub.Data)
        }
    }
}

type Subsciption struct {
    Data   chan interface{}
    cancel func()
    ps     *Pubsub
}

func (s *Subsciption) Unsubscribe() {
    s.ps.unsubscribe(s)
}

正如 cmets 中提到的,这有(至少)两个问题:

问题 1:

在执行此操作一段时间后,我遇到了一些此类错误:

goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb

在我没有真正理解这一点的情况下,在我看来 Publish() 中创建的 goroutine 确实会累积/泄漏。这是正确的吗?我在这里做错了什么?

问题 2:

当我通过Unsubscribe() 结束订阅时,Publish() 尝试写入已关闭的频道并出现恐慌。为了缓解这种情况,我创建了一个 goroutine 来延迟关闭通道。这感觉真的不是最佳实践,但我无法找到合适的解决方案。确定性的方法是什么?

这里有一个小操场供您测试:https://play.golang.org/p/K-L8vLjt7_9

【问题讨论】:

  • 查看可能的重复:How to broadcast message using channel
  • @icza:在您的回复中发布的经纪人似乎很有吸引力,因为它看起来很简单 - 我正在试一试。但是,我仍然对解决我方法中问题的提示感兴趣;这将有助于我学习...

标签: go concurrency channel goroutine


【解决方案1】:

在深入研究您的解决方案及其问题之前,让我再次推荐此答案中提出的另一种 Broker 方法:How to broadcast message using channel

现在开始你的解决方案。


每当您启动一个 goroutine 时,请始终考虑它会如何结束,并确保 goroutine 不应该在您的应用程序的生命周期内运行。

// ISSUE1: These goroutines apparently do not exit properly... 
go func(ch chan interface{}) {
    ch <- msg
}(sub.Data)

这个 goroutine 尝试在 ch 上发送一个值。这可能是一个阻塞操作:如果ch 的缓冲区已满并且ch 上没有就绪接收器,它将阻塞。这不受启动的 goroutine 的控制,也不受 pubsub 包的控制。在某些情况下这可能很好,但这已经给包的用户带来了负担。尽量避免这些。尝试创建易于使用且难以误用的 API。

此外,启动一个 goroutine 只是为了在通道上发送一个值是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。

您这样做是因为您不想被阻止。为避免阻塞,您可以使用具有“合理”高缓冲区的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“慢”客户端从频道接收。

要在不启动 goroutine 的情况下“真正”避免阻塞,您可以使用非阻塞发送:

select {
case ch <- msg:
default:
    // ch's buffer is full, we cannot deliver now
}

如果在ch 上发送可以继续,它就会发生。如果不是,则立即选择default 分支。你必须决定然后做什么。 “丢失”消息是否可以接受?等待一段时间直到“放弃”可以接受吗?或者启动一个 goroutine 来做到这一点是否可以接受(但是你会回到我们在这里试图修复的问题)?或者在客户端可以从频道接收之前被阻止是否可以接受...

选择合理的高缓冲区,如果遇到仍然满的情况,可以接受阻塞,直到客户端可以前进并接收消息。如果不能,那么您的整个应用可能处于不可接受的状态,“挂起”或“崩溃”可能是可以接受的。

// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
    time.Sleep(500 * time.Millisecond)
    close(ch)
}(s.Data)

关闭通道是向接收器发出的信号,表明通道上将不再发送任何值。因此,关闭通道始终应该是发送者的工作(和责任)。启动一个 goroutine 来关闭通道,你将这个工作和职责“交给”另一个不会同步到发送者的“实体”(一个 goroutine)。这可能很容易导致恐慌(在关闭的通道上发送是运行时恐慌,其他公理参见How does a non initialized channel behave?)。不要那样做。

是的,这是必要的,因为您启动了 goroutines 来发送。如果你不这样做,那么你可以在不启动 goroutine 的情况下“就地”关闭,因为发送者和关闭者将是同一个实体:Pubsub 本身,其发送和关闭操作受保护互斥体。所以解决第一个问题自然会解决第二个问题。

一般来说,如果一个通道有多个发送者,那么关闭通道必须协调。必须有一个实体(通常不是任何发件人)等待所有发件人完成,实际上使用sync.WaitGroup,然后该单个实体可以安全地关闭通道。见Closing channel of unknown length

【讨论】:

  • 谢谢!我已经采用了你提到的你的实现。拥有一个执行操作的主循环是有意义的,并且不需要Mutex。您的实现基本上有三个更改: 有Subscription 结构,主要是提供Unsubscribe() func 挂在它上面。此外,Subscribe() 提供了一个上下文,允许消费者对 Pubsub/Broker 的 stop 做出反应。最后,start() 不是 pulic 并由构造函数触发,以保持 API 与之前的实现相比稳定:参见play.golang.org/p/8N99orFKewe
  • @sontags 是的,现在看起来简单多了。我仍然相信使用地图来存储订阅者是值得的(但这取决于有多少订阅者,以及他们订阅/取消订阅的频率)。如果这是您描述的所有希望的功能,我还认为不需要 Subsciption 类型。但如果将来需要添加它会更加灵活,并且 API 不会改变。
猜你喜欢
  • 2019-07-18
  • 2018-09-29
  • 1970-01-01
  • 1970-01-01
  • 2018-02-05
  • 2017-01-10
  • 2020-08-02
  • 2018-01-12
  • 1970-01-01
相关资源
最近更新 更多