【发布时间】:2021-04-20 01:06:26
【问题描述】:
我正在尝试为 Apache Pulsar 设置异步使用者,但我的问题是只收到 1 条消息,除非我重新启动 Spring Boot 应用程序,否则没有其他消息通过。不幸的是,关于将 CompletableFuture 与 Pulsar 结合使用的文档不是很好,我是新手。
我的代码如下:
@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {
Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
.subscriptionName(this.pulsarSubscriptionName)
.topic(this.pulsarTopicName)
.ackTimeout(240, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while(true) {
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
asyncMessage.thenAcceptAsync(incomingMessage -> {
TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();
LOGGER.info(String.format("***New provisioning request recieved for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));
consumer.acknowledgeAsync(incomingMessage.getMessageId());
});
}
}
在 Java Docs 中确实说“一旦返回的 CompletableFuture 收到完整的消息,就应该随后调用receiveAsync()。否则它会在应用程序中创建接收请求的积压。”,但我不确定如何执行此操作.我认为这是导致问题的原因。
文档: 脉冲星文档:https://pulsar.apache.org/docs/en/client-libraries-java/#async-receive 用于接收 Async() 的 Pulsar Java 文档:https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Consumer.html
*** 更新 *** 我已经添加了 while 循环,但是当我这样做时,我的 Spring Boot 内存消耗会浮动到 10 GB。不确定,但想知道未来是否是这样设置的。
【问题讨论】:
标签: java completable-future apache-pulsar