【发布时间】:2021-04-06 02:27:31
【问题描述】:
我有一条消息,它适用于我的 Redis Sink 连接器(连接器将价值赋予 Redis):
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "registertime"
},
{
"type": "string",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "regionid"
},
{
"type": "string",
"optional": false,
"field": "after"
}
],
"optional": false,
"name": "ksql.users"
},
"payload": {
"registertime": 1493819497170,
"userid": "User_1",
"regionid": "Region_5",
"after": "MALE"
}
}
但我希望将字段“之后”作为嵌套对象:
"after": {
"one": null,
"two": "one"
}
并在此基础上处理数据(即,如果“one”为空,则跳过)。
所以我有一个连接器:
{
"name": "connector1",
"config": {
"topics": "topic1",
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"tasks.max": "1",
"connect.redis.error.policy": "NOOP",
"connect.redis.host": "localhost",
"connect.redis.port": "6379",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.redis.kcql": "INSERT INTO prod- SELECT * FROM topic1 PK after;"
}
}
我想从消息中的嵌套 json 中提取数据。 我的命令引以为豪:
topc=topic1
message=message.json
echo "key:$(jq -rc . $message)" | $kafka_dir/bin/kafka-console-producer.sh --topic $topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
我如何发送一个嵌套的 json 对象,我如何通过 Transforms 从中提取一个字段,并且基于它的值处理与否?
【问题讨论】:
标签: json apache-kafka redis apache-kafka-connect