【发布时间】:2019-03-08 20:43:21
【问题描述】:
我有一个如下所示的 kafka es sink 属性文件
name=elasticsearch.sink.direct
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=data.my_setting
connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=data.my_setting:direct_my_setting_index
batch.size=2048
max.buffered.records=32768
flush.timeout.ms=60000
max.retries=10
retry.backoff.ms=1000
schema.ignore=true
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID
这非常适用于单个主题 (data.my_setting)。我想对来自多个主题的数据使用相同的连接器。不同主题中的消息将具有不同的键,我需要对其进行转换。我想知道是否有一种方法可以使用带有主题名称或消息中单个字段条件的 if else 语句,以便我可以然后以不同的方式转换密钥。所有传入的消息都是带有模式和有效负载的 json。
根据答案更新:
在我的 jdbc 连接器中,我按如下方式添加密钥:
name=data.my_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT * FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=investment.ed.data.app_setting
transforms=ValueToKey
transforms.ValueToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields=MY_SETTING_ID
但是,当 elasticsearch sink 读取从此连接器生成的消息时,我仍然会收到错误
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Caused by: org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id
有效载荷如下所示:
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "MY_SETTING_ID"
}, {
"type": "string",
"optional": true,
"field": "MY_SETTING_NAME"
}
],
"optional": false
},
"payload": {
"MY_SETTING_ID": 9,
"MY_SETTING_NAME": "setting_name"
}
}
连接独立属性文件如下所示:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/apps/{env}/logs/infrastructure/offsets/connect.offsets
rest.port=8084
plugin.path=/usr/share/java
有没有办法实现我的目标,即让来自多个主题的消息(在我的例子中是 db 表)具有自己的唯一 ID(也将是 ES 中文档的 ID)发送到单个 ES 接收器。
我可以使用 avro 来完成这项任务吗?有没有办法在模式注册表中定义键或者我会遇到同样的问题?
【问题讨论】:
-
下面关于“带走流处理器”的评论 - 观看confluent.io/kafka-summit-nyc17/…
-
谢谢!非常感谢
标签: elasticsearch apache-kafka-connect confluent-platform