【发布时间】:2022-07-01 21:21:15
【问题描述】:
我使用 Flink 使用 FlinkKafkaConsumer 从 Kafka 读取数据,然后将数据流转换为表,最后使用 FlinkSQL 将数据接收回 kafka(kafka-connector 表)。为了获得一次性交付保证,我设置了带有属性的 kafka 表:sink.semantic=exactly-once。 进行测试时,出现错误“事务超时大于代理允许的最大值”。 Flink 默认 Kafka 生产者最大事务超时:1h kafka 默认设置是 transaction.max.timeout.ms=900000。
所以,我需要在 kafka 生产者中添加“transaction.timeout.ms”属性。我的问题是我可以在哪里使用 FlinkSQL 添加这个属性。
我的代码:
tableEnv.executeSql("INSERT INTO sink_kafka_table select * from source_table")
我知道使用 table api
tableEnv.connect(new Kafka()
.version("")
.topic("")
.property("bootstrap.server","")
.property("transaction.timeout.ms","120000"))
.withSchema()
.withFormat()
.createTemporaryTable("sink_table")
table.executeInsert("sink_table")
不建议修改 kafka 配置文件。 任何建议都会有所帮助,谢谢。
【问题讨论】:
标签: apache-kafka-connect flink-sql