【问题标题】:Project Reactor: buffer with parallel executionProject Reactor:并行执行的缓冲区
【发布时间】:2022-01-02 02:46:03
【问题描述】:

我需要将日期从一个来源(并行)批量复制到另一个来源。

我这样做了:

 Flux.generate((SynchronousSink<String> sink) -> {
                    try {
                        String val = dataSource.getNextItem();
                        if (val == null) {
                            sink.complete();
                            return;
                        }
                        sink.next(val);

                    } catch (InterruptedException e) {
                        sink.error(e);
                    }
                })
                .parallel(4)
                .runOn(Schedulers.parallel())
                .doOnNext(dataTarget::write)
                .sequential()
                .blockLast();
class dataSource{
  public Item getNextItem(){ 
    //...
  }
}
class dataTarget{
  public void write(List<Item> items){ 
    //...
  }
}

它并行接收数据,但一次写入一个。

我需要分批收集它们(比如按 10 个项目),然后编写批处理。

我该怎么做?

更新:

主要思想来源是适合高效发送消息的消息系统(即rabbitmq或nats),但目标是插入批处理效率更高的数据库。

所以最终的结果应该是——我并行接收消息,直到缓冲区没有填满,然后我将所有缓冲区一次性写入数据库。

在常规 java 中很容易做到,但在流的情况下——我不知道该怎么做。如何缓冲数据以及如何暂停读取器,直到写入器尚未准备好获取下一部分。

【问题讨论】:

  • “来源”是什么意思。什么样的来源?如果您显示 write 方法会有所帮助。
  • @lkatiforis 我从消息系统 (nats.io) 中并行读取消息,我需要每次插入将 1000 条记录放入数据库。更新了问题

标签: java reactive-programming project-reactor


【解决方案1】:

您只需要Flux#buffer(int maxSize) 操作员:

Flux.generate((SynchronousSink<String> sink) -> {
        try {
            String val = dataSource.getNextItem();
            if (val == null) {
                sink.complete();
                return;
            }
            sink.next(val);

        } catch (InterruptedException e) {
            sink.error(e);
        }
    })
    .buffer(10) //Flux<List<String>>
    .flatMap(dataTarget::write)
    .blockLast();

class DataTarget{
    public Mono<Void> write(List<String> items){
         return reactiveDbClient.insert(items);
    }
}

在这里,buffer 将项目收集到多个 List 的 10 个项目(批次)中。您不需要使用并行调度程序。 flatmap 将异步运行这些操作。见Understanding Reactive’s .flatMap() Operator

【讨论】:

  • 这正是我所需要的!如此简单和简短)谢谢你
【解决方案2】:

您需要在单个 Publisher-s 中完成繁重的工作,这些工作将在 flatMap() 中并行实现。像这样

Flux.generate((SynchronousSink<String> sink) -> {
    try {
        String val = dataSource.getNextItem();
        if (val == null) {
            sink.complete();
            return;
        }
        sink.next(val);

    } catch (InterruptedException e) {
        sink.error(e);
    }
})
.parallel(4)
.runOn(Schedulers.parallel())
.flatMap(item -> Mono.fromCallable(() -> dataTarget.write(item)))
.sequential()
.blockLast();

【讨论】:

  • 这并不能解决我的问题。例如,我从消息代理中读取,在读取一策略中效率很高,但随后我将数据写入数据库,在批量插入时有效,而不是一次插入一行。这就是为什么我需要收集 1000 件物品,然后写一次。
  • 我明白了。好的,我以为你想并行编写(你也可以批量编写)。在接受的答案中,如果目标比源慢得多但具有无限的水平可扩展性(例如 AWS S3),您将遇到瓶颈。
  • 说得好
【解决方案3】:

最好的方法(从算法的角度来看)是使用 ringbuffer 并使用 microbatching 技术。对 ringbuffer 的写入是从 rabbitmq 一个接一个(或多个并行)完成的。读取线程(仅单个)将一次​​获取所有消息(在批处理启动时呈现),将它们插入数据库并再次执行...一次意味着单个消息(如果只有一个)或一堆它们(如果它们是在上次插入的持续时间足够长时累积的)。

这种技术也用在 jdbc 中(如果我没记错的话),并且可以使用 java 中的 lmax 中断器库轻松实现。

示例项目(使用 ractor /Flux/ 和 System.out.println)可以在 https://github.com/luvarqpp/reactorBatch 上找到

核心代码:

    final Flux<String> stringFlux = Flux.interval(Duration.ofMillis(1)).map(x -> "Msg number " + x);

    final Flux<List<String>> stringFluxMicrobatched = stringFlux
            .bufferTimeout(100, Duration.ofNanos(1));

    stringFluxMicrobatched.subscribe(strings -> {
        // Batch insert into DB
        System.out.print("Inserting in batch " + strings.size() + " strings.");
        try {
            // Inserting into db is simulated by 10 to 40 ms sleep here...
            Thread.sleep(rnd.nextInt(30) + 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(" ... Done");
    });

欢迎使用技术名称和参考来编辑和改进这篇文章。这是社区维基...

【讨论】:

  • 你能举个例子吗?
猜你喜欢
  • 1970-01-01
  • 2018-09-26
  • 2019-03-25
  • 2016-02-19
  • 2021-12-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-14
相关资源
最近更新 更多