【问题标题】:Reactive Redis does not continually publish changes to the FluxReactive Redis 不会持续发布对 Flux 的更改
【发布时间】:2020-02-14 13:00:23
【问题描述】:

我试图在我的 redis 有序列表上获取实时更新,但没有成功。 似乎它获取了所有项目并仅在最后一个项目上结束。 我希望客户在我的订购列表中不断获取新订单的更新。 我错过了什么?

这是我的代码:

@RestController
class LiveOrderController {

    @Autowired
    lateinit var redisOperations: ReactiveRedisOperations<String, LiveOrder>

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE], value = "/orders")
    fun getLiveOrders(): Flux<LiveOrder> {
        val zops = redisOperations?.opsForZSet()
        return zops?.rangeByScore("orders", Range.unbounded())
    }
}

【问题讨论】:

  • zops 是什么数据类型?
  • zops 是一个 ReactiveZSetOperations
  • 那个方法并没有像你想象的那样做。您需要使用 pub/sub 或 keyspace 通知来监听更改。值得一读响应式 API 的作用及其动机 - github.com/lettuce-io/lettuce-core/wiki/Reactive-API-(5.0)

标签: spring-boot kotlin redis reactive-programming spring-webflux


【解决方案1】:

Redis 中没有这样的功能。首先,有序集合的响应式检索只是获取快照,但您的调用是以响应式方式进行的。所以你需要订阅。

如果您像这样选择keyspace notificationsK - 启用键空间通知,z - 包括 zset 命令):

config set notify-keyspace-events Kz

并像这样在您的服务中订阅它们:

  ReactiveRedisMessageListenerContainer reactiveRedisMessages;
  // ...
  reactiveRedisMessages.receive(new PatternTopic("__keyspace@0__:orders"))
      .map(m -> {
        System.out.println(m);
        return m;
      })
      <further processing>

您会看到如下消息:PatternMessage{channel=__keyspace@0__:orders, pattern=__keyspace@0__:orders, message=zadd}。它会通知您已添加某些内容。你可以以某种方式对此做出反应 - 再次获得全套,或者只获得一部分(头部/尾部)。您甚至可以记住上一组,获取新的并发送差异。

但我真正建议的是以某种方式重新架构流程以直接使用 Redis Pub/Sub 功能。例如:发布服务而不是直接调用zadd 将调用eval,这将发出2 个命令:zadd orders 1 xpublish orders "1:x"(您想要的任何自定义消息,可能是JSON)。

然后在您的代码中,您将像这样订阅您的自定义主题:

return reactiveRedisMessages.receive(new PatternTopic("orders"))
      .map(LiveOrder::fromNotification);

【讨论】:

    猜你喜欢
    • 2019-10-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-05-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-13
    相关资源
    最近更新 更多