【发布时间】:2021-08-15 10:47:39
【问题描述】:
在项目中,程序通过 websocket 接收数据。这些数据需要经过 n 个算法处理。算法的数量可以动态变化。
我的尝试是创建一些可以即时启动和取消订阅的发布/订阅模式。事实证明,这比预期的更具挑战性。
这是我想出的(基于https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):
package pubsub
import (
"context"
"sync"
"time"
)
type Pubsub struct {
sync.RWMutex
subs []*Subsciption
closed bool
}
func New() *Pubsub {
ps := &Pubsub{}
ps.subs = []*Subsciption{}
return ps
}
func (ps *Pubsub) Publish(msg interface{}) {
ps.RLock()
defer ps.RUnlock()
if ps.closed {
return
}
for _, sub := range ps.subs {
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
}
}
func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
ps.Lock()
defer ps.Unlock()
// prep channel
ctx, cancel := context.WithCancel(context.Background())
sub := &Subsciption{
Data: make(chan interface{}, 1),
cancel: cancel,
ps: ps,
}
// prep subsciption
ps.subs = append(ps.subs, sub)
return ctx, sub, nil
}
func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
ps.Lock()
defer ps.Unlock()
found := false
index := 0
for i, sub := range ps.subs {
if sub == s {
index = i
found = true
}
}
if found {
s.cancel()
ps.subs[index] = ps.subs[len(ps.subs)-1]
ps.subs = ps.subs[:len(ps.subs)-1]
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
}
return found
}
func (ps *Pubsub) Close() {
ps.Lock()
defer ps.Unlock()
if !ps.closed {
ps.closed = true
for _, sub := range ps.subs {
sub.cancel()
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(sub.Data)
}
}
}
type Subsciption struct {
Data chan interface{}
cancel func()
ps *Pubsub
}
func (s *Subsciption) Unsubscribe() {
s.ps.unsubscribe(s)
}
正如 cmets 中提到的,这有(至少)两个问题:
问题 1:
在执行此操作一段时间后,我遇到了一些此类错误:
goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
/home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
/home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb
在我没有真正理解这一点的情况下,在我看来 Publish() 中创建的 goroutine 确实会累积/泄漏。这是正确的吗?我在这里做错了什么?
问题 2:
当我通过Unsubscribe() 结束订阅时,Publish() 尝试写入已关闭的频道并出现恐慌。为了缓解这种情况,我创建了一个 goroutine 来延迟关闭通道。这感觉真的不是最佳实践,但我无法找到合适的解决方案。确定性的方法是什么?
这里有一个小操场供您测试:https://play.golang.org/p/K-L8vLjt7_9
【问题讨论】:
-
@icza:在您的回复中发布的经纪人似乎很有吸引力,因为它看起来很简单 - 我正在试一试。但是,我仍然对解决我方法中问题的提示感兴趣;这将有助于我学习...
标签: go concurrency channel goroutine