【问题标题】:Python, can't pass variable to KafkaConsumerPython,无法将变量传递给KafkaConsumer
【发布时间】:2019-12-17 01:44:40
【问题描述】:

我正在开发连接到 kafka 并使用一些数据的 python 应用程序。

def main(argv):
       params = parse_arg(argv)
       logging.info("Connecting to topic\t" + params.tasks_topic)
       consumer = KafkaConsumer(params.tasks_topic,
                             group_id='kafkatester',
                             bootstrap_servers=params.kafka.split(','),
                             auto_offset_reset='latest',
                             enable_auto_commit=False,
                             max_poll_records=1,
                             max_poll_interval_ms=18000)
def parse_arg(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('-k', '--kafka')
    parser.add_argument('-t', '--tasks-topic')
    args = parser.parse_args()
    return AppParams(args.kafka, args.tasks_topic)

本地一切正常。但是,当我在 docker 中运行它时,我得到了意想不到的结果:

08/09/2019 09:54:57 AM Connecting to topic      taskstest
08/09/2019 09:54:57 AM <BrokerConnection node_id=bootstrap-0 host=MySecretIP:9092 <connecting> [IPv4 ('MySecretIP', 9092)]>: connecting to MySecretIP:9092 [('MySecretIP', 9092) IPv4]
08/09/2019 09:54:57 AM Probing node bootstrap-0 broker version
08/09/2019 09:54:57 AM <BrokerConnection node_id=bootstrap-0 host=MySecretIP:9092 <connecting> [IPv4 ('MySecretIP', 9092)]>: Connection complete.
08/09/2019 09:54:57 AM Broker version identifed as 1.0.0
08/09/2019 09:54:57 AM Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/app/src/main.py", line 40, in <module>
    main(sys.argv[1:])
  File "/app/src/main.py", line 26, in main
    max_poll_interval_ms=18000)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 390, in __init__
    self._subscription.subscribe(topics=topics)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 120, in subscribe
    self.change_subscription(topics)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 169, in change_subscription
    self._ensure_valid_topic_name(t)
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/subscription_state.py", line 142, in _ensure_valid_topic_name
    raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))
" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"

看起来KafkaConsumer 无法处理params.tasks_topic 变量。为什么?

kafka-python版本是1.4.6,python是3.7.3

【问题讨论】:

  • 主题的名称是什么(即分配给params.tasks_topic的值是什么)?
  • 主题名称为taskstest
  • 如果你硬编码主题名称而不是params.tasks_topic,它是否有效?
  • 是的,确实如此。 ://
  • 看起来你以某种方式最终传递了带有一堆空格的主题名称,例如“____taskstest”。可以试试trim(params.tasks_topic)吗?

标签: python apache-kafka kafka-consumer-api kafka-python


【解决方案1】:

你需要修剪主题 看起来您发送的主题名称带有空格或空格 所以你需要首先验证主题是str。修剪过的str 所以我的建议是这样的:

def main(argv):
   params = parse_arg(argv)
   __topic  = params.tasks_topic
   print('before trim : ' + __topic)
   print('after trim : ' + __topic.strip())
   logging.info("Connecting to topic\t" + __topic.strip() )
   consumer = KafkaConsumer( __topic.strip() ,
                         group_id='kafkatester',
                         bootstrap_servers=params.kafka.split(','),
                         auto_offset_reset='latest',
                         enable_auto_commit=False,
                         max_poll_records=1,
                         max_poll_interval_ms=18000)
def parse_arg(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('-k', '--kafka', default='lola')
    parser.add_argument('-t', '--tasks-topic',default='pola')
    args = parser.parse_args()
    return AppParams(args.kafka, args.tasks_topic

我已经测试过它就像一个魅力;) 如果需要任何帮助,请联系我 ;)

【讨论】:

  • @mazaneicha 已经在上面的评论中给出了答案。因此,在实现中,在parse_arg() 中使用了strip() 函数,如果我不这样做,我将不得不在使用tasks_topic 时删除每个地方的空格......
猜你喜欢
  • 1970-01-01
  • 2016-10-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多