【问题标题】:Can't get out of infinite loop of kafka consumer daemon in Python无法摆脱 Python 中 kafka 消费者守护进程的无限循环
【发布时间】:2015-12-03 08:01:40
【问题描述】:

我写了一个程序来消费 kafka 事件。它有一个守护进程,我想在 10 秒后终止它。

def kafkaConsumer():
consumer = KafkaConsumer(sys.argv[1],group_id='test-consumer-group',bootstrap_servers=sys.argv[2].split(','))
schema_path=sys.argv[3]
schema = avro.schema.parse(open(schema_path).read())

for msg in consumer:
    value = bytearray(msg.value)
    bytes_reader = io.BytesIO(value[9:])
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    try:
        event = reader.read(decoder)
    except:
        pass
    eventInJsonFormat=json.dumps(event)
    print(eventInJsonFormat)

if __name__ == '__main__':
run_thread = Thread(target=kafkaConsumer())
run_thread.daemon = True
run_thread.start()
time.sleep(10)

请忽略缩进。
但是这个程序并没有在 10 秒后终止。想知道我在这里缺少什么吗?

【问题讨论】:

标签: python python-multithreading kafka-python


【解决方案1】:

为您的消费者添加 10 秒超时:

consumer = KafkaConsumer(sys.argv[1],
consumer_timeout_ms=10000
group_id='test-consumer-group',
bootstrap_servers=sys.argv[2].split(','),
)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-03-06
    • 2013-06-20
    • 1970-01-01
    • 2016-09-19
    • 1970-01-01
    • 2017-06-19
    相关资源
    最近更新 更多