【问题标题】:Returning value from asynchronous handler to containing method从异步处理程序返回值到包含方法
【发布时间】:2019-04-04 09:01:21
【问题描述】:

给定以下两个类:

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodOne().subscribe(v -> {...});
    }
}

public class MyClass {

    public Observable<String> methodOne() {
        Observable<String> response =  Observable.fromFuture(this.methodTwo());
        return response;
    }

    public CompletableFuture<String> methodTwo() {
        CompletableFuture<String> response = new CompletableFuture<>();
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                response.complete("done");
            }
            else {
                response.complete("not done");
            };
        );

        return response;
    }
}

其中kafkaProducerio.vertx.kafka.client.producer.impl.KafkaProducerImpl 的一个实例。

预期的行为是当在MyClass.methodTwo() 中调用response.complete() 时,response 值将从methodTwo() 返回到methodOne()。然后,该值将被包装在未来,并将在 Test.call() 中的 subscribe 的处理程序中可用。

但是,由于异步处理,methodTwo() 将始终返回在vertx. kafkaProducer 的写入方法之前设置的“初始值”。

即使稍后response 将在处理程序中设置为“完成”或“未完成”,该值也不会返回。

我已尝试将methodTwo 中的代码更改为以下内容:

AsyncResult<RecordMetadata> res = 
Single.create((SingleEmitter<AsyncResult<RecordMetadata>> emitter) ->
   producer.write(record,   result -> emitter.onSuccess(result)))
   .blockingGet();

然后返回 AsyncResult 中的值的想法,但这会无限期地阻塞。

解决这个问题的正确方法是什么?

谢谢

【问题讨论】:

  • 再次,这与 RxJava 有什么关系?再次调用response.complete 也将无效。另外,你想要的需要阻塞,因为 Java 还不支持暂停或延续。
  • 谢谢阿尔夫。使用 Vertx-Sync 并在 Sync.awaitResult() 中包装对 kafkaProducer.write() 的调用是正确的方法,还是有更好的方法?
  • 我尝试使用 Single.create 并编辑了问题以显示这一点,但这似乎只是无限期地阻止。

标签: java vert.x


【解决方案1】:

您可以使用 Vert.x Handler 来处理异步调用的结果。 异步调用完成后,您可以调用与结果一起传递的处理程序。

下面是一个与解决您的问题有关的小sn-p。

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodTwo(f-> {
            if (f.succeeded()) {
                //do something with f.result()
            }
            else {
                //handle;
            }
        });
    }
}

public class MyClass {

    public void methodTwo(Handler<AsyncResult<String>> handler) {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        //response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                handler.handle("done");
            }
            else {
                handler.handle("not done");
            };
        );

        //return response;
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-09-06
    • 2012-12-20
    • 2018-09-01
    • 2019-08-27
    • 1970-01-01
    • 2017-07-31
    • 2021-08-08
    • 1970-01-01
    相关资源
    最近更新 更多