【问题标题】:Synchronized reactive chain with Reactor与 Reactor 同步的反应链
【发布时间】:2020-06-18 13:55:32
【问题描述】:

我有以下反应链:

CacheMono.lookup(key -> this.retrieveFromCache(tenantId, key).map(Signal::next), carId)
    .onCacheMissResume(this.retrieveFromService(tenantId, carId))
    .andWriteWith((key, signal) ->
        Mono.fromRunnable(() -> this.carCache.cacheAndGetCar(key, signal.get(), tenantId)));

当一次只有一个请求进入时一切正常,但如果有很多请求进入,retrieveFromService 会被多次调用。

如何限制每个tenantId 一次只有一个请求可以调用retrieveFromService,而其他请求使用第一个请求的缓存响应?

【问题讨论】:

  • 这段代码怎么叫?作为 HTTP 请求的一部分?您可能会研究的一件事是 Resilience4j 隔板,它可以让您限制并发:resilience4j.readme.io/docs/bulkhead
  • 是的,它是HTTP请求的一部分,并调用另一个API来检索信息(retrieveFromService)来调用第三个API。我已经在使用 Resilience4j CircuitBreaker,我会看看bulkhead。
  • 虽然可能很棘手的是您必须在舱壁中定义一些延迟(maxWaitDuration),我不确定反应器版本是否支持。 ratelimiter 确实支持这一点。

标签: java spring-boot concurrency project-reactor reactor


【解决方案1】:

尝试使用惰性版本:https://projectreactor.io/docs/extra/snapshot/api/reactor/cache/CacheMono.MonoCacheBuilderCacheMiss.html#onCacheMissResume-java.util.function.Supplier-

CacheMono.lookup(key -> this.retrieveFromCache(tenantId, key).map(Signal::next), carId)
    .onCacheMissResume(() -> this.retrieveFromService(tenantId, carId))
    .andWriteWith((key, signal) ->
        Mono.fromRunnable(() -> this.carCache.cacheAndGetCar(key, signal.get(), tenantId)));

或相同但使用惰性 Mono.defer

CacheMono.lookup(key -> this.retrieveFromCache(tenantId, key).map(Signal::next), carId)
    .onCacheMissResume(Mono.defer(() -> this.retrieveFromService(tenantId, carId)))
    .andWriteWith((key, signal) ->
        Mono.fromRunnable(() -> this.carCache.cacheAndGetCar(key, signal.get(), tenantId)));

【讨论】:

    猜你喜欢
    • 2020-03-22
    • 2021-12-07
    • 1970-01-01
    • 1970-01-01
    • 2021-09-05
    • 2017-02-15
    • 2016-07-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多