【问题标题】:Where can transaction.timeout.ms be set in Flink SQLFlink SQL 中的 transaction.timeout.ms 可以设置在哪里
【发布时间】:2022-07-01 21:21:15
【问题描述】:

我使用 Flink 使用 FlinkKafkaConsumer 从 Kafka 读取数据,然后将数据流转换为表,最后使用 FlinkSQL 将数据接收回 kafka(kafka-connector 表)。为了获得一次性交付保证,我设置了带有属性的 kafka 表:sink.semantic=exactly-once。 进行测试时,出现错误“事务超时大于代理允许的最大值”。 Flink 默认 Kafka 生产者最大事务超时:1h kafka 默认设置是 transaction.max.timeout.ms=900000。

所以,我需要在 kafka 生产者中添加“transaction.timeout.ms”属性。我的问题是我可以在哪里使用 FlinkSQL 添加这个属性。

我的代码:

tableEnv.executeSql("INSERT INTO sink_kafka_table select * from source_table")

我知道使用 table api

tableEnv.connect(new Kafka()
          .version("") 
          .topic("")
          .property("bootstrap.server","")
          .property("transaction.timeout.ms","120000"))
          .withSchema()
          .withFormat()
          .createTemporaryTable("sink_table")
 table.executeInsert("sink_table")

不建议修改 kafka 配置文件。 任何建议都会有所帮助,谢谢。

【问题讨论】:

    标签: apache-kafka-connect flink-sql


    【解决方案1】:

    使用连接器声明https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#connector-tables,您可以使用.option 方法设置properties.* option,该option 将被转发到带有properties. 剥离的kafka 客户端。所以你需要设置properties.transaction.max.timeout.ms

    您还可以使用 SQL DDL 语句创建 sink_table,并使用 properties.* 选项传递任何配置:https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#properties

    我不熟悉您是如何创建表的,但我认为它在 1.14 中已被弃用和删除:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/TableEnvironment.html#connect-org.apache.flink.table.descriptors.ConnectorDescriptor- cmets 建议创建表的方法执行 SQL DDL 语句。

    【讨论】:

      猜你喜欢
      • 2014-08-08
      • 2016-12-20
      • 1970-01-01
      • 1970-01-01
      • 2013-07-07
      • 1970-01-01
      • 2023-03-26
      • 2018-07-19
      • 2013-09-08
      相关资源
      最近更新 更多