【问题标题】:What is correct way to generate and handle exceptions when using Flux from projectreactor从 projectreactor 使用 Flux 时生成和处理异常的正确方法是什么
【发布时间】:2019-03-06 13:50:54
【问题描述】:

我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到在错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。

我有以下 4 个 sn-ps。在某些情况下,异常将被忽略,而在其他情况下,它会进一步抛出。实际产生和消费异常的方式是什么?

片段 1

在这种情况下,异常将被忽略,main() 将在不接收异常的情况下完成。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
        }).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

DONE

片段 2

与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.just(
                1
        ).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

片段 3

我们通过调用 sink.error 来发出异常信号。 Main() 不会收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            sink.error(new RuntimeException("HELLO WORLD"));
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

1
2
DONE

片段 4

我们直接抛出异常。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出

1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

在使用响应式核心时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围flux.subscribe。但是在那种情况下我总是收到UnsupportedOperationException而不是原始异常,然后我需要使用Exceptions.isErrorCallbackNotImplemented来检查它是否来自反应,提取嵌套异常然后抛出它。

这当然可以做到,但需要在我们使用 Flux 的每个订阅的地方始终如一地做到这一点。这对我来说看起来不太好。我在这里缺少什么?

【问题讨论】:

  • Snippet 1 中,main() 方法在 subscribe() 方法 lambda 回调之前完成执行(因为执行是异步的)。如果你在最终的println() 之前添加Thread.sleep(),你会看到它。

标签: java reactive-programming project-reactor


【解决方案1】:

在您的所有示例中,问题都是从 .subscribe(...) 错误处理 lambda 重新抛出的。

如果您希望在主块中引发异常,请使用block() 变体。

如果您想测试错误是否在整个管道中传播,请使用StepVerifier.create(pipeline).expectError(...).verify()

.subscribe 通常是为了获得“终端”状态的通知,而不是为了恢复或重新抛出错误(为此使用上游的 onError* 运算符)。

基于just 的示例似乎正确传播了异常,因为它们在订阅时不执行用户提供的代码,因此在subscribe(Consumer<Throwable>) 期间没有尝试/捕获。

push,如generate/create/defercompose,在订阅时执行一些用户定义的逻辑(Consumer<FluxSink>)。他们防范整个Consumer 抛出异常并尝试传播它(作为onError 信号)而不是直接抛出它。

但是,如果Consumer 的失败是在执行sink 的方法之一时导致的,那么如果subscriber 重新抛出可能会出现问题:我们进入一个递归,调用接收器调用接收器.当我们检测到水槽的递归耗尽时,我们通过退出来防止这种无限情况。

这就是为什么在sink.nextsink.error(示例1 和3)中触发错误的基于push 的示例无法在主程序中产生异常:

  1. Consumer 已应用
  2. sink.next 被调用并且 next 运算符创建异常 1,或者 sink.error 被调用
  3. 异常 1 到达 subscribe 并作为异常 2 重新抛出
  4. 这会使Consumer.apply 短路,异常2 被传递给sink.error
  5. sink 已经被调用,所以我们突破以避免无限递归
  6. 从未见过异常 2

另一方面,在示例 4 中,我们不再处于调用 sink 的方法的过程中,并且原始异常不会首先到达订阅者:

  1. Consumer 已应用
  2. 直接抛出异常1
  3. 这会使Consumer.apply 短路,异常1 被传递给sink.error
  4. 传播到订阅者
  5. 将其作为异常 2 重新抛出
  6. main 方法中出现异常 2

【讨论】:

  • 如果我理解正确,而不是使用.subscribe(this::finalConsumer),我应该使用.doOnNext(this::finalConsumer).blockLast() 来获得我的异常?
  • 取决于您想要与消费者达成的具体目标。主要目标应该是在您传递给subscribeConsumer<Throwable> 中消除throw。如果您这样做是为了测试异常,请将该进程替换为StepVerifier。如果异常包装/重新抛出有一些业务逻辑,请在 subscribe 之前使用 .onErrorMap 替换它。
  • 我不是在谈论测试,我真的希望在我订阅通量的地方抛出异常。但即使我使用.onErrorMap 然后.subscribe,我仍然不会得到我的异常,但会将异常打包到我不喜欢的reactor.core.Exceptions$ErrorCallbackNotImplemented 中。引发原始异常的唯一方法似乎是.doOnNext(this::finalConsumer).blockLast()。我没问题,我只需要习惯那个结构而不是使用.subscribe()
  • 是的,好吧,正如我所说:在 Reactor 中,投掷不是一等公民,并且仅在 lambda 中被接受为生活质量功能(使用 lambda 的操作员会将其转换为 @ 987654364@ 信号)。 subscribe(...) 不打算抛出,因为它可以异步执行(在这种情况下,抛出的异常可能会被忽略)。如果您希望订阅代码抛出,那么这意味着您真正想要的是恢复到命令式和阻塞式编程,这就是 .block* 运算符的目的(订阅并等待值或抛出异常)。跨度>
猜你喜欢
  • 2013-08-31
  • 2014-03-25
  • 2012-03-29
  • 1970-01-01
  • 2021-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多