【问题标题】:How to limit number of records in Kafka-consumer如何限制 Kafka-consumer 中的记录数
【发布时间】:2018-12-03 11:52:45
【问题描述】:

我正在使用 confluent Kafka-rest 产品来使用主题中的记录。我的意图是只使用主题的前 100 条记录。我正在使用以下 REST API 来获取记录

GET /consumers/testgroup/instances/my_consumer/records

如何做到这一点?有什么想法吗?

【问题讨论】:

  • 您正在使用一个保持消费者偏移量的消费者组,因此当您请求新记录时,您不会获得该主题的第一条记录。您会获得尚未使用的新记录。您真的想要主题的前 100 条记录,还是想要在每个 API 休息调用中消耗 100 条记录?
  • 另外,您可以与GET records endpoint 一起使用的唯一大小控制参数似乎是max_bytes,它不会直接转换为记录数,但应该适合您。
  • 不认为这是可能的:Consumer configuration - Although consumer instances are not shared, they do share the underlying server resources. Therefore, limited configuration options are exposed via the API. However, you can adjust settings globally by passing consumer settings in the REST Proxy configuration.docs.confluent.io/current/kafka-rest/docs/config.html 没有提及任何相关设置

标签: apache-kafka confluent-platform kafka-rest


【解决方案1】:

据我所知,目前这是不可能的。如另一个答案中所述,您可以指定最大字节数(尽管在某些情况下实际上可以被代理忽略),但您无法指定所需的消息数量。

但是,这样的功能可以很容易地在您的客户端代码中实现。您可以猜测一个粗略的大小,查询 REST API 并查看您收到了多少条消息。如果小于 100,则再次查询以获取接下来的几条消息,直到达到 100。

【讨论】:

  • 好的。但我不知道传入的数据。它是动态的。因此,与 max_bytes 一起,confluent 可以支持从主题中检索的消息数量。
【解决方案2】:

可以使用属性ConsumerConfig.MAX_POLL_RECORDS_CONFIG 来配置您的KafkaConsumer。请看doc

【讨论】:

    【解决方案3】:

    如果您尝试从您的消费者组消费 100 条消息的新批次,您应该将 max_bytes 设置为一个值,对于您的数据模型,该值将始终返回大约 100 条记录。你可以有一个更保守的逻辑(得到更少,然后得到更多,直到截止到 100),或者你可以总是得到更多然后忽略。在这两种方式中,您都应该对您的消费者组采用手动偏移管理。

    GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000
    

    如果您收到超过 100 条消息并且由于某种原因忽略了它们,如果启用了偏移量自动提交(它是在您创建消费者时定义的),您将不会在该消费者组上再次收到它们。您可能不希望这种情况发生!

    如果您手动提交偏移量,那么如果您提交正确的偏移量以保证不会丢失任何消息,那么您可以忽略任何您想要的内容。您可以像这样手动提交偏移量:

    POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Content-Type: application/vnd.kafka.v2+json
    
    {
      "offsets": [
        {
          "topic": "test",
          "partition": 0,
          "offset": <calculated offset ending where you stopped consuming for this partition>
        },
        {
          "topic": "test",
          "partition": 1,
          "offset": <calculated offset ending where you stopped consuming for this partition>
        }
      ]
    }
    

    如果您想准确获取主题的前 100 条记录,则需要在再次消费之前重置该主题和每个分区的消费者组偏移量。你可以这样做(taken from confluent):

    POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
    Host: proxy-instance.kafkaproxy.example.com
    Content-Type: application/vnd.kafka.v2+json
    
    {
      "offsets": [
        {
          "topic": "test",
          "partition": 0,
          "offset": 0
        },
        {
          "topic": "test",
          "partition": 1,
          "offset": 0
        }
      ]
    }
    

    【讨论】:

    • 通过使用max_bytes 控制返回消息的数量,您假设所有消息的大小完全相同。在实践中,这种情况很少见,所以这不太可能奏效
    • 是的,这就是为什么我强调“您应该将 max_bytes 设置为一个值,对于您的数据模型,将始终返回大约 100 条记录”,但我会进一步澄清。
    猜你喜欢
    • 1970-01-01
    • 2022-01-12
    • 2016-03-28
    • 2023-03-02
    • 2021-04-19
    • 2023-01-13
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    相关资源
    最近更新 更多