【发布时间】:2020-05-28 15:28:00
【问题描述】:
我需要为 kafka connect S3 sink plugin 创建一个自定义分区器。 我已经使用 kotlin 在自定义类中扩展了 HourlyPartitioner:
class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}
并相应地更改了我的连接器配置以使用自定义类:
"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",
然后我创建了我们的 jar(我们使用shadow)并将其包含到基于 kafka 连接镜像的自定义 docker 镜像中(镜像版本与我们在项目中使用的依赖项相同):
FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar
FROM confluentinc/cp-kafka-connect:5.3.2
COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar
连接器启动时出现此错误:
ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner
为了仔细检查,我创建了一个尝试实例化类的 java 文件,它没有抛出任何错误:
import io.confluent.connect.storage.partitioner.Partitioner;
public class InstantiateTest {
public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
Class<? extends Partitioner<?>> partitionerClass =
(Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");
Partitioner<?> partitioner = partitionerClass.newInstance();
}
}
【问题讨论】:
-
您的班级做了哪些在每小时或基于时间的分区程序中不可用的操作?尝试将 JAR 放入
/usr/share/java/kafka-connect-storage-common。另外,您的CONNECT_PLUGIN_PATH设置是什么? -
@cricket_007 基于自定义 avro 字段 + 时间的分区,
CONNECT_PLUGIN_PATH=/usr/share/java/,/usr/share/confluent-hub-components/类已正确加载,如果我设置另一个不存在的类名,我会得到一个空指针异常 -
如果我将我的 jar 添加到
/usr/share/java/kafka-connect-storage-common我会收到另一个错误,因为我们使用同一个 jar 来提供自定义CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY并且使用该目录中的 jar 无法找到自定义策略类 -
您的 JAR 与主题策略无关。你有没有错误地跟踪你的班级?顺便说一下,你需要将 kafka 依赖标记为 Gradle 中提供的那样
-
顺便说一句,听起来你重写了这个github.com/confluentinc/kafka-connect-storage-common/pull/73
标签: java kotlin apache-kafka apache-kafka-connect