【问题标题】:Goroutines and messages de-duplicatioinGoroutines 和消息去重
【发布时间】:2018-04-29 05:25:12
【问题描述】:

所以我有某种事件队列和几个 goroutine,它们在无限循环中从相应队列中获取事件,处理它们,并将结果发送到通道。不同的队列可能会给你相同的事件,所以我需要确保每个事件都被发送到通道一次,并且该消息在另一个队列中的任何出现都将被忽略。我认为这更多是架构问题,但我不知道如何正确处理。

我当前代码的简化版本如下。

获取和处理传入事件的 Goroutine 看起来有点像这样:

func (q *Queue) ProcessEvents(handler Handler) {
   lastEvent = 0
   for {
       events = getEvents(lastEvent)
       for _, e := range events {
           if e.ID > lastEvent  {
                lastEvent = event.ID
           }
           handler.Handle(e)
       }
   }
}

处理程序:

type Handler struct {
    c chan Event
}

func (h *Handler) Handle(event *Event) {
    //event processing omitted
    h.c <- event //Now it just sends a processed event into the channel no matter what.
}

在 main() 中我会这样做

func main() {
    msgc := make(chan Event)
    for _, q := range queues {
        go func(queue Queue) {
            queue.ProcessEvents(&Handler{msgc})
        }
    }
}

【问题讨论】:

  • 您似乎希望在通道的接收端执行此操作,而不是多次处理同一事件。
  • 但是鉴于 atm 例程彼此不知道,我如何检查事件是否已被另一个 goroutine 处理?这才是真正的主要问题。
  • 如果它是单线程的,你会怎么做?这与 goroutine 无关。如果要删除重复数据,则需要知道消息是否重复。这意味着(可能)存储您收到的每条消息,并将每条新消息与每条旧消息进行比较,看看您以前是否看过。

标签: go synchronization goroutine


【解决方案1】:

因此,您将当前架构表示如下:

使用这种类型的解决方案,生成器需要检查共享资源以查看是否已发出事件。这可能看起来像这样:

var hasEmmited map[string]bool
var lock sync.Mutex

func HasEmitted(event e) bool {
   lock.Lock()
   defer lock.Unlock()
   e,ok := hasEmmited[e.ID]
   return e && ok
}

func SetEmmited(event e) {
   lock.Lock()
   defer lock.Lock()
   hasEmmited[e.ID] = true
}

这需要锁定/解锁,即使在没有争用的最佳情况下,考虑到在关键部分完成的工作量很小,这也是一个很大的开销。

架构上的小改动,就像第二张图一样,一个 go-routine 可以在没有任何锁定的情况下进行过滤。

一些评论者表示,使用 go-routines 设计解决方案与设计单线程应用程序相同。我不相信是这样的。 我建议看看:

Golang 相关消息:https://blog.golang.org/pipelines

一些消息处理设计模式:http://www.enterpriseintegrationpatterns.com/

企业集成模式在这里可能看起来不合适,但它涵盖了许多也适用于 go 的消息传递模式。

【讨论】:

    猜你喜欢
    • 2023-01-31
    • 2019-08-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多