【Question title】: Kafka Streams exactly-once delivery
【Posted】: 2018-11-28 18:31:44
【Question】:

My goal is to consume from topic A, do some processing, and produce to topic B as a single atomic action. To achieve this, I see two options:

  1. Use a spring-kafka @KafkaListener and a KafkaTemplate, as described here.
  2. Use the Streams eos (exactly-once) feature.
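Both options aim at the same atomicity: the records written to topic B and the consumed offset of topic A are committed together, or neither is. The following in-memory sketch models that contract in plain Java with no Kafka calls. `TransactionalPipeline`, `processBatch`, and the batch size are hypothetical illustrations; Kafka Streams provides this behavior through producer transactions when `processing.guarantee` is set to `exactly_once`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of an exactly-once consume-process-produce loop.
public class TransactionalPipeline {
    private final List<String> input;                               // records of topic A
    private final List<String> committedOutput = new ArrayList<>(); // what a read_committed reader of topic B sees
    private int committedOffset = 0;                                // next offset of topic A to consume

    public TransactionalPipeline(List<String> input) {
        this.input = input;
    }

    // Processes up to batchSize records as one "transaction".
    // Returns true if it committed, false if it aborted.
    public boolean processBatch(int batchSize, Function<String, String> processor) {
        List<String> pendingOutput = new ArrayList<>();
        int offset = committedOffset;
        try {
            while (offset < input.size() && offset - committedOffset < batchSize) {
                pendingOutput.add(processor.apply(input.get(offset)));
                offset++;
            }
        } catch (RuntimeException e) {
            // Abort: pending output is discarded and the offset is not advanced,
            // so the same records are consumed again on the next attempt.
            return false;
        }
        // Commit: output and new offset become visible together.
        committedOutput.addAll(pendingOutput);
        committedOffset = offset;
        return true;
    }

    public List<String> committedOutput() { return committedOutput; }

    public int committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        TransactionalPipeline pipeline = new TransactionalPipeline(List.of("1", "2", "3", "4"));
        boolean committed = pipeline.processBatch(4, v -> {
            if (v.equals("3")) {
                throw new IllegalArgumentException("Tax calculation failed");
            }
            return "tax-" + v;
        });
        // prints "false [] 0": nothing from the failed batch is visible
        System.out.println(committed + " " + pipeline.committedOutput() + " " + pipeline.committedOffset());

        committed = pipeline.processBatch(4, v -> "tax-" + v);
        // prints "true [tax-1, tax-2, tax-3, tax-4] 4"
        System.out.println(committed + " " + pipeline.committedOutput() + " " + pipeline.committedOffset());
    }
}
```

Note that an aborted batch leaves no partial output behind; it is simply retried from the old offset.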

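I have successfully verified option #1. By success I mean that if my processing fails (an IllegalArgumentException is thrown), the message consumed from topic A keeps being re-consumed by the KafkaListener. This is what I expected, because the offset is not committed and the DefaultAfterRollbackProcessor is used.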

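I expected to see the same behavior if I use a stream (option #2) instead of a KafkaListener to consume from topic A, process, and send to topic B. However, the stream consumes the message only once, even though an IllegalArgumentException is thrown while processing it. Is this the expected behavior?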

In the Streams case, my only configuration is the following:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig  kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");        
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

My stream looks like this:

@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

      kStream
        .mapValues(v -> 
            {
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
            })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}

The happy-path stream scenario works: if no exception is thrown during processing, the message shows up correctly in topic B.

My unit test:

@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
// ...
}

Update: in my unit test I send 50 invoiceEvents (orderId=1,...,50), process them, and send them to the target topic.

In my logs I see the following behavior:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {"id": "46", ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

Why, after the second failure, is the stream re-consuming from invoiceEvent.orderId = 46?

【Discussion】:

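  • The question is: what do you see in the output topic? With exactly-once enabled, the output topic is guaranteed to contain a result as if no error had occurred. Whether data is re-consumed depends on whether the output was successfully written before the failure. When exactly is the exception thrown?
  • Thanks for the quick reply, Matthias! In my unit test I always throw an IllegalArgumentException from taxService.getTaxForInvoice(v). My expectation is that the message should be re-consumed, but I only see the initial consumption (i.e., no re-consumption). I am trying to understand whether my expectation is correct.
  • I edited my post to include the unit test.
  • If an exception is thrown, the StreamThread dies and is not restarted automatically. After observing the first exception (via the uncaught exception handler), do you close() Kafka Streams, create a new instance, and start it again?
  • Commits are based on wall-clock time, configured via the parameter commit.interval.ms. If you want a commit for every message, setting max.poll.records=1 is correct but not sufficient. You also need to request a commit manually for every record (you can insert a transformValues step that just forwards the data and requests a commit). Cf. stackoverflow.com/questions/43416178/…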
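The re-consumption from orderId = 46 follows from the time-based commits described in the last comment: after a crash, a task resumes from its last committed offset, and everything processed after that commit is processed again (with exactly-once, the output of the aborted work is never visible to read_committed consumers). The simulation below is plain Java with no Kafka calls. It models commits as happening every `commitEvery` records, a simplified stand-in for `commit.interval.ms`, which is really wall-clock based; `CommitSimulation` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of periodic offset commits and restart-from-last-commit.
public class CommitSimulation {

    // Consumes offsets 0..total-1 and returns every offset in the order it was consumed.
    // A commit happens after every `commitEvery` successfully processed records
    // (a stand-in for the wall-clock-based commit.interval.ms).
    // Each offset in `failOnce` throws the first time it is processed, after which the
    // task "restarts" and resumes from the last committed offset.
    public static List<Integer> run(int total, int commitEvery, Set<Integer> failOnce) {
        List<Integer> consumed = new ArrayList<>();
        Set<Integer> alreadyFailed = new HashSet<>();
        int committed = 0;    // next offset to read after a restart
        int position = 0;     // next offset to read now
        int sinceCommit = 0;  // records processed since the last commit
        while (position < total) {
            consumed.add(position);
            if (failOnce.contains(position) && alreadyFailed.add(position)) {
                // Crash: uncommitted progress is lost.
                position = committed;
                sinceCommit = 0;
                continue;
            }
            position++;
            sinceCommit++;
            if (sinceCommit == commitEvery) {
                committed = position;
                sinceCommit = 0;
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Commit after every record: only the failed record is consumed twice.
        System.out.println(run(6, 1, Set.of(3))); // [0, 1, 2, 3, 3, 4, 5]
        // Commit every 4 records: offset 4 was processed but not committed, so it is replayed too.
        System.out.println(run(6, 4, Set.of(5))); // [0, 1, 2, 3, 4, 5, 4, 5]
    }
}
```

Mapped onto the question's log: the restart after the failure at orderId = 50 resumed at 46, so the last commit before that crash covered the records up to 45, and 46 to 49 were replayed because their offsets had not been committed yet.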

Tags: apache-kafka apache-kafka-streams spring-kafka


【Solution 1】:

The key points for making option 2 (Streams transactions) work are:

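  • Assign a Thread.UncaughtExceptionHandler() so that a new StreamThread is started whenever an uncaught exception occurs (by default, the StreamThread dies; see the code snippet below). This can happen even when producing to the Kafka broker fails, so it is not necessarily related to the business logic in your stream.
  • Consider defining a policy for handling message deserialization errors (when you consume). Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc). For example, should you skip the message and consume the next one, or stop consuming from the affected Kafka partition?
  • In the Streams case, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), the consumed offsets and produced messages are still not committed after every message. This leads to the situation described in the question (see "Why, after the second failure, is the stream re-consuming from invoiceEvent.orderId = 46?").
  • Kafka transactions do not work on Windows at all. The fix will be available in Kafka 1.1.1 (https://issues.apache.org/jira/browse/KAFKA-6052).
  • Consider checking how you handle serialization exceptions (or exceptions during production in general) (here and here).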
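In Kafka Streams, the deserialization policy from the second bullet is selected with `StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG`, for example `LogAndContinueExceptionHandler` (skip the bad record) or `LogAndFailExceptionHandler` (the default, which fails the stream thread). The plain-Java sketch below only models the difference between the two choices. `DeserializationPolicyDemo`, `Policy`, and the integer "deserializer" are hypothetical, and failing is simplified to "stop consuming the partition".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "skip the poison record" vs "stop at the poison record".
public class DeserializationPolicyDemo {
    public enum Policy { CONTINUE, FAIL }

    // Hypothetical "deserializer": parses an orderId, throwing on malformed input.
    static int deserialize(String raw) {
        return Integer.parseInt(raw.trim());
    }

    // Returns the orderIds that reached the processing logic.
    public static List<Integer> consume(List<String> partition, Policy policy) {
        List<Integer> processed = new ArrayList<>();
        for (String raw : partition) {
            int orderId;
            try {
                orderId = deserialize(raw);
            } catch (NumberFormatException e) {
                if (policy == Policy.CONTINUE) {
                    continue; // skip the bad record and move on to the next one
                }
                break; // stop consuming this partition at the bad record
            }
            processed.add(orderId);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("43", "44", "not-a-number", "45");
        System.out.println(consume(partition, Policy.CONTINUE)); // [43, 44, 45]
        System.out.println(consume(partition, Policy.FAIL));     // [43, 44]
    }
}
```

Combined with a restart-on-exception handler, a fail policy keeps hitting the same bad record after every restart, which is why the choice deserves an explicit decision.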

    @Configuration
    @EnableKafkaStreams
    public class KafkaStreamsConfiguration {
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public StreamsConfig  kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
            // this should be enough to enable transactions
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return new StreamsConfig(props);
        }
    }
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
    {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }
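The handler above relies on the fact that a thread killed by an uncaught exception can never be resumed; the only recovery is to create and start a new one. The JDK-only sketch below shows that stop-and-recreate pattern with `Thread.setUncaughtExceptionHandler`. It does not touch Kafka at all: `RestartingWorker`, its restart cap, and the failing task are hypothetical. The cap is worth considering because a record that fails deterministically, like the mocked `getTaxForInvoice` in the question's unit test, could otherwise cause endless restarts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical supervisor: restarts a task on a fresh thread when the previous thread dies.
public class RestartingWorker {
    private final Runnable task;
    private final int maxRestarts;
    private final AtomicInteger starts = new AtomicInteger();
    private final AtomicBoolean succeeded = new AtomicBoolean();
    private final CountDownLatch finished = new CountDownLatch(1);

    public RestartingWorker(Runnable task, int maxRestarts) {
        this.task = task;
        this.maxRestarts = maxRestarts;
    }

    public void start() {
        Thread thread = new Thread(() -> {
            task.run();            // an exception here kills this thread
            succeeded.set(true);
            finished.countDown();
        }, "worker-" + starts.incrementAndGet());
        thread.setUncaughtExceptionHandler((dead, e) -> {
            // The dead thread cannot be resumed: start a new one, up to maxRestarts times.
            if (starts.get() <= maxRestarts) {
                start();
            } else {
                finished.countDown(); // give up
            }
        });
        thread.start();
    }

    // Waits for success or for giving up; returns false on timeout or interruption.
    public boolean awaitFinished(long timeoutMs) {
        try {
            return finished.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public boolean succeeded() { return succeeded.get(); }

    public int starts() { return starts.get(); }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Fails on the first two attempts, then succeeds.
        RestartingWorker worker = new RestartingWorker(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalArgumentException("Tax calculation failed");
            }
        }, 5);
        worker.start();
        worker.awaitFinished(5_000);
        System.out.println(worker.succeeded() + " after " + worker.starts() + " starts"); // true after 3 starts
    }
}
```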
    
