【发布时间】:2018-04-07 06:42:09
【问题描述】:
我正在尝试借助 kafka 流 (Kafka 1.0.1) 和 spring 云流 (2.0.0-build-snapshot) 来实现一个简单的事件源服务。我的 StreamListener 方法只是读取与我的聚合的状态更改相对应的事件 Kstream 并将它们应用于聚合并将最新状态保存在本地状态存储中(kafka 提供的状态存储)。域事件消息也具有与聚合的 uuid(String) 相同的键。代码如下:
@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
stream
.groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
Slot::new,
(s, domainEvent, slot) -> slot.handle(domainEvent),
Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
as(Repository.SNAPSHOTS_FOR_SLOTS)
.withKeySerde(Serdes.String()).withValueSerde(slotSerde)
);
}
上面的代码产生了一个变更日志主题(如预期的那样):slot-service-slots-changelog。虽然它也创建了一个重新分区主题:slot-service-slots-repartition。这两个主题似乎具有完全相同的消息(键和值)。我的理解是,如果没有对流进行密钥修改操作,则不需要重新分区。我在这里遗漏了什么吗?
更新: 这可能不再需要,因为 sobychacko 已经提供了解释,但是我确实尝试了没有像下面这样的云流绑定,它没有创建重新分区主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {
@Bean
KafkaTemplate<String, DomainEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
ProducerFactory<String,DomainEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(config());
}
private Map<String, Object> config() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
StreamsConfig streamsConfig() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
return new StreamsConfig(config);
}
@Bean
KTable<String, Slot> kTable(KStreamBuilder builder) {
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
return
builder
.stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
.groupByKey(Serdes.String(), domainEventSerde)
.aggregate(
Slot::new,
(s, domainEvent, slot) -> slot.handle(domainEvent),
slotSerde,
Repository.SNAPSHOTS_FOR_SLOTS);
}
}
另外,生产者如下:
@Autowired
public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
this.kafkaTemplate = kafkaTemplate;
this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
}
public void save(Slot slot) {
List<DomainEvent> newEvents = slot.getDirtyEvents();
newEvents.forEach(
domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent)
);
slot.flushEvents();
}
更新 2:
这是带有云流的生产者代码:
public void save(Slot slot) {
List<DomainEvent> newEvents = slot.getDirtyEvents();
newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
slot.flushEvents();
}
【问题讨论】:
-
我认为这与 spring cloud stream binder 没有任何关系。你能直接用 Kafka Streams 试试代码,看看效果如何?我想你会在那里看到同样的行为。
-
问题是,输入流从哪里来?它只是作为参数传入——也许,输入流是通过键更改操作创建的?请注意,重新分区主题是“懒惰地”插入的——即,如果你执行
map()操作,这只会设置一个标志,不会立即重新分区——连续的groupByKey()会检查这个标志,如果需要,可能会创建一个重新分区主题. -
谢谢@MatthiasJ.Sax。在我们进行入站反序列化的方法之前发生了一个 map() 操作(我假设在上面的示例中禁用了本机反序列化)。我认为您可以启用 nativeDecoding 并使用 Kafka 提供的 Serde 来避免该 map() 操作。上面代码中使用的 JsonSerde 目前不能用作属性,因为它需要类信息,但是在下一个版本的 spring-cloud-stream binder 中,我们将更容易将其用作属性,以便您可以使用作为一个 Serde。
-
这是有道理的。感谢@sobychacko 的澄清——你应该把它作为答案(你也可以在你的答案中包含我的评论)。
-
感谢 sobychacko 和 Matthias J. Sax。为了清楚起见,我已使用非云流消费者和生产者代码以及云流生产者代码更新了帖子。
标签: apache-kafka apache-kafka-streams spring-cloud-stream