【问题标题】:Kafka connect transformation isn't applied未应用 Kafka 连接转换
【发布时间】:2019-08-14 18:34:23
【问题描述】:

我在使用分布式 kafka 连接的 kerberized 服务器中工作。 连接器运行良好,只是转换部分完全被忽略了。 我在日志中没有关于此问题的警告或错误或任何信息。

我没有转换的连接器运行良好:

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                "tasks.max": "1",
                "topics": "Y",
                "hdfs.url": "HA_name",
                "topics.dir": "/data/path",
                "logs.dir": "/tmp/path",
                "flush.size": "8800",
                "rotate.interval.ms": "6000",
                "hive.integration": "true",
                "hive.database": "hive_db_name",
                "hive.metastore.uris": "thrift://url:9083",
                "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
                "schema.compatibility": "BACKWARD",
                "hive.conf.dir": "/usr/hdp/current/hive/target-conf/",
                "hadoop.conf.dir": "/usr/hdp/current/hadoop/target-conf/",
                "hdfs.authentication.kerberos": "true",
                "connect.hdfs.principal": "principal",
                "connect.hdfs.keytab": "/keytab/path",
                "ssl.truststore.location": "truststore.jks",
                "ssl.truststore.password": "password",
                "security.protocol": "SASL_SSL",
                "ssl.keystore.location": "keystore.jks",
                "ssl.keystore.password": "password",
                "ssl.key.password": "password"               
    }
}

使用转换,转换被忽略:

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                ... same conf
                "ssl.key.password": "password",
                "transforms": "MaskField",
                "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
                "transforms.MaskField.fields": "ano_ass"
    }
}

我可以输入任何东西,无论如何它都会被忽略:

{
    "name": "hdfs-avro-sink-X",
    "config": {
                "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
                ... same conf
                "ssl.key.password": "password",
                "transforms": "Masgfdgfdgfdhield",
                "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
                "transforms.MaskField.fields": "anfshdghgh_ass"
    }
}

ano_ass 字段存在于我的 avro 架构中 游侠审核没问题

版本:

  • HDP:2.6.4
  • 卡夫卡:0.10.1
  • 架构注册表 hortonworks

【问题讨论】:

  • 我在 kafka 版本 2.8.1 (Commit:839b886f9b732b15) 和 kafka docker image wurstmeister/kafka:2.13-2.8.1 上遇到同样的问题

标签: apache-kafka apache-kafka-connect


【解决方案1】:

如果您运行的是 0.10.1,那么 SMT 还不存在 :) 2.5 多年前,在 0.10.2 版本中,使用 KIP-66 向 Apache Kafka 添加了单消息转换。

您可能需要考虑运行更新版本的 Kafka,其最新版本是 2.3。

它看起来像 latest version of HDP is 3.1,其中包含 Apache Kafka 2.0。如果您想要最新版本的 Kafka (2.3),您可以从 kafka.apache.org 获取它,也可以作为 Confluent Platform 的一部分。

【讨论】:

    猜你喜欢
    • 2017-12-04
    • 2020-06-03
    • 1970-01-01
    • 2019-10-03
    • 2019-08-02
    • 2017-11-09
    • 1970-01-01
    • 2019-05-11
    • 2019-12-27
    相关资源
    最近更新 更多