【问题标题】:Cancel task on timeout in RxJava在 RxJava 中超时取消任务
【发布时间】:2015-06-25 07:01:37
【问题描述】:

我正在试验 RxJava 和 Java 8 的 CompletableFuture 类 并且不太了解如何处理超时条件。

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;

// ...

    Observable<String> doSomethingSlowly() {
        CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
            // this call may be very slow - if it takes too long, 
            // we want to time out and cancel it.
            return processor.slowExternalCall();

        });

        return toObservable(task);
    }

    // ...

    doSomethingSlowly()
        .single()
        .timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));

这基本上是可行的(如果达到三秒的超时,则发布“超时”)。但是,我还想取消包含在 Observable 中的未来任务 - 以 RxJava 为中心的方法是否可能?

我知道一种选择是自己处理超时,使用task.get(3, TimeUnit.SECONDS),但我想知道是否可以在 RxJava 中完成所有任务处理。

【问题讨论】:

标签: java future rx-java completable-future


【解决方案1】:

是的,您可以这样做。您可以将Subscription 添加到Subscriber

这使您可以监听取消订阅,如果您明确调用 subscribe().unsubscribe()Observable 成功完成或出现错误,则会发生这种情况。

如果您在未来完成之前看到取消订阅,您可以认为这是因为明确的 unsubscribe 或超时。

public class FutureTest {
    public static void main(String[] args) throws IOException {
        doSomethingSlowly()
                .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
                .subscribe(System.out::println);
        System.in.read(); // keep process alive
    }

    private static Observable<String> doSomethingSlowly() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            return "Something";

        });
        return toObservable(future);
    }

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            subscriber.add(new Subscription() {
                private boolean unsubscribed = false;
                @Override
                public void unsubscribe() {
                    if (!future.isDone()){
                        future.cancel(true);
                    }
                    unsubscribed = true;
                }

                @Override
                public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });

            future.thenAccept(value -> {
                if (!subscriber.isUnsubscribed()){
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            }).exceptionally(throwable -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(throwable);
                }
                return null;
            });
        });
    }
}

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-09
  • 1970-01-01
  • 1970-01-01
  • 2015-08-23
相关资源
最近更新 更多