【问题标题】:Apache Kafka - is it possible to lose messages on error?Apache Kafka - 是否有可能因错误而丢失消息?
【发布时间】:2017-10-13 19:46:58
【问题描述】:

我正在深入研究 Apache KafkaSpring Cloud Stream 并观察到一些行为让我怀疑我做错了什么或者它是否按预期工作 - 我几乎怀疑:

出错时可能会丢失消息!?

我的设置尽可能简单。一个 Kafka 代理和一个只有 1 个分区的主题。具有默认设置的代理、主题、生产者和消费者(自动确认为 true)。

测试用例 1

  • 产生message1
  • 产生message2
  • 启动一个消费者,它会在接收到任何消息时抛出 RuntimeException
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 消费message2,重试
  • 消费message2,重试
  • 消费message2,重试
  • 抛出异常
  • 停止并重新启动消费者
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 消费message2,重试
  • 消费message2,重试
  • 消费message2,重试
  • 抛出异常

按预期工作。

测试用例 2

  • 产生message1
  • 产生message2
  • 启动一个消费者,它会在接收到完全是message1时抛出一个RuntimeException
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 成功消费message2
  • 产生message3
  • 成功消费message3
  • 停止并重新启动消费者
  • 什么都没有发生,消费者等待新消息消费

message1 将被跳过,因为提交的偏移量已设置为message3。这就是困扰我的地方。只要之前的消息未成功处理,我不希望消费者继续处理消息。

有没有人遇到过同样的行为和/或可能可以指导我如何改变这种情况?

提前致谢!


更新:应要求,一些代码sn-ps

创建主题

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

连接生产者

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

创建一个maven项目

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

添加以下application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092

添加以下Application.java

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    private void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())
            throw new RuntimeException();
        log.info("Successfully processed message {}", message.getPayload());
    }
}

应该是这样的。运行应用程序并使用控制台生产者生成消息。

【问题讨论】:

  • 如果只有一个分区,为什么消费者在testcase2中消费message1失败,却消费message2?
  • @herokingsley 我不知道,但这就是正在发生的事情。如果在尝试使用message1 失败后它不会消耗message2,那么我会很满意。
  • 也许向我们展示一些代码或日志会有所帮助
  • @herokingsley 我在我的问题中添加了一些代码 sn-ps。
  • testcase1 的代码是什么样的?

标签: java apache-kafka spring-cloud-stream


【解决方案1】:

在 Kafka 中,每条消息都带有一个偏移量 ID。您的消费者应用程序可以检查偏移量,以及是否跳过或错过任何偏移量,而不是使用下一条消息。您可以使用 consumer.seek 方法获取丢失的特定消息。

偏移量本质上是递增的和连续的。

在你的情况下使用手动提交。

我可以说使用以下步骤..

  1. poll 方法后,首先检查之前提交的偏移量和 并请求下一个偏移值

  2. 成功消费和处理消息后,保存 在某些内部成功处理消息的偏移值 内存或表。在下次投票期间

以下链接不适用于您的用例,但您可以得到公平的想法

参考Example

【讨论】:

  • stphngrtz 没有使用原生 Kafka API
  • 我同意,但是 SCS 提供了很多抽象。无论如何你是对的,自我管理的抵消是一种选择。
  • 如果我决定将消息放入一个特定的分区,这很可能是因为我希望它们按出现的顺序进行处理。我不认为不希望消息这么容易被跳过是一个非常独特的要求。在某些情况下可能会出现这种情况,但请提供某种配置选项来更改行为。无论如何,从我的角度来看,我希望不跳过行为成为默认行为。
  • 您建议的步骤可行,但老实说,我不敢相信这是来自 kafka 和/或 spring 的人希望我们做的事情。在具有多个不同服务或相同服务实例的分布式系统中,我不想跟踪谁成功处理了哪条消息。我希望 kafka 为我做这件事。
  • 有像storm、spark这样的框架可以为你做这件事。但是spring lib做的事情是不需要看文档的。
【解决方案2】:

您应该为这种情况配置 DLQ。如果您的消息在 3 次重试后仍无法被消费,则很可能它根本不会被消费或需要特殊处理。 设置一个 DLQ 可以到达有毒消息的地方,并且您不会丢失消息

【讨论】:

  • 但是将 message1 移动到 DLQ 会破坏排序,对吗?将message1 移动到DLQ 后,线程将继续使用message2
  • 它不会破坏排序。这不像 message2 在 message1 之前被使用。但是,除非您想停止对错误消息的所有消费,因此不提交偏移量,否则在任何面向消息的系统中都会发生这种情况。错误消息被发送到一个特殊的地方,流程继续。否则系统将停止任何错误
【解决方案3】:

Kafka 为您提供运行时,但您拥有选择的权力。在某些情况下 msgs 可能会丢失/跳过,在某些情况下可能不会 - 您需要根据需要准备配置。 IMO 您应该进一步调查一些 Spring Cloud Stream 设置。您还可以尝试禁用自动提交和“手动”提交偏移量。

【讨论】:

  • 我会尝试手动提交,但我几乎无法相信我所描述的行为在大多数情况下都可以。默认情况下,message1 应该阻止 message2 直到成功处理(并因此得到确认),如果您不想阻止,那么您将不得不更改配置 - 但这只是我,也许我没有得到大局现在。
  • 仅仅禁用自动提交是不够的。您需要跟踪已确认的消息,并在处理之前手动检查即将处理的消息是否是下一条消息。这对我来说绝对不满意。
猜你喜欢
  • 2021-04-22
  • 1970-01-01
  • 2010-12-26
  • 2020-11-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多