【发布时间】:2022-02-02 01:26:54
【问题描述】:
我创建了一个增量为 5 的数据库序列,允许我在内存中保留一系列值,并将我对数据库的访问次数除以 5。(我的真实案例是 100000 个值)
我的 R2DBC 服务将调用 R2DBC API 并执行以单声道返回 long 的查询。
public class SequenceService {
public Mono<Long> nextVal() {
return r2dbc.query("select nextval('mysequence')");
}
}
假设我们在测试中调用此服务
sequenceService = new SequenceService( ... );
assertTrue(sequenceService.nextVal().block() == 10L);
assertTrue(sequenceService.nextVal().block() == 15L);
assertTrue(sequenceService.nextVal().block() == 20L);
我想要做的是我的 nextVal 调用可以填充值之间,所以
assertTrue(sequenceService().nextVal().block() == 10L);
assertTrue(sequenceService().nextVal().block() == 11L);
assertTrue(sequenceService().nextVal().block() == 12L);
assertTrue(sequenceService().nextVal().block() == 13L);
assertTrue(sequenceService().nextVal().block() == 14L);
assertTrue(sequenceService().nextVal().block() == 15L);
assertTrue(sequenceService().nextVal().block() == 16L);
assertTrue(sequenceService().nextVal().block() == 17L);
assertTrue(sequenceService().nextVal().block() == 18L);
assertTrue(sequenceService().nextVal().block() == 19L);
assertTrue(sequenceService().nextVal().block() == 20L);
像下面这样编码是行不通的,因为你总是会得到“n+0”,而永远不会得到“n+1”、“n+2”、“n+3”、“n+4”:
public class SequenceService {
public Mono<Long> nextVal() {
return r2dbc.query("select nextval('mysequence')")
.map(n -> Flux.range(0,5)).map(i -> n + i)).repeat().next();
}
}
我不知道如何从一个原始单声道中生成许多其他单独的单声道。
我必须保留某种状态,即通量、当前序列值的状态,但不知道该怎么做。
【问题讨论】:
-
您对问题的解释不是很清楚,
I had a working solution in JDBC该解决方案的代码在哪里?I tried to use shareNext() share() repeat()代码在哪里?为什么它不起作用?I also tried using a field with a AtomicReference<Mono>这段代码在哪里?But I fail to have something that works这没有帮助,更确切地说是什么失败了。不仅“失败了”。it didn't made much sense to do that.做什么没有意义? -
我清理了帖子,宁愿忘记现有代码并使用正确的反应器概念重新开始。
-
你正在以一种命令式的方式处理这个问题。 Flux 是一种列表,它不是。它是您订阅的东西,它会在它拥有它们时为您提供价值,并且当它用完项目时,订阅就会结束。我不能给你写一个代码示例,因为我仍然不明白你想要什么,但我认为你应该使用projectreactor.io/docs/core/release/reference/… 生成你的序列
-
我的数据源是通过 r2dbc 查询获得的 Mono
。我不能使用 Flux.generate 。 -
请提供一个工作示例来演示您的问题,现在您的代码主要包括对未知 API 的调用,例如
sequenceDao.getSequenceIncrementsequenceDao.nextVal("mysequence")sequenceDao.getIncrement("mysequence")和SequenceState.builder().dbSeq(seq).increment(inc).virtual(seq).build()如上所述,您的解释不清楚,您提供的所有代码都是不可运行的。如果您有问题,请阅读此stackoverflow.com/help/minimal-reproducible-example 祝您好运
标签: project-reactor reactor r2dbc