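【Question title】: How can I write a Kafka consumer without using an infinite loop during deserialization?
【Posted】: 2019-09-14 05:55:36
【Question】:

How can I write a Kafka consumer in Java without using an infinite loop for polling?

I created a Kafka consumer using this link as a reference. There, the function that processes incoming records contains a while(true) loop in which it polls for new events. If I use this in my project, I cannot do anything else alongside it. Is there a way to avoid this infinite loop and still receive new events?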

 public static void main(String[] str) throws InterruptedException {
    System.out.println("Starting  AtMostOnceConsumer ...");
    execute();
}
private static void execute() throws InterruptedException {
    KafkaConsumer<String, Event> consumer = createConsumer();
    // Subscribe to all partition in that topic. 'assign' could be used here
    // instead of 'subscribe' to subscribe to specific partition.
    consumer.subscribe(Arrays.asList("topic"));
    processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
    Properties props = new Properties();
    String consumeGroup = "group_id";
    props.put("group.id", consumeGroup);
    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
    props.put("client.id", "clientId");

    // Broker address and SASL_SSL/PLAIN authentication settings.
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "username" + "\" password=\"" + "password" + "\";");
    props.put("enable.auto.commit", "true");
    // Auto commit interval, kafka would commit offset at this interval.
    props.put("auto.commit.interval.ms", "101");
    // Caps the bytes fetched per partition, which only indirectly limits the
    // records returned by each poll ("max.poll.records" limits the count directly).
    props.put("max.partition.fetch.bytes", "135");
    // Set this if you want to always read from beginning.
    // props.put("auto.offset.reset", "earliest");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "6001");
    props.put("schema.registry.url", "https://avroregistry.octanner.io");
    props.put("key.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    return new KafkaConsumer<String, Event>(props);
}
private static void processRecords(KafkaConsumer<String, Event> consumer) throws InterruptedException {
    while (true) {
        ConsumerRecords<String, Event> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
        long lastOffset = 0;
        for (ConsumerRecord<String, Event> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            lastOffset = record.offset();
        }
        System.out.println("lastOffset read: " + lastOffset);
        process();
    }
}
private static void process() throws InterruptedException {
    // create some delay to simulate processing of the message.
    Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}

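Can someone help me modify this so that I can avoid the while(true) loop and simply listen for my incoming events?

【Comments on the question】:

  • Why not simply have one thread do this and another thread do whatever else you want to do? Processing like this is usually implemented with an infinite loop (optionally with some exit condition for shutting the system down).
  • Polling implies an infinite loop. If you are using Spring, you can use @KafkaListener, but it also polls internally. You should do the polling on a different thread. You will probably need some mechanism to exit the loop.

Tags: java apache-kafka kafka-consumer-api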


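【Solution 1】:

You can use @KafkaListener (https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html). However, it will also poll in an infinite loop, because that is how Kafka is designed: it is not a queue but an event bus that stores records for some time. There is no mechanism by which the broker notifies consumers; consumers have to pull.

Poll on a different thread, and exit the loop gracefully.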
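
The advice above can be sketched with the JDK alone. This is only an illustration, not the answer author's code: `StoppablePoller`, `runFor`, and the `Supplier` that stands in for `consumer.poll(...)` are hypothetical names introduced here. With a real `KafkaConsumer`, the usual way to interrupt a blocked poll from another thread is `consumer.wakeup()`, which makes the polling thread throw `WakeupException`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class StoppablePoller implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicLong processed = new AtomicLong();
    // Hypothetical stand-in for consumer.poll(...): returns one batch per call.
    private final Supplier<List<String>> source;

    public StoppablePoller(Supplier<List<String>> source) {
        this.source = source;
    }

    @Override
    public void run() {
        // Still a loop, but one that ends as soon as stop() has been called.
        while (running.get()) {
            for (String record : source.get()) {
                processed.incrementAndGet(); // real record handling would go here
            }
        }
    }

    public void stop() {
        running.set(false);
    }

    public long processed() {
        return processed.get();
    }

    // Sleeps without leaking the checked exception to callers.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the poller on its own thread for roughly `millis`, then stops it.
    public static long runFor(long millis) {
        StoppablePoller poller = new StoppablePoller(() -> {
            pause(1); // simulate a short blocking poll
            return Collections.singletonList("event");
        });
        Thread thread = new Thread(poller, "poller");
        thread.start();
        pause(millis); // the calling thread is free to do other work here
        poller.stop();
        try {
            thread.join(); // wait for the loop to finish its current batch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return poller.processed();
    }

    public static void main(String[] args) {
        System.out.println("records processed: " + runFor(100));
    }
}
```

The flag is checked once per batch, so shutdown latency is bounded by how long a single poll blocks. That is one reason to keep the poll timeout short.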

【Comments】:

    【Solution 2】:

    You can try something like this:

    public class ConsumerDemoWithThread {
    private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
    private String bootstrapServers = "127.0.0.1:9092";
    private String groupId = "my-first-application";
    private String topic = "first-topic";
    
    private final KafkaConsumer<String, String> consumer = createConsumer(bootstrapServers, groupId, topic);
    
    private void pollForRecords() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> processRecords());
    }
    
    
    // Builds a String/String consumer and subscribes it to the given topic.
    private KafkaConsumer<String, String> createConsumer(String bootstrapServers, String groupId, String topic) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe consumer to our topic(s)
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }
    
    
    private void processRecords() {
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
    
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }
        } catch (WakeupException e) {
            logger.info("Received shutdown signal!");
        } finally {
            consumer.close();
        }
    }
    
    public static void main(String[] args) {
        ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();
        consumerDemoWithThread.pollForRecords();
    }
    }
    

    Basically, as Joachim mentioned, the whole polling and processing logic needs to be delegated to a thread.
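
One thing the listing above leaves open is how the background loop is ever told to stop: it catches `WakeupException`, but nothing calls `consumer.wakeup()`, and the executor is never shut down. The following is a JDK-only sketch of the full lifecycle under stated assumptions. `BackgroundPollingDemo` is a hypothetical name, and a `BlockingQueue` stands in for the topic so that the example runs without a broker. Here the blocking call is interrupted through `shutdownNow()`; with a real `KafkaConsumer` the equivalent signal would be `consumer.wakeup()` from another thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class BackgroundPollingDemo {
    // Hypothetical stand-in for a Kafka topic; poll(timeout) blocks like consumer.poll.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Hands the whole poll-and-process loop to the background thread.
    public void start() {
        executor.submit(this::pollLoop);
    }

    private void pollLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String record = topic.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record); // real processing would go here
                }
            }
        } catch (InterruptedException e) {
            // Shutdown signal: restore the flag and fall out of the loop.
            Thread.currentThread().interrupt();
        }
    }

    public void publish(String record) {
        topic.add(record);
    }

    // Interrupts the loop and waits for the worker to finish.
    public boolean stop() {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public List<String> processed() {
        return processed;
    }

    public static List<String> demo() {
        BackgroundPollingDemo demo = new BackgroundPollingDemo();
        demo.start();
        demo.publish("a");
        demo.publish("b");
        // The calling thread is not blocked by polling; here it just waits
        // (for at most two seconds) until both records have been handled.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (demo.processed().size() < 2 && System.nanoTime() < deadline) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
        }
        demo.stop();
        return demo.processed();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + demo());
    }
}
```

Calling `stop()` from a JVM shutdown hook (`Runtime.getRuntime().addShutdownHook(...)`) is a common way to wire this into an application's exit path.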

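    【Comments】:

      【Solution 3】:

      If you want your code to be able to do several things at once, you need a background thread.

      To make this easier, you can use a higher-level Kafka library such as Spring (already answered), Vert.x, or SmallRye.

      Here is a Vert.x example: first create a KafkaConsumer, then assign a handler and subscribe to your topic.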

      consumer.handler(record -> {
        System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
          ",partition=" + record.partition() + ",offset=" + record.offset());
      });
      
      // subscribe to a single topic
      consumer.subscribe("a-single-topic");
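
As the other answers note, handler-style APIs do not remove the polling; they move it behind the library. The sketch below shows one way such a wrapper could be built with the JDK only. It is not how Vert.x is implemented: `HandlerConsumer`, `pollOnce`, and the `Supplier` standing in for `consumer.poll(...)` are hypothetical names chosen for illustration. A scheduler repeatedly runs one poll and passes each record to the registered handler.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class HandlerConsumer<T> implements AutoCloseable {
    // Hypothetical stand-in for consumer.poll(...): returns one batch per call.
    private final Supplier<List<T>> poller;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile Consumer<T> handler = record -> { };

    public HandlerConsumer(Supplier<List<T>> poller) {
        this.poller = poller;
    }

    // Registers the callback invoked for every record.
    public HandlerConsumer<T> handler(Consumer<T> handler) {
        this.handler = handler;
        return this;
    }

    // Schedules polling on a single background thread, so all polls stay on
    // one thread. Note: if pollOnce throws, later runs are cancelled.
    public void start(long delayMillis) {
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // One poll-and-dispatch round; the scheduler replaces the while(true).
    public void pollOnce() {
        for (T record : poller.get()) {
            handler.accept(record);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        HandlerConsumer<String> consumer =
                new HandlerConsumer<String>(() -> Arrays.asList("k1", "k2"))
                        .handler(record -> System.out.println("Processing " + record));
        consumer.start(50);
        Thread.sleep(120); // the main thread is free to do other work here
        consumer.close();
    }
}
```

From the caller's side this looks event-driven, but the loop still exists. It is just owned by the scheduler thread rather than written out in application code.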
      

      【Comments】:
