【问题标题】:extract and transform kafka message specific fields for jdbc sink connector提取和转换 jdbc sink 连接器的 kafka 消息特定字段
【发布时间】:2020-07-31 21:50:59
【问题描述】:

我有一个 kafka 主题,它使用 Debezium mysql 源连接器从 mysql 数据库中获取数据,以下是其中一条消息的格式:

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想使用 jdbc sink 连接器将 ts_ms, before, afterid(从对象/行)列推送到另一个数据库,表架构为 (id,before(text),after(text),timestamp),对于 kafka 的新手无法弄清楚:

  • 如何仅从消息中提取这些字段以推送并忽略其他字段?

  • 如何将字段前后转换为字符串/序列化格式?

  • 如何从对象中提取id? (插入操作,before为null,delete,after为null)

对于上面的消息,sink目标表最后应该有如下数据:

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815

【问题讨论】:

    标签: mysql apache-kafka transformation apache-kafka-connect debezium


    【解决方案1】:

    您可以使用Kafka Connect Transformations 的链,例如solution

    【讨论】:

    • 感谢@Iskuskov Alexander,您能否设计一些具体的东西来满足这两个需求,转换AND Json 序列化?
    • 你需要beforeafter作为字符串吗?
    • @@Iskuskov Alexander 是的
    • 我认为您可以执行以下步骤: 1. 使用 JDBC Sink Connector 的 record_key 属性从消息键中使用 id; 2、afterbefore字段使用Cast SMT; 3、使用Replace SMT重命名ts_ms; 4. 使用 Replace SMT 过滤字段
    【解决方案2】:

    您可以创建一个 DTO(您从 kafka 主题中获得的 json 有效负载的 Java 对象)利用此在线转换器帮助您将 json 转换为 Java 对象。 [http://pojo.sodhanalibrary.com/][1]

    一旦您收到来自您的 kafka 主题的消息,您就可以使用 objectmapper 转换该 json 并将其映射到您适当的 DTO 对象。一旦您准备好对象。您可以使用该对象通过调用 getId()、getBefore() 等来提取所需的字段,

    这里有一些参考代码可以帮助你理解:

        @KafkaListener(topics = "test")
            public void listen(String payload)  {
    
                logger.info("Message Received from Kafka topic: {}", payload);
    
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    
                DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);
    
                    logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));
    
                    logger.info("Get Before:{}", dtoObject.getId());
    
    
    
            }
    

    【讨论】:

    • 这是和 SMT 还是什么,正如我提到的,我正在为目标数据库使用 jdbc 接收器连接器,我怎样才能使用你提到的这段代码 sn-p 以及在哪里?谢谢
    猜你喜欢
    • 2018-10-21
    • 2021-12-19
    • 2022-07-29
    • 2019-06-15
    • 2019-06-05
    • 1970-01-01
    • 2020-02-08
    • 2019-03-08
    • 2020-07-31
    相关资源
    最近更新 更多