【问题标题】:Get pending messages with Redis Streams and Spring Data使用 Redis Streams 和 Spring Data 获取待处理消息
【发布时间】:2020-07-07 09:22:30
【问题描述】:

我在我的 Spring Boot 应用程序中使用 Redis Streams。在调度程序中,我经常想要获取所有待处理的消息并检查它们已经处理了多长时间,并在必要时重新触发它们。

我现在的问题是我可以获取待处理的消息,但我不确定如何获取有效负载。

我的第一种方法使用了pendingrange 操作。这里的缺点是,range 不会增加 totalDeliveryCount - 所以我不能使用 range 方法

val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
            return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
    if (map != null && map.size > 0) { 
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()

我的第二种方法使用了pendingread 操作。对于读取操作,我可以使用当前 ID 指定偏移量。问题是我只能得到高于指定 ID 的 ID。

val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
            return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream<String, Any>()
            .read(it.consumer, StreamReadOptions.empty().count(1),
                    StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
    if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()

所以当我使用ReadOffset.from('1234-0') 时,我没有收到1234-0 的消息,而是该消息之后的所有内容。有没有办法获得确切的信息并尊重totalDeliveryCountelapsedTimeSinceLastDelivery 统计数据?

我正在使用 spring-data-redis 2.3.1.RELEASE

【问题讨论】:

    标签: kotlin redis spring-data-redis redis-streams


    【解决方案1】:

    我现在正在使用以下解决方法,这应该适用于大多数情况:

    return if (id.sequence > 0) {
                "${id.timestamp}-${id.sequence - 1}"
            } else {
                "${id.timestamp - 1}-99999"
            }
    

    这取决于每毫秒插入的消息不超过 99999 条这一事实。

    【讨论】:

      猜你喜欢
      • 2016-02-12
      • 2020-08-11
      • 2018-05-13
      • 1970-01-01
      • 1970-01-01
      • 2017-07-28
      • 1970-01-01
      • 1970-01-01
      • 2016-12-31
      相关资源
      最近更新 更多