【问题标题】:How to reassign a variable in reactive Flux stream?如何在反应通量流中重新分配变量?
【发布时间】:2021-03-02 14:05:21
【问题描述】:

我正在向网络服务发送请求,将结果转换为大的 csv 并将 csv 行保存到数据库中。

由于请求长时间运行(10-20 秒),我想并行化请求。 我将所有数据收集在一个包含转换后的 csv 行的 StringBuilder 中。

问题:如果在 csv 中达到我的 1000 行块,我如何将数据取出以进行持久化,而任何其他并发响应将写入新的 StringBuilder

因为,流的最终变量无法重新初始化。

final StringBuilder sb = new StringBuilder();
AtomicInteger count = new AtomicInteger();

Flux.fromIterable(requests)
    .flatMap(req -> {
        return webClientService.send(req); //assume long running response
    }, 8) //send 8 requests in parallel, as response takes up to 10s
    .map(rsp -> {
        //convert response to csv values and add to StringBuilder
        int c = addCsv(sb, rsp);
        if (count.addAndGet(c) > 1000) {
            //TODO how can I assign a new StringBuilder,
            //so that all further finished responses will append the csv to the new builder?
            //same problem with the counter.
            
            databaseWriter.write(sb.build()); //writes the content so far to db, but not threadsafe so far
        }
        return c;
    })
    .blockLast();
    

【问题讨论】:

  • addCsv 做了什么,存储的格式是什么样的。你得到了你不满意的答案,因为你的解释不够清楚。如果您想避免某些事情等,请写出ALL的要求,这样我们就不必猜测了。
  • 如所写,addCsv 将响应的结果作为 csv 行添加到 StringBuilder

标签: java spring java-stream spring-webflux project-reactor


【解决方案1】:

也许您可以尝试完全避免副作用,例如类似于:

.map(x -> toCsv(x))
.reduce((a, b) -> {
    if (length(a) < 1000) {
        return concat(a, b);
    }
    databaseWriter.write(a);
    return b;
})
.doOnNext(x -> databaseWriter.write(x))

【讨论】:

  • 但这又意味着我不能使用全局 StringBuilder,但会产生许多 String 对象。我试图避免这种情况。
【解决方案2】:

在我看来,您可以使用内置运算符来实现相同的结果:

      Flux.fromIterable(requests)
            .flatMap(req -> webClientService
                    .send(req)
                    .subscribeOn(Schedulers.boundedElastic()), 8)// subscribeOn to subscribe from different threads
            .map(resp -> converToCsvLine(resp)) //make some transformations on the respnse
            .window(1000) //split incoming data into 1000 lines
            .flatMap(stringFlux -> stringFlux.collect(Collectors.joining("\n")))// collect last 1000
            .flatMap(s -> Mono.fromRunnable(() -> writeToDb(s))) //do some logic on the collected 1000 lines
            .blockLast();

【讨论】:

  • 这意味着我必须为每一行创建一个String 对象。但这正是我试图通过使用 StringBuilder 来避免的。
  • 您可以创建自己的收藏家:.flatMap(stringFlux -&gt; stringFlux.collect(StringBuilder::new,StringBuilder::append))
  • 或类似.window(1000).flatMap(stringFlux -&gt; stringFlux.collect(StringBuilder::new, (sb, resp) -&gt; addCsv(sb, resp)))
猜你喜欢
  • 1970-01-01
  • 2017-09-13
  • 1970-01-01
  • 2023-02-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多