【发布时间】:2017-01-24 00:25:40
【问题描述】:
我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符表示动态客户 ID。
consumer.subscribe(pattern='customer.*.validations')
这很好用,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能以听一个类似的主题,目的略有不同。我们称之为customer.*.additional-validations。代码需要存在于同一个项目中,因为有很多功能是共享的,但我需要能够根据队列的类型采用不同的路径。
在Kafka documentation 中,我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(不工作):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
【问题讨论】:
标签: python apache-kafka kafka-python