【问题标题】:Set Table/Topic order in Apache Kafka JDBC在 Apache Kafka JDBC 中设置表/主题顺序
【发布时间】:2021-03-09 14:57:07
【问题描述】:

有 2 个主题,source_topic.asource_topic.bsource_topic.asource_topic.b 有依赖关系(例如,需要先接收 source_topic.b)。为了注意 sink 过程,需要先从 source_topic.b 接收数据,然后再从 source_topic.a 接收数据。有没有办法在源/接收器配置中设置主题/表的顺序?

以下是使用的配置,并且有多个表格和主题。 时间戳用于每次轮询时更新表的模式。并将 timestamp.initial 设置为特定的时间戳。

源配置

name=jdbc-mssql-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver:
connection.user=
connection.password=
topic.prefix= source_topic.
mode=timestamp
table.whitelist=A,B,C
timestamp.column.name=ModifiedDateTime

connection.backoff.ms=60000
connection.attempts=300

validate.non.null= false
# enter timestamp in milliseconds 
timestamp.initial= 1604977200000 

接收器配置

name=mysql-sink-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics= sink_topic_a, sink_topic_b
connection.url=jdbc:mysql:
connection.user=
connection.password=

insert.mode=upsert
delete.enabled=true
pk.mode=record_key


errors.log.enable= true
errors.log.include.messages=true

【问题讨论】:

    标签: jdbc apache-kafka apache-kafka-connect


    【解决方案1】:

    不,JDBC Sink 连接器不支持这种逻辑。

    您正在将批处理思维应用于流世界 :) 考虑一下:Kafka 怎么知道它已经“完成”了下沉 topic_a?流是无限的,所以你最终不得不说“如果你在给定的时间窗口内没有收到更多消息,那么假设你已经完成了从这个主题接收数据并进入下一个一个”。

    您最好在 Kafka 本身内进行必要的数据连接(例如,使用 Kafka Streams 或 ksqlDB),然后将结果写回一个新的 Kafka 主题,然后将其接收到您的数据库中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-24
      • 2018-02-08
      • 2023-03-31
      • 1970-01-01
      • 1970-01-01
      • 2016-07-26
      • 1970-01-01
      • 2020-10-17
      相关资源
      最近更新 更多