【发布时间】:2020-11-08 15:48:14
【问题描述】:
当使用ConsumerRebalanceListener 启动 aiokafka 消费者时,我们对从 kafka 接收到的一批消息(getmany())有一个处理过程。我们也将此处理过程添加到on_partitions_revoked,以确保在重新平衡发生时完成这些处理。
当重新平衡发生,同时处理过程已经发生时,on_partitions_revoked 将再次调用处理过程并可以处理两次消息,为避免这种情况,我们在此过程的开头添加了一个锁。
实际上我不太确定在这种情况下我们是否真的需要锁,所以如果这里的人能就此提出建议,我将不胜感激。虽然我们在这种情况下使用 aiokafka,但我想这可能是 Kafka 的一个普遍问题。
class KafkaWrapper(ConsumerRebalanceListener):
def __init__(
self,
consumer_bootstrap_servers: List[str],
consumer_topic: str,
consumer_group_id: str,
):
self.records_lock = asyncio.Lock()
self.kafka_consumer = AIOKafkaConsumer(
bootstrap_servers=consumer_bootstrap_servers,
group_id=consumer_group_id,
)
self.kafka_consumer.subscribe(topics=[consumer_topic], listener=self)
async def process_records(self):
async with self.records_lock: # Is this lock really required??
# processing message
async def on_partitions_revoked(self, _revoked):
await self.process_records()
async def on_partitions_assigned(self, _assigned):
pass
async def _watch_kafka(self):
await self.kafka_consumer.start()
while not self.should_stop.is_set():
messages = await self.kafka_consumer.getmany()
if len(local_records_by_topic_partition) > 0:
async with self.records_lock:
self.messages = messages
await self.process_records()
【问题讨论】:
标签: python apache-kafka aio