【问题标题】:How reset a consumer group in kafka?如何在kafka中重置消费者组?
【发布时间】:2017-04-21 20:58:38
【问题描述】:

不确定标题是否正确...无论如何假设我有以下场景(运行测试时经常发生这种情况):

  • 消费者从具有单个分区和组 ID 测试的主题开始

  • 我不使用close()就杀死了消费者

  • 我重新开始相同的测试。

消费者开始输出一些日志:

2017-04-20 12:01:46 DEBUG NetworkClient:476 - Completed connection to node 1003
    2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
    2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 3 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106738, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@2bea5ab4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106738, sendTimeMs=1492686106738), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
    2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
    2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 4 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106840, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3d8314f0, request=RequestSend(header={api_key=10,api_version=0,correlation_id=5,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106839, sendTimeMs=1492686106839), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
    2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
    2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 5 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
    2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106941, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@2df32bf7, request=RequestSend(header={api_key=10,api_version=0,correlation_id=7,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106940, sendTimeMs=1492686106940), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
    2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
    2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 6 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
    2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
    2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107042, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@530612ba, request=RequestSend(header={api_key=10,api_version=0,correlation_id=9,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107041, sendTimeMs=1492686107041), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
    2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
    2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 7 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
    2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
    2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107144, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@2a40cd94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=11,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107144, sendTimeMs=1492686107144), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
    2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003

我必须解决的唯一方法是:

  • 杀死 kafka 及其存储(因此有关主题的元数据消失了)
  • 更改消费者组 ID

有没有其他方法可以解决这种情况,不涉及以上内容?

【问题讨论】:

  • 您可以为您的消费者组中的所有主题分区提交 0 个偏移量
  • 这怎么可能......我在尝试加入集群时遇到了这种行为。我怎么能承诺?
  • zookeeper-shell.sh 脚本有办法做到这一点。也许,这可能会有所帮助:stackoverflow.com/questions/29791268/…
  • 哦,好吧,你以为你在谈论使用 kafka api

标签: apache-kafka kafka-consumer-api


【解决方案1】:

您正在寻找的是寻找主题的开头。

我认为在脏元数据状态下执行此操作的一种安全方法是:

consumer.seekToBeginning(Collections.emptySet()) // Rewind (lazy) all assigned partitions
consumer.poll(0) // Actually rewind by forcing a poll

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-01-06
    • 1970-01-01
    • 1970-01-01
    • 2020-09-19
    • 2017-07-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多