【问题标题】:Cannot subscribe multiple consumers to Spring Kafka Test embedded Kafka broker无法为多个消费者订阅 Spring Kafka Test 嵌入式 Kafka 代理
【发布时间】:2021-05-17 16:21:31
【问题描述】:

我尝试让两个消费者订阅一个EmbeddedKafkaBroker。第一个成功了,第二个失败了。 @EmbeddedKafka@ClassRule 代理都失败了。

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = { "topic" })
public class AnnotationEmbeddedKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void annotationEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ClassRuleEmbeddedKafkaTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, false, "topic");

    private EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();

    @Test
    public void classRuleEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

我希望这两个消费者可以订阅一个EmbeddedKafkaBroker。 Spring Kafka Test 可以吗?

我在这里复制了这个:https://github.com/yraydhitya/spring-kafka-test-multiple-consumers

【问题讨论】:

    标签: spring spring-kafka spring-kafka-test


    【解决方案1】:

    如果您希望两个消费者都接收来自该主题的所有消息,您需要他们成为不同消费者组的一部分,例如:

    Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded1", "false", broker);
    

    Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded2", "false", broker);
    

    否则每个消费者将被分配到主题的不同分区,并且由于您嵌入的 kafka 主题(默认情况下)只有一个分区,因此只会分配一个消费者。

    【讨论】:

    • 这行得通,但我故意希望两个消费者成为同一个消费者组的一部分。我将分区号增加到 4 并得到了相同的结果。第一个消费者分配给所有四个分区。当第二个消费者尝试订阅时,发生了重新平衡,第二个消费者订阅超时。
    • 您在发送的消息中使用哪些键?首先尝试不使用密钥,然后使用 2 个分区。这应该使 kafka 将每个消费者分配给一个分区
    猜你喜欢
    • 2019-04-07
    • 1970-01-01
    • 2018-01-19
    • 2019-11-21
    • 2017-05-16
    • 1970-01-01
    • 2017-07-31
    • 1970-01-01
    • 2018-06-18
    相关资源
    最近更新 更多