【问题标题】:How to manually control the offset commit with camel-kafka?如何使用camel-kafka手动控制偏移提交?
【发布时间】:2018-02-07 09:17:06
【问题描述】:

我正在使用骆驼 kafka 组件,但我不清楚在提交偏移量时会发生什么。如下所示,我正在聚合记录,我认为对于我的用例,只有在记录保存到 SFTP 之后提交偏移量才有意义。

是否可以手动控制何时可以执行提交?

private static class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("kafka:{{mh.topic}}?" + getKafkaConfigString())
        .unmarshal().string()
        .aggregate(constant(true), new MyAggregationStrategy())
            .completionSize(1000)
            .completionTimeout(1000)
        .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
        .to("sftp://" + getSftpConfigString())

        // how to commit offset only after saving messages to SFTP?

        ;
    }

    private final class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody(body);
            return oldExchange;
        }
    }
}

private static String getSftpConfigString() {
        return "{{sftp.hostname}}/{{sftp.dir}}?"
                + "username={{sftp.username}}"
                + "&password={{sftp.password}}"
                + "&tempPrefix=.temp."
                + "&fileExist=Append"
                ;
}

private static String getKafkaConfigString() {
        return "brokers={{mh.brokers}}" 
            + "&saslMechanism={{mh.saslMechanism}}"  
            + "&securityProtocol={{mh.securityProtocol}}"
            + "&sslProtocol={{mh.sslProtocol}}"
            + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
            + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
            + "&saslJaasConfig={{mh.saslJaasConfig}}" 
            + "&groupId={{mh.groupId}}"
            ;
}

【问题讨论】:

    标签: apache-kafka apache-camel


    【解决方案1】:

    不,你不能。 Kafka 每 X 秒在后台执行一次自动提交(您可以配置此项)。

    camel-kafka 不支持手动提交。这也是不可能的,因为聚合器与 kafka 消费者以及执行提交的消费者是分开的。

    【讨论】:

    • 您能否分享有关您声明“骆驼卡夫卡中不支持手动提交”的文档
    • 这是 4 年前的事了,查看 camel 网站文档中的 camel-kafka 组件,它支持手动提交
    • 是的,想通了,但忘了在这里更新。谢谢
    【解决方案2】:

    我认为这是最新版本的骆驼(2.22.0)(the doc)的变化,你应该能够做到这一点。

    // Endpoint configuration &autoCommitEnable=false&allowManualCommit=true
    public void process(Exchange exchange) {
         KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
         manual.commitSync();
    }
    

    【讨论】:

    • 详细了解Kafka Manual Commit的使用方法可以看这个问题stackoverflow.com/questions/51843677/…
    • 我认为如果路由中有聚合器,手动提交是不可能的。聚合器作为必须提交消息的消费者在单独的线程中工作。有没有人有这个案例的解决方案?对于我的用例,在聚合之后提交很重要,因为如果杀死实例并且没有优雅地关闭路由,我可能会丢失数据。
    • @R_FF92 下面是一个例子
    【解决方案3】:

    您甚至可以通过使用偏移存储库 (Camel Documentation) 在多线程路由中控制手动偏移提交(例如使用聚合器)

    @Override
    public void configure() throws Exception {
          // The route
          from(kafkaEndpoint())
                .routeId(ROUTE_ID)
                // Some processors...
                // Commit kafka offset
                .process(MyRoute::commitKafka)
                // Continue or not...
                .to(someEndpoint());
    }
    
    private String kafkaEndpoint() {
        return new StringBuilder("kafka:")
                .append(kafkaConfiguration.getTopicName())
                .append("?brokers=")
                .append(kafkaConfiguration.getBootstrapServers())
                .append("&groupId=")
                .append(kafkaConfiguration.getGroupId())
                .append("&clientId=")
                .append(kafkaConfiguration.getClientId())
                .append("&autoCommitEnable=")
                .append(false)
                .append("&allowManualCommit=")
                .append(true)
                .append("&autoOffsetReset=")
                .append("earliest")
                .append("&offsetRepository=")
                .append("#fileStore")
                .toString();
    
    }
    
    @Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
    private FileStateRepository fileStore() {
        FileStateRepository fileStateRepository = 
        FileStateRepository.fileStateRepository(new File(kafkaConfiguration.getOffsetFilePath()));
        fileStateRepository.setMaxFileStoreSize(10485760); // 10MB max
    
        return fileStateRepository;
    }
    
    private static void commitKafka(Exchange exchange) {
        KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        manual.commitSync();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-04-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-18
      • 2015-04-09
      相关资源
      最近更新 更多