【问题标题】:How to modify/update to the data before sending it to downstream如何在将数据发送到下游之前修改/更新数据
【发布时间】:2022-01-18 09:49:33
【问题描述】:

我有一个包含格式数据的主题

{
 before: {...},
 after: {...},
 source: {...},
 op: 'u'
}

数据由 Debezium 生成。我想将数据发送到 SQL Server db 表,所以我选择了 JDBC Sink Connector。我需要在将数据发送到下游之前对其进行处理。

需要应用的逻辑:

  1. 如果 op = 'u' 或 op = 'c' 或 op = 'r' // 更新或插入或快照

    选择 'after' 中存在的所有字段并执行 upsert 到下游。

  2. 如果 op = 'd' // 删除

    选择 'before' 中存在的所有字段 + 添加字段 IsActive=false 并执行 upsert 到下游。

我怎样才能做到这一点?

【问题讨论】:

  • 您可以使用 Kafka Streams 或 KSQL 在像 JDBC 接收器这样的消费者读取它之前将您的记录“处理”成一个新主题
  • 这种方法存在一个问题。我有 10 个具有相同架构的不同主题,所以我必须创建 10 个不同的 Kafka Streams
  • 就像消费者一样,Kafka Streams 可以订阅多个主题

标签: jdbc apache-kafka apache-kafka-connect confluent-platform debezium


【解决方案1】:

如果您不是必须接收到 kafka 主题的复杂 debezium 消息,请查看Debezium's New Record State Extraction SMT。您需要在 Debezium 的连接器配置中对其进行配置,如果您将其与 delete.handling.mode:rewrite 一起使用,您将在消息中获得一个字段 __deleted,该字段将用于您在中指出的字段 IsActive你的问题。

您将收到到 kafka 的消息的简化格式将匹配 jbdc sink connector 期望的消息格式,尽管您可能只需要将一些 Single Message Transforms for Confluent Platform 应用于 jdbc sink connector 的配置以过滤一些字段,替换一些字段等。

作为附带的好处,您还将获得更少的数据到 kafka。

【讨论】:

  • 这是一个很好的解决方案,但根据我的要求,这里有一个限制。我想要主题中的完整数据,即之前和之后。检查下面对我有用的解决方案
【解决方案2】:

我能够在接收器 jdbc 连接器中使用自定义转换来实现这一点。 我提取了after 字段和op 字段并应用了逻辑。没有直接的方法来更新记录,即没有方法来设置模式和设置值。所以我使用反射来更新模式和值。

下面的代码sn-ps:

private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
    private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
    private final ExtractField<R> operationDelegate = new ExtractField.Value<R>(); 

public R apply(R record) {
        R operationRecord = operationDelegate.apply(record);
        String op = String.valueOf(operationRecord.value());
        Boolean isDeletedRecord = op.equalsIgnoreCase(Operation.DELETE.getValue())? true: false;
       ...
       finalRecord = afterDelegate.apply(record);
       if(isDeletedRecord){
            addDeletedFlag(finalRecord);
        }
} 
private void addDeletedFlag(R finalRecord){
        final SchemaBuilder builder = SchemaBuilder.struct();
        builder.name(finalRecord.valueSchema().name());
        for(Field f: finalRecord.valueSchema().fields()){
            builder.field(f.name(),f.schema());
        }
        builder.field(deleteFlagName,Schema.BOOLEAN_SCHEMA).optional();
        Schema newValueSchema = builder.build();
        try{
            java.lang.reflect.Field s = finalRecord.getClass().getSuperclass().getDeclaredField("valueSchema");
            s.setAccessible(true);
            s.set(finalRecord,newValueSchema);
        }catch (Exception e){
            e.printStackTrace();
        }

        Struct s = (Struct) finalRecord.value();
        updateValueSchema(s,finalRecord.valueSchema());
        updateValue(finalRecord.value(),true);
    }
private void updateValueSchema(Object o,Schema newSchema) {
        if(!(o instanceof Struct)){
            return;
        }
        Struct value = (Struct) o;
        try{
            java.lang.reflect.Field s = value.getClass().getDeclaredField("schema");
            s.setAccessible(true);
            s.set(value,newSchema);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    private void updateValue(Object o, Object newValue){
        if(!(o instanceof Struct)){
            return;
        }
        Struct value = (Struct) o;

        try{
            java.lang.reflect.Field s = value.getClass().getDeclaredField("values");
            s.setAccessible(true);
            Object[] newValueArray = ((Object[]) s.get(value)).clone();
            List<Object> newValueList = new ArrayList<>(Arrays.asList(newValueArray));
            newValueList.add(newValue);
            s.set(value, newValueList.toArray());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

【讨论】:

    猜你喜欢
    • 2016-07-06
    • 1970-01-01
    • 1970-01-01
    • 2016-10-31
    • 1970-01-01
    • 2017-05-03
    • 2015-08-16
    • 2021-12-15
    • 2015-09-21
    相关资源
    最近更新 更多