[Posted]: 2019-12-17 01:44:40
[Question]:
I'm developing a Python application that connects to Kafka and consumes some data.
import argparse
import logging

from kafka import KafkaConsumer


def main(argv):
    params = parse_arg(argv)
    logging.info("Connecting to topic\t" + params.tasks_topic)
    consumer = KafkaConsumer(params.tasks_topic,
                             group_id='kafkatester',
                             bootstrap_servers=params.kafka.split(','),
                             auto_offset_reset='latest',
                             enable_auto_commit=False,
                             max_poll_records=1,
                             max_poll_interval_ms=18000)


def parse_arg(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('-k', '--kafka')
    parser.add_argument('-t', '--tasks-topic')
    # Parse the argv passed in (sys.argv[1:] in the traceback below)
    args = parser.parse_args(argv)
    # AppParams is the application's own parameter container (not shown)
    return AppParams(args.kafka, args.tasks_topic)
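When debugging a value like this, logging its repr() rather than the plain string makes hidden characters visible. With the plain + concatenation used in main() above, a trailing space or "\r" would not be visible in the log line. A minimal sketch using the logging module's lazy %r formatting; the topic value here is an assumed example, not taken from the question:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")

# Assumed example value with a hidden trailing carriage return
topic = "taskstest\r"

# Plain concatenation: "\r" is written as a raw control character, so it is invisible
logging.info("Connecting to topic\t" + topic)
# %r formatting: the value is rendered via repr(), so "\r" shows up as the escape \r
logging.info("Connecting to topic\t%r", topic)
```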
Everything works fine locally. However, when I run it in Docker, I get unexpected results:
08/09/2019 09:54:57 AM Connecting to topic taskstest
08/09/2019 09:54:57 AM <BrokerConnection node_id=bootstrap-0 host=MySecretIP:9092 <connecting> [IPv4 ('MySecretIP', 9092)]>: connecting to MySecretIP:9092 [('MySecretIP', 9092) IPv4]
08/09/2019 09:54:57 AM Probing node bootstrap-0 broker version
08/09/2019 09:54:57 AM <BrokerConnection node_id=bootstrap-0 host=MySecretIP:9092 <connecting> [IPv4 ('MySecretIP', 9092)]>: Connection complete.
08/09/2019 09:54:57 AM Broker version identifed as 1.0.0
08/09/2019 09:54:57 AM Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/app/src/main.py", line 40, in <module>
    main(sys.argv[1:])
  File "/app/src/main.py", line 26, in main
    max_poll_interval_ms=18000)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 390, in __init__
    self._subscription.subscribe(topics=topics)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 120, in subscribe
    self.change_subscription(topics)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 169, in change_subscription
    self._ensure_valid_topic_name(t)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 142, in _ensure_valid_topic_name
    raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))
" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"
It looks like KafkaConsumer cannot handle the params.tasks_topic variable. Why?
The kafka-python version is 1.4.6 and Python is 3.7.3.
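The error message spells out the rule the topic name must satisfy: only ASCII alphanumerics, ".", "_" and "-". To find which character breaks it, one can check the value against that rule and print its repr(), which exposes whitespace such as a space or "\r". This is a minimal sketch that mirrors the error text; it is not kafka-python's own _ensure_valid_topic_name, and the helper find_illegal_chars is hypothetical:

```python
import string

# Characters allowed according to the kafka-python error message:
# ASCII letters and digits, ".", "_" and "-"
ALLOWED = set(string.ascii_letters + string.digits + "._-")


def find_illegal_chars(topic):
    """Hypothetical helper: return the characters in `topic` that the rule rejects."""
    return [c for c in topic if c not in ALLOWED]


for name in ["taskstest", "taskstest ", "taskstest\r"]:
    # repr() shows hidden whitespace that plain printing would not
    print(repr(name), find_illegal_chars(name))
# 'taskstest' []
# 'taskstest ' [' ']
# 'taskstest\r' ['\r']
```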
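[Discussion]:
- What is the name of the topic (i.e. what value is assigned to params.tasks_topic)?
- The topic name is taskstest.
- Does it work if you hard-code the topic name instead of using params.tasks_topic?
- Yes, it does. :/
- It looks like you are somehow ending up passing a topic name with a bunch of whitespace in it, e.g. "____taskstest". Can you try trimming it, i.e. params.tasks_topic.strip()?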
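Python strings have no trim(); the equivalent is str.strip(), which removes leading and trailing whitespace, including spaces, tabs, "\r" and "\n". A minimal sketch, assuming the arguments are parsed with argparse as in the question, applies it at parse time by passing str.strip as each argument's type. For brevity it returns the argparse Namespace instead of wrapping the values in the question's AppParams, and the broker address is a placeholder:

```python
import argparse


def parse_arg(argv):
    parser = argparse.ArgumentParser()
    # type=str.strip is called on the raw string, dropping leading/trailing whitespace
    parser.add_argument('-k', '--kafka', type=str.strip)
    parser.add_argument('-t', '--tasks-topic', type=str.strip)
    return parser.parse_args(argv)


# Placeholder broker address; the topic carries a trailing carriage return
args = parse_arg(['-k', 'localhost:9092', '-t', 'taskstest\r'])
print(repr(args.tasks_topic))  # 'taskstest'
```

Note that strip() only removes characters at the ends of the string; an illegal character in the middle of the name would still be rejected.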
Tags: python apache-kafka kafka-consumer-api kafka-python