【问题标题】:kafka only consume message after specified timekafka 只在指定时间后消费消息
【发布时间】:2020-01-28 19:09:48
【问题描述】:

我有一个卡夫卡话题。

消息被放到主题上1个月后,我必须采取行动。

为此,我循环执行以下操作:

  1. 投票卡夫卡
  2. 处理一个多月前发布的所有消息
  3. 提交最新此类消息的偏移量+1
  4. 重复

这实际上不起作用,因为 poll 从它停止的地方返回消息,并忽略提交,除非发生重新平衡。

所以我必须缓冲未读消息。 但是我仍然必须调用 poll,否则 kafka 会假设你已经死了,并且会重新平衡。每次民意调查都会返回更多数据。 所以这意味着我最终将在这个缓冲区中存储大量数据,这不太理想。

理想的做法是告诉 kafka “我还活着”,而不是实际要求更多信息。这样我可以循环直到缓冲区为空,然后才请求更多消息。

这是我的代码的样子:

  def run(): Unit = {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer = new KafkaConsumer[Long, Array[Byte]](properties, new LongDeserializer().asInstanceOf[Deserializer[Long]], new ByteArrayDeserializer)
    consumer.subscribe(List(topic).asJava)
    while (true) {
      val pollResult = consumer.poll(Duration.ofSeconds(1))

      val commitMap = for (partition <- pollResult.partitions().asScala) yield {
        val records = pollResult.records(partition).asScala
        val record = records.find(record => {
          if (record.timestamp() + secondsInMonth * 1000 < Instant.now.toEpochMilli) {
            DoAction(record)
            false
          }
          else {
            true
          }
        })
        val offset = new OffsetAndMetadata(record.map(_.offset()).getOrElse(records.last.offset() + 1))
        (partition, offset)
      }

      consumer.commitSync(commitMap.toMap.asJava, Duration.ofSeconds(1))
    }
  }

【问题讨论】:

  • 默认情况下,您的消息会在 7 天后消失...您可以在不提交的情况下进行轮询,这只会使其重新轮询现有数据。您还可以在消费者上调用 pause 以完全阻止它进行轮询。投票时间超过 1 秒也可能有意义
  • 我们在这里使用更长的保留时间:-)。如果您在未提交的情况下进行轮询,则在上次轮询数据之后继续。
  • 好吧,你不必立即在循环内轮询,你可以continue 没有轮询的循环,或者你可以使用ExecutorService 来安排一个实际的轮询事件。或者改变循环条件在某个时间窗口内被调用。
  • 但我必须在某个时间窗口内进行轮询,否则经纪人会重新平衡。这意味着我最终会在内存中存储大量消息。
  • 就像我说的,你可以暂停一个消费者,然后你就不必在 apk 消费者线程暂停后轮询和重新平衡

标签: apache-kafka


【解决方案1】:

您可以使用consumer.seek 确保投票从您停止的地方继续进行。所以代码将被修改为:

  def run(): Unit = {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer = new KafkaConsumer[Long, Array[Byte]](properties, new LongDeserializer().asInstanceOf[Deserializer[Long]], new ByteArrayDeserializer)
    consumer.subscribe(List(topic).asJava)
    while (true) {
      val pollResult = consumer.poll(Duration.ofSeconds(1))

      val commitMap = for (partition <- pollResult.partitions().asScala) yield {
        val records = pollResult.records(partition).asScala
        val record = records.find(record => {
          if (record.timestamp() + secondsInMonth * 1000 < Instant.now.toEpochMilli) {
            DoAction(record)
            false
          }
          else {
            true
          }
        })
        val offset = new OffsetAndMetadata(record.map(_.offset()).getOrElse(records.last.offset() + 1))
        consumer.seek(partition, offset)
        (partition, offset)
      }

      consumer.commitSync(commitMap.toMap.asJava, Duration.ofSeconds(1))
    }
  }

【讨论】:

    猜你喜欢
    • 2015-07-17
    • 1970-01-01
    • 1970-01-01
    • 2015-04-19
    • 2019-08-16
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 2022-06-15
    相关资源
    最近更新 更多