【发布时间】:2019-04-04 09:01:21
【问题描述】:
给定以下两个类:
public class Test {
public void call() {
MyClass myClass = new MyClass();
myClass.methodOne().subscribe(v -> {...});
}
}
public class MyClass {
public Observable<String> methodOne() {
Observable<String> response = Observable.fromFuture(this.methodTwo());
return response;
}
public CompletableFuture<String> methodTwo() {
CompletableFuture<String> response = new CompletableFuture<>();
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);
response.complete("initial value");
kafkaProducer.write(record, done -> {
if(done.succeeded()) {
response.complete("done");
}
else {
response.complete("not done");
};
);
return response;
}
}
其中kafkaProducer 是io.vertx.kafka.client.producer.impl.KafkaProducerImpl 的一个实例。
预期的行为是当在MyClass.methodTwo() 中调用response.complete() 时,response 值将从methodTwo() 返回到methodOne()。然后,该值将被包装在未来,并将在 Test.call() 中的 subscribe 的处理程序中可用。
但是,由于异步处理,methodTwo() 将始终返回在vertx. kafkaProducer 的写入方法之前设置的“初始值”。
即使稍后response 将在处理程序中设置为“完成”或“未完成”,该值也不会返回。
我已尝试将methodTwo 中的代码更改为以下内容:
AsyncResult<RecordMetadata> res =
Single.create((SingleEmitter<AsyncResult<RecordMetadata>> emitter) ->
producer.write(record, result -> emitter.onSuccess(result)))
.blockingGet();
然后返回 AsyncResult 中的值的想法,但这会无限期地阻塞。
解决这个问题的正确方法是什么?
谢谢
【问题讨论】:
-
再次,这与 RxJava 有什么关系?再次调用
response.complete也将无效。另外,你想要的需要阻塞,因为 Java 还不支持暂停或延续。 -
谢谢阿尔夫。使用 Vertx-Sync 并在 Sync.awaitResult() 中包装对 kafkaProducer.write() 的调用是正确的方法,还是有更好的方法?
-
我尝试使用 Single.create 并编辑了问题以显示这一点,但这似乎只是无限期地阻止。