【发布时间】:2020-10-20 11:18:08
【问题描述】:
当使用相同的topic 和相同的group-id 时,我希望能够从 2 个不同的消费者那里消费一次数据(每个消费者将收到不同的消息)。
制作人:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for e in range(50):
data = {'number' : e}
print('Producer {}'.format(data))
producer.send('test', value=data)
sleep(2)
第一个消费者代码:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[1] Consume {}'.format(message))
sleep(3)
第二个消费者代码:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[2] Consume {}'.format(message))
sleep(5)
我希望看到一些消息被consumer-1 消费,而其他消息被consumer-2 消费(根据consumer 代码中的睡眠命令)
但似乎只有一个消费者在工作并获取所有消息。 (第一个消费者卡住了,第二个消费者得到消息)。
我错过了什么?
【问题讨论】:
标签: python apache-kafka