【问题标题】:Custom partition assignment in Kafka JDBC connectorKafka JDBC 连接器中的自定义分区分配
【发布时间】:2019-08-06 21:09:59
【问题描述】:

我有一个用例,我需要编写一个自定义逻辑来根据消息中的某些关键参数分配分区。我对此做了一些研究,发现 kafka 转换支持覆盖转换接口中的一些方法,但我无法在 git hub 或其他地方做一些示例代码。有人可以分享示例代码或 git hub 链接以在 kafka JDBC 源连接器中进行自定义分区分配吗?

提前致谢!

【问题讨论】:

  • 你能发布一些代码来展示你的尝试吗?
  • 您好 Kieveli,感谢您的回复。我在自定义生产者中做了这个自定义分区,它工作正常,但我想在 kafka 源连接器中做同样的事情。由于我找不到一些样品,我还没有尝试过。
  • 我之所以问这个问题,是因为当您有示例代码显示您遇到的问题而不是对代码的一般请求时,该站点运行良好。添加您的示例并重新表述问题可能会更快地为您提供帮助。

标签: java apache-kafka apache-kafka-connect


【解决方案1】:

Kafka Connect默认分配分区使用:DefaultPartitioner (org.apache.kafka.clients.producer.internals.DefaultPartitioner)

如果您需要用一些自定义覆盖默认值,这是可能的,但您必须记住,覆盖适用于所有源连接器。 为此,您必须设置producer.partitioner.class 属性,例如producer.partitioner.class=com.example.CustomPartitioner。 此外,您必须将带有分区器的 jar 复制到带有 Kafka Connect 库的目录中。

变换方式:

在Transformation中也可以设置分区,但这不是正确的方法。 从Transformation,您无权访问主题元数据,这对于分配分区至关重要。

如果您想为记录设置分区,代码应如下所示:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}

【讨论】:

  • @wardziniak- 我按照您的建议尝试了自定义分区,但我无法覆盖分区分配。我已在原始帖子中发布了连接器和自定义分区的详细信息。如果我在这里遗漏任何东西,请您检查并告诉我。感谢您继续回复!。
  • @JamesMark, producer.partitioner.class 是所有连接器的全局设置。您必须在 kafka 连接全局属性中设置它,例如文件 connect-standalone.properties(对于独立)或 connect-distributed.properties
  • @wardziniak-太棒了,非常感谢!.. 它按照您提供的解决方案工作,并在连接器中添加了以下转换! “转换”:“cusPart”,“transforms.cusPart.type”:“com.anthem.kafkaconnect.AddPartition”
猜你喜欢
  • 2019-12-21
  • 1970-01-01
  • 1970-01-01
  • 2023-01-28
  • 1970-01-01
  • 1970-01-01
  • 2020-04-09
  • 2021-01-27
  • 1970-01-01
相关资源
最近更新 更多