【问题标题】:Apache Kafka the order of messages in partition guaranteeApache Kafka 分区保证消息的顺序
【发布时间】:2020-03-04 07:13:03
【问题描述】:

阅读这篇关于主题分区中消息排序的文章:https://blog.softwaremill.com/does-kafka-really-guarantee-the-order-of-messages-3ca849fd19d2

允许重试而不设置 max.in.flight.requests.per.connection 为 1 可能会改变记录的顺序,因为如果两个 批次被发送到单个分区,第一个失败并且是 重试但第二次成功,则第二批中的记录 可能首先出现。

据此,有两种类型的生产者配置可以实现订购保证:

max.in.flight.requests.per.connection=1 // can impact producer throughput

或替代

enable.idempotence=true
max.in.flight.requests.per.connection //to be less than or equal to 5
max.retries // to be greater than 0
acks=all

谁能解释第二个配置如何实现订单保证?同样在第二个配置中启用了一次性语义。

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    幂等性:(每个分区的顺序语义恰好一次)

    幂等传递使生产者能够准确地将消息写入 Kafka 在主题的生命周期内访问一次主题的特定分区 单个生产者没有数据丢失和每个分区的顺序。

    幂等是在Kafka中实现Exactly-once Semantics的关键特性之一。设置“enable.idempotence=true”最终获得每个分区的精确一次语义,这意味着特定分区没有重复,没有数据丢失。如果发生错误,即使生产者多次发送消息也会被写入 Kafka 一次。

    Kafka producer 的 PID 和 Sequence Number 概念实现幂等如下:

    PID 和序列号

    幂等生产者在生产消息时使用产品 id(PID) 和序列号。生产者不断增加发布的每条消息的序列号,该消息映射具有唯一的 PID。代理总是将当前序列号与前一个序列号进行比较,如果新序列号不比前一个序列号大 +1,它会拒绝,以避免重复,如果大于大于则表示消息丢失。

    在失败的情况下,它仍然会保持序列号并避免重复,如下所示:

    注意:生产者重启时,会分配新的 PID。所以幂等性只承诺单个生产者会话

    如果您使用 enable.idempotence=true,您可以将 max.in.flight.requests.per.connection 保持在 5 以内,并且可以实现顺序保证,从而带来更好的并行性并提高性能。

    在我们使用 max.in.flight.requests.per.connection 和重试和 Acks 设置实现一定程度的保证之前,Kafka 0.11+ 中引入了幂等性功能:

    max.in.flight.requests.per.connection to 1
    max.retries bigger number
    acks=all
    

    ma​​x.in.flight.requests.per.connection=1:确保在重试消息时不会发送其他消息。

    这提供了至少一次的保证,并伴随着性能和吞吐量的成本,这鼓励引入 enable.idempotence 功能来提高性能并同时保证排序。

    exactly_once:要实现exactly_once和幂等性,我们需要将事务设置为read_committed,并且不允许覆盖以下参数:

    • isolation.level:read_committed( 消费者将始终读取已提交 仅数据)

    • enable.idempotence=true(生产者将始终启用幂等性)

    • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5(生产者将 每个连接总是有一个进行中的请求)

    【讨论】:

    • 我认为这不能回答问题。当然,您展示了简单的情况,当seq=3 失败并且您重试时。更有趣的部分是当您有 5 个请求时,前 2 个已提交,第 3 个请求失败并因此重试,第 4 个和第 5 个请求......已提交?对同一分区的所有请求。那么,当第 3 次重试时会发生什么?更有趣的是,如果有消费者呢?如果他们愿意的话,他们将阅读什么以及如何阅读这些信息。这个答案有点肤浅。
    • 当然,在我上面的例子中,代理根本不应该接受第 4 和第 5 请求,因为第 3 请求还没有到达。但是a)这意味着该分区的每条消息都会延迟,直到重试成功? b) 如果它根本没有成功怎么办? PID 和 seq 是否以某种方式重置?
    • 以及我在 KIP 中的最后一点:“启用幂等性后,我们强制执行 acks=all、retries > 1 和 max.inflight.requests.per.connection=1。 ”。所以当idempotence被启用时,max.inflight.requests.per.connection=1是强制的。根本没有其他方法可以处理这个问题。
    【解决方案2】:

    enable.idempotence 是一个较新的设置,作为kip-98 的一部分引入(在 kafka 0.11+ 中实现)。在此之前,用户必须将 max.inflight 设置为 1。

    它的工作方式(缩写)是生产者现在将序列号放在我们正在进行的生产批次上,并且经纪人跟踪每个连接到他们的生产者的这些序列号。如果代理收到一批乱序(比如第 1 批之后的第 3 批),它会拒绝它并希望看到第 2 批(生产者将重新传输)。有关完整的详细信息,您应该阅读 kip-98

    【讨论】:

    • 感谢您的参考。所以我发现,如果违反订单,经纪人会抛出致命的异常。并且没有重新发送前一批然后继续保持正确顺序的机制。如果代理检测到数据丢失,生产者将引发 OutOfOrderSequenceException。换句话说,如果它接收到的序列号大于它预期的序列号。此异常将在未来返回并传递给回调(如果有)。这是一个致命异常,以后调用 Producer 方法(如 send、beginTransaction 等)将抛出 IllegalStateException
    • 感谢您提供 KIP 参考!有太多错误的假设和答案,而 KIP 准确地说:“当启用幂等性时,我们强制执行 acks=all、retries > 1 和 max.inflight.requests.per.connection=1。没有这些值配置,我们不能保证幂等性。如果这些设置没有被应用程序显式覆盖,当启用幂等性时,生产者将设置 acks=all,retries=Integer.MAX_VALUE,max.inflight.requests.per.connection=1。 .
    猜你喜欢
    • 2015-07-01
    • 2018-07-05
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-02
    • 2021-01-13
    相关资源
    最近更新 更多