【发布时间】:2019-07-11 13:04:56
【问题描述】:
在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但是我们如何获得主题的最新偏移量来计算延迟。
【问题讨论】:
在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但是我们如何获得主题的最新偏移量来计算延迟。
【问题讨论】:
您可以使用 Kafka REST 代理获取最新提交的偏移量特定分区。根据Confluent Docs,
GET /consumers/(string: group_name)/instances/(string: instance)/offsets获取给定分区的最后提交的偏移量(无论是 提交由这个或另一个过程发生)。
请注意,此请求必须针对特定的 REST 代理 持有消费者实例的实例。
参数:
group_name (string) -- 消费组的名称
instance(string) -- 消费者实例请求JSON的ID
对象数组:
- partitions -- 查找最后提交的偏移量的分区列表
- partitions[i].topic (string) -- 主题名称
- partitions[i].partition (int) -- 分区 ID
响应 JSON 对象数组:
- offsets -- 已提交的偏移量列表
- offsets[i].topic (string) -- 提交偏移量的主题名称
- offsets[i].partition (int) -- 已提交偏移量的分区 ID
- offsets[i].offset (int) -- 提交的偏移量
- offsets[i].metadata (string) -- 提交偏移的元数据
状态码:
- 404 未找到 --
- 错误代码 40402 -- 未找到分区
- 错误代码 40403 -- 未找到使用者实例
示例请求:
GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
{
"partitions": [
{
"topic": "test",
"partition": 0
},
{
"topic": "test",
"partition": 1
}
]
}
示例响应:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{"offsets":
[
{
"topic": "test",
"partition": 0,
"offset": 21,
"metadata":""
},
{
"topic": "test",
"partition": 1,
"offset": 31,
"metadata":""
}
]
}
【讨论】: