【问题标题】:Kafka Exactly once with Transactional ProducerKafka Exactly once with Transactional Producer
【发布时间】:2018-08-08 05:30:31
【问题描述】:

我正在尝试使用事务生产者/消费者来准确理解 Kafka。

我遇到了以下示例。但是,我仍然很难准确地理解一次。这段代码正确吗?

producer.sendOffsetsToTransaction - 这段代码是做什么的?是否应该针对相同的目标主题执行此操作?

consumer.commitSync() 之前的系统崩溃是什么; // 会再次读取相同的消息并产生重复的消息?

public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();

        producer.initTransactions();

        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
                                record.offset(), record.key(), record.value());

                    final ProducerRecord<Long, String> producerRecord =
                                new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
                    // send returns Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference group id rwice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); // this has to be

        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1

        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {

        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();

    }
}

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    在将读/处理/写模式与 Kafka 事务一起使用时,您不应尝试向消费者提交偏移量。正如您所暗示的,这可能会导致问题。

    在这个用例中,需要将偏移量添加到事务中,并且您应该只使用sendOffsetsToTransaction() 来执行此操作。该方法确保仅在完整事务成功时才提交这些偏移量。见Javadoc

    向消费者组协调器发送指定偏移量列表, 并将这些偏移量标记为当前事务的一部分。这些 只有当事务被提交时,偏移才会被认为是提交 提交成功。提交的偏移量应该是下一个 您的应用程序将使用的消息,即 lastProcessedMessageOffset + 1。

    当你需要批量消费和 一起生产消息,通常在消费-转换-生产中 图案。因此,指定的 consumerGroupId 应该与 配置参数 group.id 使用的消费者。请注意,该 消费者应该有 enable.auto.commit=false 也不应该 手动提交偏移量(通过同步或异步提交)。

    【讨论】:

    • 所以,sendOffsetsToTransaction 应该是到源主题。如果调用 currentOffsets.clear(); 时出现问题怎么办?
    • 在您的示例中,您不必调用clear(),因为您将在开始下一次迭代时重新创建新地图。即使您想在事务提交后调用clear(),也不会丢失任何数据。
    • 我们这里需要事务消费者吗? consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED);它在这里扮演什么角色,因为它只是从其他主题中读取。
    • 如果输入主题不包含任何事务,则保留此设置的默认值。如果它确实包含交易,那么这取决于您的要求。
    • producer.sendOffsetsToTransaction -> 交易偏移量存储在哪里?是 __stransaction_state 主题还是在事务提交日志中或 ....
    猜你喜欢
    • 2019-12-10
    • 2019-08-25
    • 2021-07-16
    • 2020-01-19
    • 2019-07-23
    • 2019-11-15
    • 1970-01-01
    • 2020-08-20
    • 2022-12-28
    相关资源
    最近更新 更多