【发布时间】:2019-03-06 13:50:54
【问题描述】:
我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到在错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。
我有以下 4 个 sn-ps。在某些情况下,异常将被忽略,而在其他情况下,它会进一步抛出。实际产生和消费异常的方式是什么?
片段 1
在这种情况下,异常将被忽略,main() 将在不接收异常的情况下完成。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
}).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
DONE
片段 2
与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.just(
1
).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
片段 3
我们通过调用 sink.error 来发出异常信号。 Main() 不会收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
sink.error(new RuntimeException("HELLO WORLD"));
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
1
2
DONE
片段 4
我们直接抛出异常。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出
1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
在使用响应式核心时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围flux.subscribe。但是在那种情况下我总是收到UnsupportedOperationException而不是原始异常,然后我需要使用Exceptions.isErrorCallbackNotImplemented来检查它是否来自反应,提取嵌套异常然后抛出它。
这当然可以做到,但需要在我们使用 Flux 的每个订阅的地方始终如一地做到这一点。这对我来说看起来不太好。我在这里缺少什么?
【问题讨论】:
-
在 Snippet 1 中,
main()方法在subscribe()方法 lambda 回调之前完成执行(因为执行是异步的)。如果你在最终的println()之前添加Thread.sleep(),你会看到它。
标签: java reactive-programming project-reactor