【发布时间】:2022-01-18 09:49:33
【问题描述】:
我有一个包含格式数据的主题
{
before: {...},
after: {...},
source: {...},
op: 'u'
}
数据由 Debezium 生成。我想将数据发送到 SQL Server db 表,所以我选择了 JDBC Sink Connector。我需要在将数据发送到下游之前对其进行处理。
需要应用的逻辑:
-
如果 op = 'u' 或 op = 'c' 或 op = 'r' // 更新或插入或快照
选择 'after' 中存在的所有字段并执行 upsert 到下游。
-
如果 op = 'd' // 删除
选择 'before' 中存在的所有字段 + 添加字段 IsActive=false 并执行 upsert 到下游。
我怎样才能做到这一点?
【问题讨论】:
-
您可以使用 Kafka Streams 或 KSQL 在像 JDBC 接收器这样的消费者读取它之前将您的记录“处理”成一个新主题
-
这种方法存在一个问题。我有 10 个具有相同架构的不同主题,所以我必须创建 10 个不同的 Kafka Streams
-
就像消费者一样,Kafka Streams 可以订阅多个主题
标签: jdbc apache-kafka apache-kafka-connect confluent-platform debezium