【问题标题】:aiokafka: is lock required for processing message when considering consumer group rebalance for kafkaaiokafka:在考虑 kafka 的消费者组重新平衡时,处理消息需要锁定
【发布时间】:2020-11-08 15:48:14
【问题描述】:

当使用ConsumerRebalanceListener 启动 aiokafka 消费者时,我们对从 kafka 接收到的一批消息(getmany())有一个处理过程。我们也将此处理过程添加到on_partitions_revoked,以确保在重新平衡发生时完成这些处理。

当重新平衡发生,同时处理过程已经发生时,on_partitions_revoked 将再次调用处理过程并可以处理两次消息,为避免这种情况,我们在此过程的开头添加了一个锁。

实际上我不太确定在这种情况下我们是否真的需要锁,所以如果这里的人能就此提出建议,我将不胜感激。虽然我们在这种情况下使用 aiokafka,但我想这可能是 Kafka 的一个普遍问题。

class KafkaWrapper(ConsumerRebalanceListener):
    def __init__(
        self,
        consumer_bootstrap_servers: List[str],
        consumer_topic: str,
        consumer_group_id: str,
    ):
        self.records_lock = asyncio.Lock()  
        
        self.kafka_consumer = AIOKafkaConsumer(
            bootstrap_servers=consumer_bootstrap_servers,
            group_id=consumer_group_id,            
        )
        self.kafka_consumer.subscribe(topics=[consumer_topic], listener=self) 

    async def process_records(self):
        async with self.records_lock:  # Is this lock really required??
            # processing message

    async def on_partitions_revoked(self, _revoked):
        await self.process_records()

    async def on_partitions_assigned(self, _assigned):
        pass

    async def _watch_kafka(self):
        await self.kafka_consumer.start()

        while not self.should_stop.is_set():
            messages = await self.kafka_consumer.getmany()
            
            if len(local_records_by_topic_partition) > 0:
                async with self.records_lock:
                    self.messages = messages
                await self.process_records()

【问题讨论】:

    标签: python apache-kafka aio


    【解决方案1】:

    当然你应该使用asyncio.lock

    如果您不使用lock,撤销可能会发生在流程的中间。 这可能会对您的实施产生意想不到的影响。 更糟糕的是,如果进程中有commit方法,则可能出现以下场景:

    进程启动->撤销启动->提交->错误!

    发生错误是因为消费者失去了对分区的权限。

    因此,通过使用lock,应该在进程完全完成后进行撤销。

    【讨论】:

      猜你喜欢
      • 2020-02-13
      • 2017-06-19
      • 2017-04-20
      • 2020-08-04
      • 1970-01-01
      • 2020-06-30
      • 1970-01-01
      • 2018-02-21
      • 2018-05-23
      相关资源
      最近更新 更多