【问题标题】:Listener for NATS JetStreamNATS JetStream 的监听器
【发布时间】:2022-06-20 01:04:58
【问题描述】:
有人可以帮助如何在 Spring Boot 中异步配置 NATS 喷射流订阅示例:寻找类似 @kafkalistener for Nats jetstream 的等效注释
我可以使用端点提取消息,但是当尝试使用 pushSubscription 提取消息时调度程序处理程序没有被调用。需要知道如何使侦听器处于活动状态并在消息发布到主题后立即使用消息。
任何有关这方面的见解/示例都会有所帮助,在此先感谢。
【问题讨论】:
标签:
spring-boot
jetstream
nats.io
nats-streaming-server
【解决方案1】:
我不知道您的 JetStream 保留政策是什么,也不知道您想要订阅的方式。但我有 WorkQueuePolicy 推送订阅的示例代码希望这会对你有所帮助。
public static void subscribe(String streamName, String subjectKey,
String queueName, IMessageHandler iMessageHandler) throws IOException,
InterruptedException, JetStreamApiException {
long s = System.currentTimeMillis();
Connection nc = Nats.connect(options);
long e = System.currentTimeMillis();
logger.info("Nats Connect in " + (e - s) + " ms");
JetStream js = nc.jetStream();
Dispatcher disp = nc.createDispatcher();
MessageHandler handler = (msg) -> {
try {
iMessageHandler.onMessageReceived(msg);
} catch (Exception exc) {
msg.nak();
}
};
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(queueName)
.deliverGroup(queueName)
.maxDeliver(3)
.ackWait(Duration.ofMinutes(2))
.build();
PushSubscribeOptions so = PushSubscribeOptions.builder()
.stream(streamName)
.configuration(cc)
.build();
js.subscribe(subjectKey, disp, handler, false, so);
System.out.println("NatsUtil: " + durableName + "subscribe");
}
IMessageHandler 是我处理 nats.io 收到的消息的自定义接口。