【问题标题】:Apache Pulsar Async Consumer Setup (Completable Future)Apache Pulsar 异步消费者设置(可完成的未来)
【发布时间】:2021-04-20 01:06:26
【问题描述】:

我正在尝试为 Apache Pulsar 设置异步使用者,但我的问题是只收到 1 条消息,除非我重新启动 Spring Boot 应用程序,否则没有其他消息通过。不幸的是,关于将 CompletableFuture 与 Pulsar 结合使用的文档不是很好,我是新手。

我的代码如下:

@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {

Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
            .subscriptionName(this.pulsarSubscriptionName)
            .topic(this.pulsarTopicName)
            .ackTimeout(240, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

while(true) {
    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

asyncMessage.thenAcceptAsync(incomingMessage -> {
        TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();
        
       LOGGER.info(String.format("***New provisioning request recieved for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));
                
       consumer.acknowledgeAsync(incomingMessage.getMessageId());
       
    });
 }
}

在 Java Docs 中确实说“一旦返回的 CompletableFuture 收到完整的消息,就应该随后调用receiveAsync()。否则它会在应用程序中创建接收请求的积压。”,但我不确定如何执行此操作.我认为这是导致问题的原因。

文档: 脉冲星文档:https://pulsar.apache.org/docs/en/client-libraries-java/#async-receive 用于接收 Async() 的 Pulsar Java 文档:https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Consumer.html

*** 更新 *** 我已经添加了 while 循环,但是当我这样做时,我的 Spring Boot 内存消耗会浮动到 10 GB。不确定,但想知道未来是否是这样设置的。

【问题讨论】:

    标签: java completable-future apache-pulsar


    【解决方案1】:

    为了防止这种过多的内存消耗,您需要在类中添加一个中间数据结构,以限制未完成的CompletableFutures 的数量,例如LinkedBlockingQueue,如下所示

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionType;
    
    public class AsyncConsumerDemo {
        
        private static final LinkedBlockingQueue<CompletableFuture<Message<String>>> outstandingMessages = 
          new LinkedBlockingQueue<CompletableFuture<Message<String>>>(1000);
    
        private static final ExecutorService executor = Executors.newCachedThreadPool();
    
        public static void main(String[] args) throws PulsarClientException, InterruptedException, ExecutionException {
            
            PulsarClient pulsarClient = PulsarClient.builder()
                    .serviceUrl("pulsar://broker:6650")
                    .build();
    
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                        .subscriptionName("pulsarSubscriptionName")
                        .topic("pulsarTopicName")
                        .ackTimeout(240, TimeUnit.SECONDS)
                        .subscriptionType(SubscriptionType.Shared)
                        .subscribe();
            
            new Thread(() -> { // Message Retrieval Thread
                CompletableFuture<Message<String>> future;
                
                while ( (future = consumer.receiveAsync()) != null) {
                    outstandingMessages.put(future);
                }
              }).start();
            
            for (int numConsumers = 0; numConsumers < 10; numConsumers++) {
              executor.submit(() -> { // Message Consumer Thread
                while(true) {
                    try {
                        CompletableFuture<Message<String>> future = outstandingMessages.take();
                        Message<String> msg = future.get();
                        
                        // Process the message
                        
                        consumer.acknowledgeAsync(msg.getMessageId());
                        
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
              });
           }
        }
        
    }
    

    【讨论】:

    • 谢谢!!作品!快速问题(我将努力解决这个问题),但如果我现在将其扩展为具有多个消费者线程或检索线程 - 这应该不是问题吗?我假设消费者必须至少是 Subscription.Shared?
    • 正确,您可以将订阅模式更改为 SHARDD,并且您将使用某种线程池来保存多个消费者线程实例,因此它们将轮流消费消息。
    • 编辑使用“put”而不是“add”
    猜你喜欢
    • 2022-11-02
    • 2018-03-04
    • 1970-01-01
    • 1970-01-01
    • 2019-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-31
    相关资源
    最近更新 更多