【问题标题】:Kafka Connect cannot cast custom storage sink partitioner to Partitioner interfaceKafka Connect 无法将自定义存储接收器分区器转换为分区器接口
【发布时间】:2020-05-28 15:28:00
【问题描述】:

我需要为 kafka connect S3 sink plugin 创建一个自定义分区器。 我已经使用 kotlin 在自定义类中扩展了 HourlyPartitioner

class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}

并相应地更改了我的连接器配置以使用自定义类:

"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",

然后我创建了我们的 jar(我们使用shadow)并将其包含到基于 kafka 连接镜像的自定义 docker 镜像中(镜像版本与我们在项目中使用的依赖项相同):

FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar

FROM confluentinc/cp-kafka-connect:5.3.2

COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar

连接器启动时出现此错误:

ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner

为了仔细检查,我创建了一个尝试实例化类的 java 文件,它没有抛出任何错误:

import io.confluent.connect.storage.partitioner.Partitioner;

public class InstantiateTest {
    public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<? extends Partitioner<?>> partitionerClass =
                (Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");

        Partitioner<?> partitioner = partitionerClass.newInstance();
    }
}

【问题讨论】:

  • 您的班级做了哪些在每小时或基于时间的分区程序中不可用的操作?尝试将 JAR 放入 /usr/share/java/kafka-connect-storage-common。另外,您的CONNECT_PLUGIN_PATH 设置是什么?
  • @cricket_007 基于自定义 avro 字段 + 时间的分区,CONNECT_PLUGIN_PATH=/usr/share/java/,/usr/share/confluent-hub-components/ 类已正确加载,如果我设置另一个不存在的类名,我会得到一个空指针异常
  • 如果我将我的 jar 添加到 /usr/share/java/kafka-connect-storage-common 我会收到另一个错误,因为我们使用同一个 jar 来提供自定义 CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY 并且使用该目录中的 jar 无法找到自定义策略类
  • 您的 JAR 与主题策略无关。你有没有错误地跟踪你的班级?顺便说一下,你需要将 kafka 依赖标记为 Gradle 中提供的那样
  • 顺便说一句,听起来你重写了这个github.com/confluentinc/kafka-connect-storage-common/pull/73

标签: java kotlin apache-kafka apache-kafka-connect


【解决方案1】:

查看 kafka 连接 guide 它说:

Kafka Connect 插件只是一组 JAR 文件,Kafka Connect 可以在其中找到一个或多个连接器、转换和/或转换器的实现。 Kafka Connect 将每个插件相互隔离,以便一个插件中的库不受任何其他插件中的库的影响。这在混合和匹配来自多个提供商的连接器时非常重要。

这意味着,由于我使用的是 S3 接收器连接器,因此我必须将带有自定义分区器的 jar 放在 S3 插件的目录中。

将 jar 文件移动到 /usr/share/java/kafka-connect-s3 解决了这个问题

在我提到的 cmets 中,我的 jar 还包含我们在主 kafka-connect 配置(环境变量)中使用的自定义主题名称策略,在这种情况下,jar 需要位于 /usr/share/java/kafka 文件夹中

更新:正如cricket_007 所说,最好将自定义分区器jar 放入所有其他分区器所在的/usr/share/java/kafka-connect-storage-common 文件夹中

【讨论】:

  • 这不是真的。我目前正在上面提到的文件夹中使用自定义分区器运行 S3 接收器。另外,无论如何,这就是每小时分区器存在的地方。您的模式注册表类需要在 CLASSPATH 中,并且应该在模式注册表目录中,而不是 Kafka
  • 我会在几天内做更多的测试并相应地更新答案,同时我能够只使用分区器类创建一个 jar 并且效果很好
  • @cricket_007 有效,那么使用存储与 s3 文件夹有什么好处?只是在存储文件夹中更好的位置,因为它是其他分区器所在的位置?
  • @cricket_007 你想添加你自己的回复,以便我可以将你的回复标记为已接受,还是我只是更新我的回复?
  • 你的很好。我只是添加评论
【解决方案2】:

根据您使用的 Sink,我们需要将分区器类推到那里,就像我们使用 Confluent Kafka 5.5 和连接器类 Azure Gen2 Storage 时一样。

为此,我们需要编写类似于以下Repo in Github 的自定义分区器。

然后我们将自定义 JAR 放在以下路径中:

/usr/share/confluent-hub-components/confluentinc-kafka-connect-azure-data-lake-gen2-storage/lib/ 

之后我们的连接器类就成功了!

【讨论】:

  • 欢迎来到 StackOverflow。我建议你改进你的答案,看看这里→How do I write a good answer?
  • 另外,虽然 GitHub Repo 的这个链接可能会回答这个问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接的答案可能会失效。
猜你喜欢
  • 2019-08-06
  • 2021-01-27
  • 1970-01-01
  • 1970-01-01
  • 2019-12-21
  • 1970-01-01
  • 1970-01-01
  • 2019-06-17
  • 1970-01-01
相关资源
最近更新 更多