【问题标题】:Avoid overwriting fields when for null values in Confluent Elasticsearch Connector在 Confluent Elasticsearch 连接器中为空值时避免覆盖字段
【发布时间】:2021-11-26 16:56:57
【问题描述】:

我有一个扩充管道,可以更新动态数量的字段,写入 Kafka,然后发送到 Elasticsearch。我们正在使用Confluent Elasticsearch Connector

例如,如果发送到 ES 连接器的第一条记录是这样的:

{id: 1, name: "Bob", age: null}

丰富的记录是这样的:

{id: 1, name: null, age: 34}

我希望 Elasticsearch 中的结果记录为:

{id: 1, name: "Bob", age: 34}

丰富的记录必须有一个空值(即在我们上面的示例中name: null)而不是根本不设置键的原因是它来自 Avro 数据,我们的架构列出了几个字段作为可选.由于扩充管道正在更新动态数量的字段,这似乎是最直接的解决方案(即,可能会更新一条记录中的 name 字段,但另一条记录中的 age 字段)。由于可选的 Avro 字段默认为 null,这就是我们的空值的来源。

我尝试了write.method=upsert 设置as shown in this post,但这似乎仍会覆盖所有以null 作为丰富记录值的字段。 IE。根据上面的示例,ES 中的结果记录看起来像 {id: 1, name: null, age: 34}。上面链接的帖子似乎通过为单个记录类型设置多个 Avro 模式解决了这个问题,这对我们不起作用,因为它增加了太多的复杂性。

我注意到 ES 连接器也有 behavior.on.null.values 的设置,但我的理解是,这是针对整个记录为 null 而不是单个字段的情况。

Confluent ES Sink 连接器中是否有类似nullToUnset in the Datastax C* Connector 的设置?

如果没有,有没有好的方法来实现这个?

【问题讨论】:

  • 既然为null,那么如果从记录中排除name字段会怎样呢? Avro 不关心该字段是否真的存在,但连接器呢?
  • @OneCricketeer 我可能对 Avro 的工作原理感到困惑,但我的理解是,如果我排除 name 字段,它将作为默认值发送,即 null。因此,我不知道是否可以在不发送 null 的情况下实际排除 name 字段。你知道解决这个问题的方法吗?
  • 连接转换器应该只将作为记录一部分的可用字段转发到接收器。我还没有调试足够多的 Avro 转换器来知道它是否首先完全反序列化记录以应用任何默认值
  • @OneCricketeer 我尝试过的一切似乎都表明它正在应用默认值。如果有办法解决这个问题那就太好了

标签: elasticsearch apache-kafka apache-kafka-connect


【解决方案1】:

相关的代码行在这里: https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java#L170

这基本上意味着源文档按原样发送到索引 - 没有修改。

您最好的选择可能是添加一个读取源文档并删除任何具有空值的字段的 SMT。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2015-03-01
  • 1970-01-01
  • 1970-01-01
  • 2013-06-02
  • 1970-01-01
  • 1970-01-01
  • 2017-03-27
  • 2020-07-28
相关资源
最近更新 更多