【问题标题】:How to consumer messages with 2 consumers with same group-id?如何使用具有相同组 ID 的 2 个消费者消费消息?
【发布时间】:2020-10-20 11:18:08
【问题描述】:

当使用相同的topic 和相同的group-id 时,我希望能够从 2 个不同的消费者那里消费一次数据(每个消费者将收到不同的消息)。

制作人:

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))
    for e in range(50):
        data = {'number' : e}
        print('Producer {}'.format(data))
        producer.send('test', value=data)
        sleep(2)

第一个消费者代码:

consumer = KafkaConsumer(
        'test',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        #auto_commit_interval_ms=100,
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    message = message.value
    print('[1] Consume {}'.format(message))
    sleep(3)

第二个消费者代码:

consumer = KafkaConsumer(
        'test',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        #auto_commit_interval_ms=100,
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))

 for message in consumer:
        message = message.value
        print('[2] Consume {}'.format(message))
        sleep(5)

我希望看到一些消息被consumer-1 消费,而其他消息被consumer-2 消费(根据consumer 代码中的睡眠命令)

但似乎只有一个消费者在工作并获取所有消息。 (第一个消费者卡住了,第二个消费者得到消息)。

我错过了什么?

【问题讨论】:

    标签: python apache-kafka


    【解决方案1】:

    根据问题的描述,我假设您的主题只有一个分区。

    如果您想在一个组中运行多个消费者,您的主题需要多个分区

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-04
      • 1970-01-01
      • 1970-01-01
      • 2021-05-14
      • 2020-04-19
      • 1970-01-01
      • 2023-03-24
      • 1970-01-01
      相关资源
      最近更新 更多