【问题标题】:How does NATS / JetStream "remember" subscribers?NATS / JetStream 如何“记住”订阅者?
【发布时间】:2022-01-07 14:11:56
【问题描述】:

我正在使用 NATS 迈出第一步,并看到我无法理解的行为,即使在非常仔细地阅读了文档之后也是如此。我有一个本地 NATS 服务器(2.6.5)正在运行。它开始于

./nats-server -js

我使用以下代码生成一些消息:

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.delete_stream(name="hello")
    await js.add_stream(
        name="hello",
        subjects=["hello"],
    )
    for i in range(0, 10):
        await js.publish("hello", f"hello world: {i}".encode())
    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

如果我运行代码并执行./nats stream ls,我会看到 10 条消息。到目前为止一切都很好。接下来我运行我的消费者:

async def main():
    nc = await nats.connect()
    js = nc.jetstream()
    sub = await js.pull_subscribe("hello", "hello")

    msg_count = 0
    while msg_count < 10:
        for msg in await sub.fetch(1):
            print("Received:", msg.data)
            msg_count = msg_count + 1

            # Try nack'ing every third message
            if msg_count % 3 == 0:
                await msg.nak()
            else:
                await msg.ack()

    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

输出显示:

Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'

这是有道理的:我提取了 10 条消息。每第三条消息都是“裸露的”,因此下一次调用会再次检索它。如果我再次启动脚本,输出是:

Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'

几秒钟后,我暂停了。显然 NATS 以某种方式记住了我的脚本并继续传递消息。但我不明白这是怎么发生的!?流中是否有“全局”光标?但在那种情况下,多个客户会干扰,这对我来说没有意义。所以我假设 NATS 以某种方式记住了我的客户。如何?我如何告诉 NATS 我要重新启动?我也很感激指向我显然错过的文档的指针!?

【问题讨论】:

    标签: python jetstream nats.io nats-streaming-server


    【解决方案1】:

    在创建拉取订阅时,jetstream 客户端 API 还会创建一个 durable 消费者,该消费者具有匹配的消费者选项,在本例中为流名称和 durable name(第二个参数)。

    sub = await js.pull_subscribe("hello", "hello")
    

    持久的消费者旨在长期存在,服务器将 跟踪消费者在流中的位置。所以如果消费者 停止然后重新启动,它将自动从停止的地方重新启动 用于初始化消费者的配置将是 记得。拉动时需要耐用的消费者 订阅,并且在进行推送订阅时是可选的。

    来源:https://nats.io/blog/jetstream-java-client-03-consume/#durable-vs-ephemeral

    【讨论】:

      猜你喜欢
      • 2022-08-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-12
      • 1970-01-01
      • 2022-06-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多