【问题标题】:Kafka Connect - how to send and handle nested JSON?Kafka Connect - 如何发送和处理嵌套的 JSON?
【发布时间】:2021-04-06 02:27:31
【问题描述】:

我有一条消息,它适用于我的 Redis Sink 连接器(连接器将价值赋予 Redis):

    {
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "registertime"
      },
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "after"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "after": "MALE"
  }
}

但我希望将字段“之后”作为嵌套对象:

    "after": {
        "one": null,
        "two": "one"
    }

并在此基础上处理数据(即,如果“one”为空,则跳过)。

所以我有一个连接器:

{
    "name": "connector1",
    "config": {
        "topics": "topic1",
        "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
        "tasks.max": "1",
        "connect.redis.error.policy": "NOOP",
        "connect.redis.host": "localhost",
        "connect.redis.port": "6379",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connect.redis.kcql": "INSERT INTO prod- SELECT * FROM topic1 PK after;"
    }
}

我想从消息中的嵌套 json 中提取数据。 我的命令引以为豪:

topc=topic1
message=message.json
echo "key:$(jq -rc . $message)" | $kafka_dir/bin/kafka-console-producer.sh --topic $topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

我如何发送一个嵌套的 json 对象,我如何通过 Transforms 从中提取一个字段,并且基于它的值处理与否?

【问题讨论】:

    标签: json apache-kafka redis apache-kafka-connect


    【解决方案1】:

    发送嵌套数据的工作方式与发送常规消息相同。

    您应该能够更新消息以包含类似此架构信息的内容,用于 after 字段

    "type": "struct", 
    "fields": [{"field": "one", "optional": false, "type":"string"},... ], 
    "optional": false, 
    "field": "after"
    

    然后相应地更新有效负载。

    我个人从未使用过 JSONConverter 架构/有效负载类型,因为 Avro 更适合这种情况


    据我所知,Kafka Connect 不能跳过消息;它会处理所有这些。也没有用于提取深度超过 1 的任意嵌套值的内置转换,因此获取 after 结构中的字段 可能是一个问题。但是,您可以通过修改此特定连接器的 SELECT * FROM topic1 KCQL 语句来获得它

    一般来说,如果您需要这样的逻辑,您会使用流处理器(例如 KSQL 或 Kafka Streams)在转储到数据库之前过滤/修改主题

    【讨论】:

    • 谢谢,但 AFAIK Kafka Connect 可以跳过消息 - 通过转换,例如 ExtractField($Key 或 $Value),然后是 Tombstone 或过滤器($Key 或 $Value)。我的问题是如何发送有效负载嵌套对象 - 而不是字符串(正如我试图展示的那样)。我想要获得的是不是在字符串上而是在对象{“one”:“two”,“three”:“four”}上使用Transforms。如何发送它,在 ExtractField$Value 之后拥有另一个对象,然后通过下一个转换(例如 Tombstone 或 Filter)处理对象,而不是字符串
    • 对于 kafka-console-producer 来说,生成嵌套数据与单行文本没有任何不同。如果您希望嵌套 after 字段,它需要是 Struct 模式类型,而不是 "type": "string",并且根据您的生产者控制的有效负载更新,而不是 Connect 或转换
    • 是的,我在问如何生成这样的有效负载,因为如果我在架构中将string 更改为object,连接器会显示错误,它不理解这种类型。所以我的问题是,这样的消息(有效负载)应该是什么样子
    • 你尝试过这样的事情吗? "type": "struct", "fields": [{"field": "one", "type":"string"},... ], "field": "after"?否则,我建议您在不需要架构和有效负载的情况下使用 Avro 而不是 JSON
    • 哦,我有!我使用的是连接器 com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector insted of com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector 所以这是问题的原因
    猜你喜欢
    • 2020-12-14
    • 2020-12-02
    • 2020-04-25
    • 2018-09-29
    • 1970-01-01
    • 2017-11-22
    • 2015-09-13
    • 2020-04-05
    • 2014-04-15
    相关资源
    最近更新 更多