【问题标题】:Generate many other individual mono out of one originating mono with R2DBC and Reactor使用 R2DBC 和 Reactor 从一个原始单声道生成许多其他单独的单声道
【发布时间】:2022-02-02 01:26:54
【问题描述】:

我创建了一个增量为 5 的数据库序列,允许我在内存中保留一系列值,并将我对数据库的访问次数除以 5。(我的真实案例是 100000 个值)

我的 R2DBC 服务将调用 R2DBC API 并执行以单声道返回 long 的查询。

public class SequenceService {
   public Mono<Long> nextVal() {
      return r2dbc.query("select nextval('mysequence')");
   }
}

假设我们在测试中调用此服务

   sequenceService = new SequenceService( ... );

   assertTrue(sequenceService.nextVal().block() == 10L);
   assertTrue(sequenceService.nextVal().block() == 15L);
   assertTrue(sequenceService.nextVal().block() == 20L);

我想要做的是我的 nextVal 调用可以填充值之间,所以

   assertTrue(sequenceService().nextVal().block() == 10L);
   assertTrue(sequenceService().nextVal().block() == 11L);
   assertTrue(sequenceService().nextVal().block() == 12L);
   assertTrue(sequenceService().nextVal().block() == 13L);
   assertTrue(sequenceService().nextVal().block() == 14L);
   assertTrue(sequenceService().nextVal().block() == 15L);
   assertTrue(sequenceService().nextVal().block() == 16L);
   assertTrue(sequenceService().nextVal().block() == 17L);
   assertTrue(sequenceService().nextVal().block() == 18L);
   assertTrue(sequenceService().nextVal().block() == 19L);
   assertTrue(sequenceService().nextVal().block() == 20L);

像下面这样编码是行不通的,因为你总是会得到“n+0”,而永远不会得到“n+1”、“n+2”、“n+3”、“n+4”:

public class SequenceService {

    public Mono<Long> nextVal() {
        return r2dbc.query("select nextval('mysequence')")
                        .map(n -> Flux.range(0,5)).map(i -> n + i)).repeat().next();
        }
}

我不知道如何从一个原始单声道中生成许多其他单独的单声道。

我必须保留某种状态,即通量、当前序列值的状态,但不知道该怎么做。

【问题讨论】:

  • 您对问题的解释不是很清楚,I had a working solution in JDBC 该解决方案的代码在哪里? I tried to use shareNext() share() repeat() 代码在哪里?为什么它不起作用? I also tried using a field with a AtomicReference&lt;Mono&gt; 这段代码在哪里? But I fail to have something that works 这没有帮助,更确切地说是什么失败了。不仅“失败了”。 it didn't made much sense to do that. 做什么没有意义?
  • 我清理了帖子,宁愿忘记现有代码并使用正确的反应器概念重新开始。
  • 你正在以一种命令式的方式处理这个问题。 Flux 是一种列表,它不是。它是您订阅的东西,它会在它拥有它们时为您提供价值,并且当它用完项目时,订阅就会结束。我不能给你写一个代码示例,因为我仍然不明白你想要什么,但我认为你应该使用projectreactor.io/docs/core/release/reference/… 生成你的序列
  • 我的数据源是通过 r2dbc 查询获得的 Mono。我不能使用 Flux.generate 。
  • 请提供一个工作示例来演示您的问题,现在您的代码主要包括对未知 API 的调用,例如 sequenceDao.getSequenceIncrement sequenceDao.nextVal("mysequence") sequenceDao.getIncrement("mysequence")SequenceState.builder().dbSeq(seq).increment(inc).virtual(seq).build() 如上所述,您的解释不清楚,您提供的所有代码都是不可运行的。如果您有问题,请阅读此stackoverflow.com/help/minimal-reproducible-example 祝您好运

标签: project-reactor reactor r2dbc


【解决方案1】:

一种可能性是创建一个将生成下一个值的 Flux,就像您所做的那样:

    r2dbc.query("select nextval('mysequence')")
        .flatMapMany(next -> Flux.range(0, 5).map(n -> next + n))
        .repeat()

然后立即订阅此通量,使用自定义订阅者,每次您需要下一个值时,该订阅者将使用 request(1) 方法按需请求下一个值。

但是,因为您的方法nextVal() 可能会被同时调用,所以您需要一种机制,例如等待列表。

这是一个完整的例子:

public class SequenceService {
        
    private Subscription subscription;
    private LinkedList<MonoSink<Long>> waitingList = new LinkedList<>();
    private Throwable error;
        
    public SequenceService() {
        r2dbc.query("select nextval('mysequence')")
            .flatMapMany(next -> Flux.range(0, 5).map(n -> next + n))
            .repeat()
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {
                    subscription = s;
                }
    
                @Override
                public void onNext(Long val) {
                    boolean needRequest;
                    MonoSink<Long> sink;
                    synchronized (waitingList) {
                        sink = waitingList.poll();
                        needRequest = !waitingList.isEmpty();
                    }
                    if (sink != null) {
                        sink.success(val);
                    }
                    if (needRequest)
                        subscription.request(1);
                }
    
                @Override
                public void onError(Throwable t) {
                    error = t;
                    synchronized (waitingList) {
                        MonoSink<Long> sink;
                        while ((sink = waitingList.poll()) != null)
                            sink.error(t);
                    }
                }
    
                @Override
                public void onComplete() {
                }
            });
    }
    
    public Mono<Long> nextVal() {
        return Mono.create(sink -> {
            boolean needRequest;
            synchronized (waitingList) {
                if (error != null) {
                    sink.error(error);
                    return;
                }
                waitingList.push(sink);
                needRequest = waitingList.size() == 1;
            }
            if (needRequest)
                subscription.request(1);
        });
    }
}

【讨论】:

  • 感谢您的解决方案,没想到会这么复杂。太糟糕了,没有一些预先存在的结构可以做到这一点。我还不明白水槽是什么,到目前为止我可以不用这个。
  • 我实现了它,但是在并行使用 nextVal() 函数的情况下它会出现内存不足。我试图让它更安全,但它应该做的事情太慢了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-17
  • 2019-03-05
  • 1970-01-01
相关资源
最近更新 更多