【问题标题】:Is message order not guaranteed using KafkaEmbedded?使用 KafkaEmbedded 是否不能保证消息顺序?
【发布时间】:2019-11-15 09:27:48
【问题描述】:

我使用KafkaEmbedded(和KafkaTemplate)进行了单元测试,但消息顺序是随机的。有谁知道这是否合乎逻辑,是否可能是担保令?

这是我的代码:

public class KafkaTest {

  private static String TOPIC = "test.topic";

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

  @Test
  public void testEmbeddedKafkaSendOrder() throws Exception {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
    kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put("auto.offset.reset", "earliest");

    final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
    ConsumerRecords<String, byte[]> records = consumer.poll(100L);

    // Tests
    final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
    while (recordIterator.hasNext()) {
      System.out.println("received:" + new String(recordIterator.next().value()));
    }
  }

此代码打印例如(但顺序可以更改):

received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5

【问题讨论】:

    标签: unit-testing apache-kafka embedded-kafka


    【解决方案1】:

    在 Kafka 中,您可以确保消息的顺序在同一个分区上是相同的,但在主题上是不同的。

    Note that as a topic typically has multiple partitions, there is
    no guarantee of message time-ordering across the entire topic, just within a single
    partition
    

    引用本书Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale 。 您可以对此做些什么以及如何按顺序接收消息? 选项 1:

            kafkaTemplate.send(TOPIC,"1", "TEST1".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST2".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST3".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST4".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST5".getBytes()).get();
    

    这样,对于每个值,您发送相同的键“1”。 Kafka 将根据您的密钥选择分区。由于所有的键都是相等的,所有的消息都会进入同一个分区,你会按顺序收到你的记录。

    选项 2: 以这种方式初始化 KafkaEmbedded:

    new KafkaEmbedded(1, true,1, TOPIC);
    

    这样你就告诉kafka,对于这个主题,你希望只有一个分区,所以每条记录都会进入那个分区。

    【讨论】:

    • 谢谢。清晰且合乎逻辑:-) 我使用了选项 2,因为它是对调用 kafka 模板的方法进行单元测试
    猜你喜欢
    • 2016-11-18
    • 1970-01-01
    • 1970-01-01
    • 2015-09-06
    • 2021-10-04
    • 2013-05-03
    • 1970-01-01
    • 2023-03-04
    • 2019-03-03
    相关资源
    最近更新 更多