【问题标题】:Reactive-Kafka : How to pause the consumer on exception and retry on demandReactive-Kafka:如何在异常时暂停消费者并按需重试
【发布时间】:2017-06-20 09:47:26
【问题描述】:

我已经在 Google 网上论坛上问过 this 问题,但尚未收到任何回复。所以在这里为不同的观众发布这个。

我们正在为我们的应用程序使用 Reactive-Kafka。我们有一个如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。该消息应在规定的时间后或在消费者方的明确要求下重试。假设使用我们目前的方法,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但由于数据库问题而导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者以在规定的时间内接收消息(例如,等待 30 分钟重试)。 我们不确定如何处理这种情况。

是否可以这样做?我错过了什么吗?

这是从响应式 kafka 中获取的示例代码:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        Future {
          /**
            * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
            */
        }.map(_ => msg.committableOffset).recover {
          case ex => {
            /**
              * HOW TO DO ????????
              * On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
              * or on demand from the last committed offset
              */
            throw ex
          }
        }
      }
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3)(_.commitScaladsl())
      .runWith(Sink.ignore)

【问题讨论】:

    标签: scala akka-stream reactive-kafka


    【解决方案1】:

    注意,您可能需要将src 的具体化值从akka.kafka.scaladsl.Consumer.Control 映射到akka.NotUsed,以便在recoverWithRetries 中引用它:

    val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        Future {
          /**
            * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
            */
        }.map(_ => msg.committableOffset)
      .mapMaterializedValue(_ => akka.NotUsed)
    

    【讨论】:

      【解决方案2】:

      为此目的有一个recoverWithRetries 组合器。如需参考,请参阅this answerdocs

      你可以提取你的源代码

      val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
            .mapAsync(1) { msg =>
              Future {
                /**
                  * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
                  */
              }.map(_ => msg.committableOffset)
      

      然后做

      src
        .recoverWithRetries(attempts = -1, {case e: MyDatabaseException => 
          logger.error(e)
          src.delay(30.minutes, DelayOverflowStrategy.backpressure)})
        ...
      

      (Retrying with attempt=-1 表示无限期重试)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-09-01
        • 2016-06-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-06-19
        • 1970-01-01
        • 2017-09-23
        相关资源
        最近更新 更多