一、保证消息投递成功

  1)Kafka

  2)RabbitMQ

  3)RocketMQ

 

二、保证消息不会丢失

  1、生产者确保消息成功发送到Broker

  2、Broker保证消息不丢

  3、消费者确保消息消费成功

 

三、保证消息成功消费

  1、Kafka:关闭手动提交,消费成功之后发送ack

  2、RabbitMQ:消费者回复ack确认

 

四、如何保证消息的有序性?

  1、producer生产有序

    通过算法,将需要保持先后顺序的消息放在同一个消息队列中,然后只用一个消费者去消费该队列。

    1)Kafka 只能保证Topic内每个partition的有序性,无法保证整个Topic的有序性,因此将Topic的partition只设为一个就能保证消息被顺序消费。

      Kafka支持局部有序的方式,把一类消息都放入同一个partition(发送消息时指定partition key),就保证了这组消息的顺序。

    2)RabbitMQ:如果存在多个消费者,那么就让每个消费者对应一个queue,然后把要发送的数据全部放到一个queue,这样就能保证所有的数据只到达一个消费者从而保证每个数据到达数据库都是顺序的。

    (拆分多个queue,每个queue一个consumer。或者就是一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理)。

  2、consumer消费有序

    1)Kafka一个分区只会被消费组内的一个消费者消费,因此,Kafka只要顺序消息在同一个partition中,且当消费组使用单线程消费时,是可以保证消息消费有序的。

    2)RabbitMQ:只能拥有一个consumer

    

 

 

五、如何处理消息不被重复消费呢

  消费者保证消费幂等

  消息表

 

 

六、消费者消费失败,导致消息积压怎么处理? 

  什么是消息积压?

    Consumer消费出问题,没有及时发现,导致大量消息积压在MQ中

  消息积压怎么处理?

    1、找出消费者消费失败的原因,能修复的话,则修复让消费者恢复正常消费

    2、如果修复很麻烦,比较耗时,时间来不及,则做转发处理。写一个临时消费方案,将积压的消息消费掉,然后再转发到一个新的临时MQ的topic中。这个新的topic机器资源单独申请,要能承载住当前积压的消息。

    3、处理完积压数据之后,修复之前出问题的消费者,去消费新的MQ的topic数据,消费完后再恢复原来的消费状态

  总结:

    消息积压是件棘手的事情,最好是提前防范,做好硬件和消息系统的健康监控。如果出现消息丢失,就要人工查找丢失的消息,然后补上。在消费不过来的时候,可以考虑使用临时队列作为中转,提升处理能力。

 

 

附录:

1、Kafka保证消息不丢失方案

一、Producer 消息发送

  1、消息发送确认:消息数据是存储在分区中的,而分区又可能有多个副本,所以一条消息被发送到Broker之后何时算投递成功呢?Kafka提供了三种模式:

    1):不等Broker确认,消息被发送出去就认为是成功的。这种方式延迟最小,但是不能保证消息已经被成功投递到Broker

    2):由leader确认,当leader确认接收到消息就认为投递是成功的,然后由其他副本通过异步方式拉取

    3):由所有的leader和follower都确认接收到消息才认为是成功的。采用这种方式投递的可靠性最高,但相对会损伤性能

    // 生产者消息发送确认模式,0表示第一种,1表示第二种,all表示第三种
        props.put("acks", "1");

  2、消息重发:Kafka为了高可用性,生产者提供了自动重试机制。当从Broker接收到的是临时可恢复的异常时,生产者会向Broker重发消息,但不能无限

  次重发,如果重发次数达到阀值,生产者将不再重试并返回错误。

     // 消息发送重试次数
        props.put("retries", "10");
        // 重试间隔时间,默认100ms,设置时需要知道节点恢复所用的时间,要设置的比节点恢复所用时间长
        props.put("retry.backoff.ms", "1000");

 

 

二、Broker

  Broker会存在丢失消息的情况,解决方案是Kafka通过Producer和Broker协同处理单个Broker丢失数据的情况,一旦Producer发现Broker数据丢失,即可自动retry。具体是Producer发送消息时的ack机制来实现的,ack为all才能保证数据不会丢失。

  Broker丢失消息是Kakfa为了得到更高的性能和吞吐量,将数据异步的存储在磁盘中。Kafka为了提高性能,减少刷盘次数,采用了批量刷盘的做法。即按照一定的消息量,和时间间隔进行刷盘。

  Linux的机制,数据要存储到Linux系统中,会先存储到页缓存(Page Cache)中,按照时间或者其他条件进行刷盘(从Page Cache到file),或者通过fsync命令强制刷盘。

  当数据在Page Cache中,还未刷盘,这时系统挂掉,数据就会丢失。

 

 

三、消息消费

  从设计上来说,由于Kafka服务端并不保存消息的状态,所以在消费消息时就需要消费者自己去做很多事情,消费者每次调用poll方法时,该方法总是返回

由生产者写入Kafka中但还没有被消费者消费的消息。Kafka在设计上有一个不同于其他JMS队列的地方是生产者的消息并不需要消费者确认,而消息在分区中

又是顺序排列的,那么必然就可以通过一个偏移量offset来确定每一条消息的位置,偏移量在消息消费的过程中起着很重要的作用。

  更新分区当前位置的操作叫做提交偏移量,Kafka中有个叫做_consumer_offset的特殊主题用来保存消息在每个分区的偏移量,消费者每次消费时都会往

这个主题中发送消息,消息包含每个分区的偏移量。如果消费者崩溃或者有新的消费者加入消费组从而触发再均衡操作,再均衡之后该分区的消费者若不是之前

那个,那么新的消费者如何得知该分区的消息已经被之前的消费者消费到哪个位置了呢?这种情况下,就提现了偏移量的用处。为了能继续之前的工作,新的消

费者需要读取每个分区最后一次提交的偏移量,然后再从偏移量开始继续往下消费消息。

偏移量提交方式:

  1、自动提交

  Kafka默认会定期自动提交偏移量,提交的默认时间间隔是5000ms,但可能存在提交不及时导致再均衡之后重复消费的情况

  自动提交不是每消费一条消息就提交一次,而是定期提交,定期提交的间隔由 auto.commit.interval.ms 配置,默认是5秒。即

每隔5秒会将拉取到的每个分区中最大的消息offset进行提交。自动提交的动作是在poll方法的逻辑中完成的,在每次真正向服务端拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

        // Consumer的offset是否自动提交
        props.put("enable.auto.commit", "true");
        // 自动提交offset到zk的时间间隔,时间单位是毫秒
        props.put("auto.commit.interval.ms", "1000");

  2、手动提交

  先关闭消费者的自动提交配置,然后使用commitSync方法提交偏移量。

MQ如何保证消息不丢失
MQ如何保证消息不丢失
    // 关闭自动提交
        props.put("enable.auto.commit", "false");
    // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.println(record.partition() + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
            // 手动提交最新的偏移量
            consumer.commitSync();
        }
MQ如何保证消息不丢失
MQ如何保证消息不丢失

commitSync方法会提交由poll返回的最新偏移量,所以在处理完记录后要确保调用了commitSync方法,否则还是会发生重复处理的问题。

 

  3、异步提交

  使用commitSync方法提交偏移量有一个不足之处,就是该方法在Broker对提交请求做出回应前是阻塞的,要等待回应。因此,采用这种方式每提交一次偏移量就

等待一次限制了消费端的吞吐量,因此Kafka提供了异步提交的方式【consumer.commitAsync();】,消费者只管发送提交请求,而不需要等待Broker的立即回应。

但commitSync方法在成功提交之前如碰到无法恢复的错误之前会一直重试,而commitAsync并不会,因为为了避免异步提交的偏移量被覆盖。

 

 

END.

相关文章: