【问题标题】:Kafka Consumer Cluster Environment OffsetKafka消费者集群环境偏移
【发布时间】:2016-12-07 05:28:01
【问题描述】:

我试图让 x 数量的消费者访问 kafka 中的指定主题,但不使用相同的消息。例如,我想要...

消费者 1 取货偏移量 1 消费者 2 拾取偏移量 2 消费者 1 取货偏移量 3 消费者 2 拾取偏移量 4

我希望 kafka 充当这两个消费者的队列。我注意到 group.id 配置,我认为您可以使用同一个组,它会相应地处理它,但它似乎不像我想象的那样工作。

这是我正在使用的代码...

     public void init(){
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaUrl);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("group.id", "group1");
            props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("event1", "event2"));

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
     }

     public void pollTopics() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

            for (ConsumerRecord<String, String> record : records) {
                AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
                if(processor != null) {
                    kafkaThreadPool.execute(processor);
                }
            }
        }catch (Exception e){
            LOG.error("Polling exception occurred", e);
        }
    }

我希望能够在集群环境中运行此代码并让 kafka 成为队列。我希望它拉取消息并同时转到下一个偏移量,然后下一个 kafka poll 将抓取下一个偏移量。这可能吗?如果是这样,我做错了什么?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    这在 Kafka 中是不可能的(按照您的描述)。

    如果使用消费者组,则单个分区只能由单个消费者读取。因此,Kafka 确实按分区进行扩展,也就是说,如果您想要有多个消费者(读取不同的数据),您需要为每个消费者至少一个分区。如果你的分区比消费者多,一些(或所有)消费者会同时读取多个分区。

    您的解决方案是,创建具有多个分区的主题(或使用多个主题并让您组的所有消费者订阅一个主题)。

    【讨论】:

    • 好吧,这是有道理的,但我读到如果你有 2 个分区,你必须至少有 2 个消费者。那么如果其中一个消费者宕机一小时会发生什么?其他消费者不会接收这些消息吧?
    • 等等,我想你确实说过如果有 2 个分区并且只有一个消费者会从这两个分区中选择?所以只是为了确保我理解正确。如果我创建两个分区并有两个消费者,它应该解析不同的消息,如果一个确实发生了故障,那么另一个消费者会从两个分区中获取所有消息吗?如果这是正确的,您是否有订阅特定分区所需的示例?如果是这样,那么我会接受你的回答:)。
    • 我也刚刚从 kafkas 网站上阅读了这一段。 “Kafka 中的消费者组概念概括了这两个概念。与队列一样,消费者组允许您将处理划分为一组进程(消费者组的成员)。与发布-订阅一样,Kafka 允许您广播向多个消费者组发送消息。”这表明我想要完成的事情是可能的。只是不知道该怎么做。
    • 您的第二条评论是正确的。消费者可以阅读多个主题。如果您使用组管理并且一旦消费者失败,则另一个接管失败的消费者的分区。您可以通过在配置中指定相同的group.id 将不同的消费者放入同一个消费者组中。
    • 太棒了,这就是我想要的。我设法使用相同的 group.id 让它工作,我还在某处读到它们必须在不同的线程上,这可能是我的问题的一部分。我在具有相同 group.id 的单独线程上进行了测试,它似乎完美地实现了负载平衡。
    猜你喜欢
    • 1970-01-01
    • 2023-03-20
    • 1970-01-01
    • 1970-01-01
    • 2020-06-11
    • 1970-01-01
    • 2021-11-10
    • 2019-05-01
    • 2018-02-03
    相关资源
    最近更新 更多