【问题标题】:How to read from subset of channels?如何从通道子集中读取?
【发布时间】:2020-11-10 04:37:33
【问题描述】:

有 N 个频道。

每个接收者都想从通道的一个子集中获取下一条消息。

不需要松散消息并保持消息按时间顺序排列。

这是一个例子。

我们有两个频道 C1 和 C2。

C2 频道中有一条新消息。没有接收器。等待。

第一个接收者只想从 C1 读取消息。 C1 中没有消息。等待。

一条新消息出现在 C1 中。第一个接收者收到此消息。

C1 频道有一条新消息。没有接收器。我们在 C1 和 C2 频道中有消息。

第二个接收者想要从 C1 和 C2 读取消息。以下是非常重要的。第二个接收者应该完全从 C2 获得消息(不是来自 C1 或 C2 的随机消息),因为 C2 通道中的消息出现得更早!

您对如何使用 Go 通道实现这一点有任何想法吗?

在实际任务中,我有固定数量的通道(或生产者)和来自客户端的调用,其中客户端希望从中获取下一条消息的随机通道子集。像这样:

type MyChannels map[string] chan int

func NextMessage(wantChannels []string) int {
...
}

NextMessage 函数是一个阻塞函数。

【问题讨论】:

  • 如果您需要绝对排序,则需要将其添加到有效负载本身。您无法知道哪个频道已准备好首先接收(如果来源没有以任何方式协调,“第一个”甚至没有意义)。
  • 我可以在消息中包含有效负载(不仅仅是int)。你能详细解释一下你的想法吗?
  • 我不确定我是否足够了解这里的问题以提出解决方案。听起来您需要某种消息代理来强制执行排序。消息可能仅可通过有效负载进行排序,或者损坏的可能会为您维护顺序,但我认为您将需要某种元数据来跟踪消息的目的地和/或到达的时间。跨度>
  • 这是关于通道的“到达之前”关系的问题:对通道的写入操作会阻塞,直到有人从它读取,并且在您写入之前,您不知道它是否会阻塞。因此,如果您面对多个等待读取的频道,您不知道它们到达的顺序是什么,因为如果您这样做了,那么您已经阅读了它们。您可能可以使用一个额外的共享结构来保持应该读取的通道顺序。
  • 当你想要一个直接的订单时,你为什么有两个渠道呢?我会使用一个渠道并按照消息发送的顺序处理(和委托)消息。

标签: go


【解决方案1】:

您应该使用反射从动态尺寸通道中进行选择。如需完整源代码,您可以查看my github。这是快照

func NextMessage(wantChannels []string) int64 {
    // create select array with dynamic size
    cases := make([]reflect.SelectCase, len(wantChannels) + 1)

    // insert default value so it can be waiting
    cases[0] = reflect.SelectCase{Dir: reflect.SelectDefault}

    for i, wantChan := range wantChannels {
        fmt.Printf("want from Chan %s\n", wantChan)
        cases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(myChan[wantChan])}
    }

    for {
        index, value, _ := reflect.Select(cases)
        if index == 0 {
            fmt.Printf("%+v waiting for 2 seconds before checking again\n", wantChannels)
            <-time.After(2 * time.Second)
        } else {
            return value.Int()
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-30
    • 2016-01-14
    • 2014-09-30
    • 2011-08-22
    • 2016-10-15
    • 2019-08-14
    • 2021-09-15
    • 2017-06-01
    相关资源
    最近更新 更多