【发布时间】:2020-08-24 17:58:23
【问题描述】:
我有一个 Kafka 生产者和消费者。生产者不是我能控制的,这里我试图定义消费者的行为。
虽然生产者产生了足够多的数据来消费,但我希望消费者每秒轮询一次消息以使用,而不是像下面这样连续轮询它:
consumer.subscribe("topic")
while True:
message = consumer.poll(1.0) #the parameter here is timeout I believe
if message not None:
logger.info(message)
#or some logic here
我怎样才能让消费者每秒读取/轮询数据,而不是跳过/不一定使用其间的消息?
例如生产者的消息在 0.1 秒、0.5 秒、0.9 秒、1.3 秒等时进入,我只想读取 0.1 秒和 0.9 秒,然后是 1.7 秒等,而不是轮询每一条可用的消息。
谢谢!
【问题讨论】:
-
你能解释一下“跳过中间的消息”是什么意思吗?
-
另外,您正在写“每秒轮询消息”。这是否意味着您在该主题中只有一条消息?
-
@mike 消费者轮询的主题中有无数条消息,现在消费者在一秒钟内读取的消息不止一条。不过,我希望消费者每秒轮询一条消息。我觉得我可以用 sleep(1sec) 而不是 while True 来实现这一点;但是,我不确定是否有更好的方法,因为这是我第一次使用 Kafka。
-
仍然没有得到它。您究竟想消费哪些消息:假设生产者在 0 秒、0.1、0.7、1.2 秒的时间生产消息。您是想在 0 秒和 1.2 秒的时间读取消息,还是希望读取所有 4 条消息但每秒只读取一条?
-
哦,我明白你在问什么@mike。我的意思是前者,而不是阅读所有消息。
标签: apache-kafka