【问题标题】:Kafka Connect with custom timestamp.extractorKafka Connect 与自定义 timestamp.extractor
【发布时间】:2018-05-31 16:52:17
【问题描述】:

我在尝试将消息从 Kafka 读取到 S3 时,将 jar 添加到 Kafka 连接类路径时遇到问题。

目标是根据时间戳在分区中写入消息,时间戳是Kafka消息中Key的一部分。

为了简短起见,我必须提供自定义时间戳提取器。按照文档here 创建了一个实现TimestampExtractor 接口的类,并将JAR 位置添加到plugin.path 属性。

问题是当我开始连接时,找不到类。不知何故,jar 不在类路径中,我得到了

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor

其他数据:

版本:Confluent 4.0.0

连接:独立连接

启动命令:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

请提供任何帮助。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    要使自定义时间戳提取器类可用于您的 S3 连接器,您将需要以下内容:

    • 使用您的自定义类以及其他连接器的依赖项添加 jar。示例:

      如果您希望这样,请保存在 ./share/java/kafka-connect-s3 下 仅在 S3 连接器中可用,或在 ./share/java/kafka-connect-storage-common 使其可用于 所有存储接收器连接器(当前为 S3 和 HDFS 连接器)。

    • 确保您的自定义类实现了io.confluent.connect.storage.partitioner.TimestampExtractor 接口。
    • 当您在连接器的配置中设置timestamp.extractor 属性时,请使用完全限定的类名,当然要确保它与您在jar 中定义和打包的包相匹配。例如:

      timestamp.extractor=me.connectors.MyTimestampExtractor

    最后,您将按照类似的过程使自定义分区器可用于您的连接器。

    【讨论】:

    • 谢谢你,你能解释一下为什么plugin.path 不能加载自定义提取器或分区器吗?
    • 当您说它不起作用时,您的意思是它不会从plugin.path 中的单独目录加载它?这是因为 partitioner 类和 timestamp.extractor 类都不是 Connect 插件(至少目前是这样)。连接插件有:连接器、变换器、转换器。
    猜你喜欢
    • 2020-05-26
    • 2021-07-15
    • 2020-08-28
    • 2020-01-07
    • 2020-05-01
    • 2020-10-29
    • 2018-03-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多