【问题标题】:Kafka Consumer subscription vs. assigned partitionKafka 消费者订阅与分配分区
【发布时间】:2016-03-16 16:39:50
【问题描述】:

卡夫卡让我很困惑。我正在使用标准值在本地运行它。 仅启用自动创建主题。 1 个分区,1 个节点,一切都是本地且简单的。 如果它写

consumer.subscribe("test_topic");
consumer.poll(10);

它根本不起作用,也永远找不到任何数据。 如果我改为分配一个分区,例如

consumer.assign(new TopicPartition("test_topic",0));

检查我在 995 的位置。现在可以轮询并接收我的生产者输入的所有数据。

关于订阅,我有哪些不明白的地方?我不需要多个消费者,每个消费者只处理一部分数据。我的消费者需要获取某个主题的所有数据。为什么所有教程中都显示的标准订阅方法对我不起作用? 我确实了解分区用于负载平衡消费者。我不明白我在订阅方面做错了什么。

consumer config properties
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "postproc-" + EnvUtils.getAppInst()); // jeder ist eine eigene gruppe -> kriegt alles
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<Long, byte[]>(props);

producer config
 props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 5000);
        props.put("buffer.memory", 1024 * 1024 * 10); // 10mb
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer(props);

producer execution
try (ByteArrayOutputStream out = new ByteArrayOutputStream()){
            event.writeDelimitedTo(out);
            for (long a = 10; a<20;a++){
                long rand=new Random(a).nextLong();
                producer.send(new ProducerRecord<>("test_topic",rand ,out.toByteArray()));
            }
            producer.flush();
        }catch (IOException e){

消费者执行

consumer.subscribe(Arrays.asList("test_topic"));
ConsumerRecords<Long,byte[]> records = consumer.poll(10);
for (ConsumerRecord<Long,byte[]> r :records){ ...

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    我设法解决了这个问题。问题是超时。打桩时,我没有给它足够的时间来完成。我认为分配一个分区要快得多,因此可以及时完成。标准订阅投票需要更长的时间。从未真正完成并且没有提交。 至少我认为这是问题所在。超时时间越长,它就可以工作。

    【讨论】:

    • 哪个具体超时有问题?
    【解决方案2】:

    我认为你缺少这个属性

    auto.offset.reset=earliest
    

    当 Kafka 中没有初始偏移量或当前 服务器上不再存在偏移量(例如,因为该数据 已被删除):

    • 最早:自动将偏移量重置为最早的偏移量
    • latest:自动将偏移量重置为最新的偏移量
    • 无:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
    • 其他:向消费者抛出异常。

    参考:http://kafka.apache.org/documentation.html#highlevelconsumerapi

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-06-14
      • 2018-02-05
      • 2021-01-26
      • 2017-01-04
      • 2017-10-17
      相关资源
      最近更新 更多