【发布时间】:2019-06-05 04:19:09
【问题描述】:
所以我的用例是在 Spring Webflux 应用程序中使用来自 Kafka 的消息,同时使用 Project Reactor 以反应式编程,并以与接收消息相同的顺序对每条消息执行非阻塞操作卡夫卡。系统也应该能够自行恢复。
这是设置为使用的代码 sn-p:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
如您所见,我所做的是:从 Kafka 获取消息,将其转换为用于新目的地的对象,然后将其发送到目的地,然后确认偏移量以将消息标记为已使用并已处理.以与从 Kafka 消费的消息相同的顺序确认偏移量至关重要,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用flatMapSequential 来确保这一点。
为简单起见,我们假设transformToOutputFormat() 方法是一个恒等变换。
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
performAction() 方法需要通过网络执行某些操作,例如调用 HTTP REST API。所以适当的 API 会返回一个 Mono,这意味着需要订阅该链。此外,我需要通过此方法返回 ReceiverRecord,以便可以在上面的 flatMapSequential() 运算符中确认偏移量。因为我需要订阅 Mono,所以我在上面使用 flatMapSequential。如果没有,我可以改用map。
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
我在这个方法中有两个相互冲突的需求: 1. 订阅发起 HTTP 调用的链 2.返回ReceiverRecord
使用 flatMap() 意味着我的返回类型更改为 Mono。在同一个地方使用 doOnNext() 将保留链中的 ReceiverRecord,但不允许自动订阅 HttpClient 响应。
我无法在asString() 之后添加.subscribe(),因为我想等到完全接收到 HTTP 响应后才能确认偏移量。
我也不能使用.block(),因为它在并行线程上运行。
因此,我需要作弊并从方法范围返回record对象。
另一件事是在performAction 内重试时,它会切换线程。由于 flatMapSequential() 急切地订阅外部流中的每个 Mono,这意味着虽然可以保证按顺序确认偏移量,但我们不能保证 performAction 中的 HTTP 调用将以相同的顺序执行。
所以我有两个问题。
- 是否可以以自然的方式返回
record而不是返回方法范围对象? - 是否可以确保 HTTP 调用和偏移确认都以与发生这些操作的消息相同的顺序执行?
【问题讨论】:
标签: java apache-kafka reactive-programming spring-webflux project-reactor