【问题标题】:Error while using dataflow Kafka to bigquery template使用数据流 Kafka 到 bigquery 模板时出错
【发布时间】:2021-03-18 12:27:44
【问题描述】:

我正在使用数据流 kafka 到 bigquery 模板。启动数据流作业后,它会在队列中停留一段时间,然后失败并出现以下错误:

Error occurred in the launcher container: Template launch failed. See console logs.

查看日志时,我看到以下堆栈跟踪:

at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303) 
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.run(KafkaToBigQuery.java:343) 
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.main(KafkaToBigQuery.java:222) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata –

在启动工作时,我提供了以下参数:

  1. kafka 主题名称
  2. 引导服务器名称
  3. bigquery 主题名称
  4. SA 电子邮件
  5. 区域。

我的 kafka 主题仅包含消息:你好

kafka 安装在 gcp 实例中,该实例与 dataflow worker 位于同一区域和子网中。

【问题讨论】:

  • 您是否尝试过更改日志设置以在 INFO 级别显示日志?模板启动日志显示为 INFO(甚至是错误)。让我知道这是否有帮助。
  • 感谢@Pablo,这是我在 INFO 部分看到的 java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
  • 啊,我明白了——你看到这个信息日志周围的堆栈跟踪了吗?通常堆栈跟踪将显示为堆栈中每一行的一个日志条目
  • @Pablo,在 org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192) 在 org.apache.beam.sdk.Pipeline.run(Pipeline.java:317 ) 在 com.google.cloud.teleport.v2.templates.KafkaToBigQuery.run(KafkaToBigQuery.java:343) 在 com.google.cloud 的 org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)。 teleport.v2.templates.KafkaToBigQuery.main(KafkaToBigQuery.java:222) 原因:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
  • @Pablo,只有一个查询.. 我没有为 UDF 指定路径。一定要提供UDF函数路径吗?基本上我没有进行任何转换

标签: google-cloud-platform apache-kafka google-cloud-dataflow dataflow


【解决方案1】:

在这里添加这个作为后代的答案:

“获取主题元数据时超时”表示Kafka客户端无法连接到代理以获取元数据。这可能是由于各种原因造成的,例如工作虚拟机无法与代理通信(您是通过公共 ip 还是私有 ip?如果使用公共 ip,请检查传入的防火墙设置)。也可能是由于端口不正确或代理需要 SSL 连接。一种确认方法是将 Kafka 客户端安装在与 Dataflow 工作线程位于同一子网的 GCE VM 上,然后验证 kafka 客户端是否可以连接到 Kafka 代理。

请参阅 [1] 为 Kafka 客户端配置 ssl 设置(您可以在 GCE 实例上使用 cli 进行测试)。管理代理的团队可以告诉您他们是否需要 SSL 连接。

[1]https://docs.confluent.io/platform/current/kafka/authentication_ssl.html#clients

【讨论】:

    【解决方案2】:

    嘿,谢谢大家的帮助,我正在尝试使用内部 ip 访问 kafka。当我将它添加到公共 IP 时它起作用了。实际上我在同一个子网中同时运行卡夫卡机器和工人。所以它也应该与内部 ip 一起使用......我现在正在检查它

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-05-23
      • 2020-12-02
      • 2018-10-24
      • 2018-07-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多