【发布时间】:2019-08-14 18:34:23
【问题描述】:
我在使用分布式 kafka 连接的 kerberized 服务器中工作。 连接器运行良好,只是转换部分完全被忽略了。 我在日志中没有关于此问题的警告或错误或任何信息。
我没有转换的连接器运行良好:
{
"name": "hdfs-avro-sink-X",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "Y",
"hdfs.url": "HA_name",
"topics.dir": "/data/path",
"logs.dir": "/tmp/path",
"flush.size": "8800",
"rotate.interval.ms": "6000",
"hive.integration": "true",
"hive.database": "hive_db_name",
"hive.metastore.uris": "thrift://url:9083",
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"schema.compatibility": "BACKWARD",
"hive.conf.dir": "/usr/hdp/current/hive/target-conf/",
"hadoop.conf.dir": "/usr/hdp/current/hadoop/target-conf/",
"hdfs.authentication.kerberos": "true",
"connect.hdfs.principal": "principal",
"connect.hdfs.keytab": "/keytab/path",
"ssl.truststore.location": "truststore.jks",
"ssl.truststore.password": "password",
"security.protocol": "SASL_SSL",
"ssl.keystore.location": "keystore.jks",
"ssl.keystore.password": "password",
"ssl.key.password": "password"
}
}
使用转换,转换被忽略:
{
"name": "hdfs-avro-sink-X",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
... same conf
"ssl.key.password": "password",
"transforms": "MaskField",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "ano_ass"
}
}
我可以输入任何东西,无论如何它都会被忽略:
{
"name": "hdfs-avro-sink-X",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
... same conf
"ssl.key.password": "password",
"transforms": "Masgfdgfdgfdhield",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "anfshdghgh_ass"
}
}
ano_ass 字段存在于我的 avro 架构中 游侠审核没问题
版本:
- HDP:2.6.4
- 卡夫卡:0.10.1
- 架构注册表 hortonworks
【问题讨论】:
-
我在 kafka 版本 2.8.1 (Commit:839b886f9b732b15) 和 kafka docker image wurstmeister/kafka:2.13-2.8.1 上遇到同样的问题
标签: apache-kafka apache-kafka-connect