【发布时间】:2021-05-23 06:50:49
【问题描述】:
由于 kafka 没有隐式支持延迟消息可见性,我们必须手动实现。
我正在尝试使用以下方法来实现它-
- 在发送消息时在消息负载中添加
delayInSeconds和eventTime字段
public class DelayedMessage {
String messageId;
Instant eventTime;
Long delayInSeconds;
}
- 收到消息后,计算应处理消息的确切实例
private Instant getInstantWhenToBeProcessed() {
return eventTime.plus(delayInSeconds, ChronoUnit.SECONDS);
}
- 根据上面的值和现在,计算消费者必须等待的时间
public long getTimeToBeDelayedInMillis() {
return Duration.between(Instant.now(), getInstantWhenToBeProcessed()).toMillis();
}
- 如果
timeToBeDelayed是肯定的,则暂停容器,等待持续时间并恢复容器 - 在同一线程上
@Service
public class KafkaManager {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void pauseThenResumeAfter(long delayInMillis) throws InterruptedException {
this.pause();
log.info("Sleeping current thread by: {}ms", delayInMillis);
Thread.sleep(delayInMillis);
log.info("Waking up the thread");
this.resume();
// Throw exception, so that SeekToCurrentErrorHandle does a seek and tries to re-process
throw new IllegalArgumentException("Failed to process record, as they instantToProcess has not yet arrived. Will retry after backoff");
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
- 恢复调用后,抛出异常,这将使 SeekToCurrentErrorHandler 重试消息处理。 (如果不这样做,消费者就不会处理最后一条消息)
这一切正常,容器进入睡眠状态并根据上述逻辑恢复。但我有几个问题 -
- 我们是否需要在同一个线程上暂停和恢复容器?
- 是否有另一种方法来寻找当前的偏移量(不是通过使用 STCEH,就像这里所做的那样)?涉及的复杂性是什么?
- 上述实现在使用 ConcurrentListenerContaienrs 时会出现问题吗?
- 有没有办法根据消费者记录有效负载计算 STCEH 中的退避?如果是,那么是否会实现让这个 kafkaConsumer 在代理上保持活力(因为延迟不超过 max.poll.interval)?
- 最后 - 上述实现中可能存在哪些缺陷,这将使其成为不可行的选择。
谢谢!
【问题讨论】:
标签: spring-kafka