【问题标题】:Unit-testing a Flux.take(Duration duration) with StepVerifier使用 StepVerifier 对 Flux.take(Duration duration) 进行单元测试
【发布时间】:2017-06-20 15:42:49
【问题描述】:

我正在使用 Spring Reactor Core 3.0.6,并且我有一个返回 Flux 的方法:

public Flux<Foo> createFlux(){
    return Flux.<List<Foo>,String>generate(/* generator omitted for clarity's sake */ )
        .take(Duration.ofSeconds(10)
        .flatMap(Flux::fromIterable);
}

生成器函数调用分页的 REST api 来获取结果,如果 API 继续返回数据,我希望 Flux 只运行 10 秒。

它工作正常,但我想创建一些单元测试,但我无法创建一个测试来验证 Flux 最多只能运行 10 秒。

我模拟了其余的服务,让它总是返回数据并写了这个:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .verifyComplete();

但它失败了:

java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext([my toString() Foo bean]))

我想我应该以某种方式使用生成的项目,但我找不到正确的 StepVerifier 方法来执行此操作。

编辑

我尝试使用thenConsumeWhile 跳过每个项目:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .thenConsumeWhile(t -> true)
    .verifyComplete();

但现在测试只是无限期地运行并且永远不会结束。

【问题讨论】:

    标签: java spring unit-testing project-reactor


    【解决方案1】:

    生成器实际上可能非常重要...StepVerifier 受限于无限序列,在使用虚拟时间时更是如此。问题是生成器和thenAwait 都在主线程中运行,因此生成器是无限的会阻止 stepverifier 提前时间,从而防止序列超时。

    由于您想测试take 的持续时间,我认为虚拟时间不合适(您正在测试时间的模拟)。我会让 createFlux 方法可以使用持续时间参数化,并执行 StepVerifier.create(),持续时间要短得多。

    如果你真的想使用某种形式的虚拟时间,我发现让它工作的最低要求是

    1. 通过在测试开始时实例化Scheduler 来隔离非虚拟线程上的生成器循环,然后在StepVerifier 的Supplier 中使用subscribeOn(scheduler)
    2. 首先调用.expectNextCount(1),确保在尝试提前时间之前订阅所有内容并且数据开始流动。

    像这样:

    public Flux<Integer> createFlux() {
        return Flux.<List<Integer>>generate(sink -> {
            sink.next(Arrays.asList(1, 2, 3));
        })
                .take(Duration.ofSeconds(10))
                .flatMap(Flux::fromIterable);
    }
    
    @Test
    public void so44657525() throws InterruptedException {
        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger adder = new AtomicInteger();
    
        StepVerifier.withVirtualTime(() -> createFlux()
                .subscribeOn(scheduler)
                .doOnNext(v -> adder.incrementAndGet())
        )
                    .expectNextCount(1)
                    .thenAwait(Duration.ofSeconds(10))
                    .thenConsumeWhile(t -> true)
                    .verifyComplete();
    
        System.out.println("Total number of values in generated lists: " + adder.get());
    }
    

    expectNextCount(1) 修改为expectNextCount(100_000),我运行了一个打印Total number of values in generated lists: 102405 并花费了40 毫秒的运行。

    【讨论】:

    • 谢谢,确实有效。我只做了一个更改:我使用.expectComplete().verify(Duration.ofSeconds(10L)) 而不是.verifyComplete()。这样,如果 Flux 运行超过 10 秒,就会抛出断言错误。使用 verifyComplete 它可以无限期地运行。如果助焊剂在预期的 10 秒内运行,则两种解决方案都可以。
    【解决方案2】:

    如果您错过了reference guide,也许reference guide 可以让您走上正确的道路?

    除了最常见的expectNext(您必须对序列中的每个项目重复)之外,如果您知道元素的数量,您可以使用expectNextCount,或者使用thenConsumeWhile基于谓词跳过元素,

    【讨论】:

    • 我在 3.0.6 或 3.0.7 中找不到consumeNextWhile,我想它变成了thenConsumeWhile;因为我不想对序列做出断言,所以我在thenAwait 之后放了一个.thenConsumeWhile(t-&gt;true),以跳过每个元素并只检查持续时间,但测试会无限期运行
    猜你喜欢
    • 2022-06-10
    • 2016-07-20
    • 1970-01-01
    • 2023-04-02
    • 2020-03-03
    • 1970-01-01
    • 2014-08-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多