【问题标题】:groupByKey creates repartition topic even though there is no key change即使没有密钥更改,groupByKey 也会创建重新分区主题
【发布时间】:2018-04-07 06:42:09
【问题描述】:

我正在尝试借助 kafka 流 (Kafka 1.0.1) 和 spring 云流 (2.0.0-build-snapshot) 来实现一个简单的事件源服务。我的 StreamListener 方法只是读取与我的聚合的状态更改相对应的事件 Kstream 并将它们应用于聚合并将最新状态保存在本地状态存储中(kafka 提供的状态存储)。域事件消息也具有与聚合的 uuid(String) 相同的键。代码如下:

@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
    Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
    Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
    stream
        .groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
        .aggregate(
                Slot::new, 
                (s, domainEvent, slot) -> slot.handle(domainEvent),
                Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
                as(Repository.SNAPSHOTS_FOR_SLOTS)
                    .withKeySerde(Serdes.String()).withValueSerde(slotSerde)
        );
}

上面的代码产生了一个变更日志主题(如预期的那样):slot-service-slots-changelog。虽然它也创建了一个重新分区主题:slot-service-slots-repartition。这两个主题似乎具有完全相同的消息(键和值)。我的理解是,如果没有对流进行密钥修改操作,则不需要重新分区。我在这里遗漏了什么吗?

更新: 这可能不再需要,因为 sobychacko 已经提供了解释,但是我确实尝试了没有像下面这样的云流绑定,它没有创建重新分区主题:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {

    @Bean
    KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    ProducerFactory<String,DomainEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(config());
    }

    private Map<String, Object> config() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    StreamsConfig streamsConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
        return new StreamsConfig(config);
    }

    @Bean
    KTable<String, Slot> kTable(KStreamBuilder builder) {
        Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
        Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);

        return
                builder
                .stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
                .groupByKey(Serdes.String(), domainEventSerde)
                .aggregate(
                    Slot::new, 
                    (s, domainEvent, slot) -> slot.handle(domainEvent),
                    slotSerde,
                    Repository.SNAPSHOTS_FOR_SLOTS);
    }

    }

另外,生产者如下:

@Autowired
    public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
    }

    public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(
            domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent) 
        );
        slot.flushEvents();
    }

更新 2:

这是带有云流的生产者代码:

public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
        slot.flushEvents();
    }

【问题讨论】:

  • 我认为这与 spring cloud stream binder 没有任何关系。你能直接用 Kafka Streams 试试代码,看看效果如何?我想你会在那里看到同样的行为。
  • 问题是,输入流从哪里来?它只是作为参数传入——也许,输入流是通过键更改操作创建的?请注意,重新分区主题是“懒惰地”插入的——即,如果你执行map() 操作,这只会设置一个标志,不会立即重新分区——连续的groupByKey() 会检查这个标志,如果需要,可能会创建一个重新分区主题.
  • 谢谢@MatthiasJ.Sax。在我们进行入站反序列化的方法之前发生了一个 map() 操作(我假设在上面的示例中禁用了本机反序列化)。我认为您可以启用 nativeDecoding 并使用 Kafka 提供的 Serde 来避免该 map() 操作。上面代码中使用的 JsonSerde 目前不能用作属性,因为它需要类信息,但是在下一个版本的 spring-cloud-stream binder 中,我们将更容易将其用作属性,以便您可以使用作为一个 Serde。
  • 这是有道理的。感谢@sobychacko 的澄清——你应该把它作为答案(你也可以在你的答案中包含我的评论)。
  • 感谢 sobychacko 和 Matthias J. Sax。为了清楚起见,我已使用非云流消费者和生产者代码以及云流生产者代码更新了帖子。

标签: apache-kafka apache-kafka-streams spring-cloud-stream


【解决方案1】:

在我们进行入站反序列化的方法被调用之前发生了一个 map() 操作(我假设在上面的示例中禁用了本机反序列化)。正如 Matthias 所指出的,如果有一个 map() 操作,它会设置一个标志,并在随后的 groupByKey() 中创建一个重新分区主题。因此,这可能是在您的情况下发生的情况,因为框架会为您执行此 map 操作作为入站消息转换的一部分。如果你真的想避免创建这个重新分区主题,你可以启用nativeDecoding,然后使用Kafka提供的Serde。这样框架就不会调用map 操作。问题是代码中使用的JsonSerde 不容易用作 Spring Cloud Stream 中的 Serde 属性,因为它需要类信息。在 Spring Cloud Stream 的下一个版本中,我们将改善这种情况。同时,您可以提供自定义 Serde。希望这可以帮助。

【讨论】:

  • 谢谢@sobychacko。这解释了这种行为
  • 另外,有我可以跟踪的 github 问题吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-01-04
  • 1970-01-01
  • 1970-01-01
  • 2020-05-21
  • 2013-02-12
  • 2013-08-30
  • 1970-01-01
相关资源
最近更新 更多