【问题标题】:NATS JetStream: Is it possible to explicitly ask from JetStream to (re)send the last few messages it received in subject foo.*?NATS JetStream:是否可以明确要求 JetStream (重新)发送它在主题 foo.* 中收到的最后几条消息?
【发布时间】:2023-03-07 23:04:01
【问题描述】:

基本上是主题所说的内容。

我想知道是否可以通过某种方式查询 JetStream,使我们能够重新获取主题“foo.*”的最后 15 条消息或 JetStream 在过去 1.5 秒内收到的关于主题“foo.*”的消息.

如果可能的话,任何代码示例或代码示例的链接都将受到赞赏。

【问题讨论】:

    标签: go messaging jetstream nats.io event-stream


    【解决方案1】:

    据官方docs

    • 可以从某个时间开始抓取消息:最后 1.5 秒内。

    DeliverByStartTime
    首次使用消息时,请从此时或之后的消息开始。消费者需要指定 OptStartTime,即流中开始的时间。它将在该时间或之后收到最接近的可用消息。

    • 另外一个要求,最后15条消息,我觉得不可能

    【讨论】:

      【解决方案2】:
      • 在JetStream中有一种方法可以实现时间相关的检索。

        now := time.Now()
        oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
        
        js, _ := nc.JetStream()
        sub, err := js.SubscribeSync(
             "foo.*",
             nats.OrderedConsumer(),
             nats.StartTime(oneAndHalfSecondAgo),
        )
        
        for {
            msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
            if err != nil {
                log.Fatal(err)
            }
        
            // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
        
            // 2. if the message is before now we can push it in an array here
        }
        

        请注意,这种技术虽然很有用,但效率很低,因为我们是一个接一个地抓取消息。

        我们可以使用 .Subscribe() (这是异步的)来修改它,但是我们会遇到不同的问题:

        我们会从 JetStream 过度拉动到现在,然后我们必须确保我们抓取的缓冲消息确实会返回给 JetStream。据我所知,没有配置选项可以告诉 JetStream “MaxTime”。

      • 至于如何“获取最新的 N 消息”,可以修改上面的代码示例,以便他将获得相当多的消息块(例如,最后 5 秒或 10 秒或 30 秒内的所有消息)以及在获得到现在为止的所有消息,然后他可以抓取最新的“N”条消息。

        当然,这种技术并不理想,但似乎没有其他方法可以做到这一点 - 至少在撰写本文时没有。

      【讨论】:

        猜你喜欢
        • 2022-12-20
        • 2021-10-29
        • 1970-01-01
        • 2022-06-20
        • 2022-08-20
        • 2022-10-25
        • 2023-03-16
        • 1970-01-01
        • 2021-07-14
        相关资源
        最近更新 更多