【问题标题】:Implement delayed message processing with spring-kafka使用 spring-kafka 实现延迟消息处理
【发布时间】:2021-05-23 06:50:49
【问题描述】:

由于 kafka 没有隐式支持延迟消息可见性,我们必须手动实现。

我正在尝试使用以下方法来实现它-

  1. 在发送消息时在消息负载中添加delayInSecondseventTime 字段
public class DelayedMessage {
    
  String messageId;
  Instant eventTime;
  Long delayInSeconds;
}
  1. 收到消息后,计算应处理消息的确切实例
private Instant getInstantWhenToBeProcessed() {
  return eventTime.plus(delayInSeconds, ChronoUnit.SECONDS);
}
  1. 根据上面的值和现在,计算消费者必须等待的时间
public long getTimeToBeDelayedInMillis() {
  return Duration.between(Instant.now(), getInstantWhenToBeProcessed()).toMillis();
}
  1. 如果 timeToBeDelayed 是肯定的,则暂停容器,等待持续时间并恢复容器 - 在同一线程上
@Service
public class KafkaManager {
  
  @Autowired
  private KafkaListenerEndpointRegistry registry;
  
  public void pauseThenResumeAfter(long delayInMillis) throws InterruptedException {

    this.pause();

    log.info("Sleeping current thread by: {}ms", delayInMillis);
    Thread.sleep(delayInMillis);
    log.info("Waking up the thread");

    this.resume();

    // Throw exception, so that SeekToCurrentErrorHandle does a seek and tries to re-process
    throw new IllegalArgumentException("Failed to process record, as they instantToProcess has not yet arrived. Will retry after backoff");
  }

  public void pause() {
    registry.getListenerContainers().forEach(MessageListenerContainer::pause);
  }

  public void resume() {
    registry.getListenerContainers().forEach(MessageListenerContainer::resume);
  }
}
  1. 恢复调用后,抛出异常,这将使 SeekToCurrentErrorHandler 重试消息处理。 (如果不这样做,消费者就不会处理最后一条消息)

这一切正常,容器进入睡眠状态并根据上述逻辑恢复。但我有几个问题 -

  1. 我们是否需要在同一个线程上暂停和恢复容器?
  2. 是否有另一种方法来寻找当前的偏移量(不是通过使用 STCEH,就像这里所做的那样)?涉及的复杂性是什么?
  3. 上述实现在使用 ConcurrentListenerContaienrs 时会出现问题吗?
  4. 有没有办法根据消费者记录有效负载计算 STCEH 中的退避?如果是,那么是否会实现让这个 kafkaConsumer 在代理上保持活力(因为延迟不超过 max.poll.interval)?
  5. 最后 - 上述实现中可能存在哪些缺陷,这将使其成为不可行的选择。

谢谢!

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    如果你从监听线程调用pauseThenResumeAfter(),它没有效果;在侦听器方法执行之前,容器实际上并没有暂停。

    你应该看看新的 2.7 功能Non-blocking retries

    您应该能够重复使用它的一些基础架构来实现您的要求。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-17
      • 2019-07-26
      • 2018-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多