RabbitMq如何保证消息不丢失

xiaojiuaigc@163.com 发布于 2024-11-10 457 次阅读


生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功

消息失败之后如何处理呢?

  • 回调方法即时重发
  • 记录日志
  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据

消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

交换机持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
   
return new DirectExchange(“simple.direct”, true, false);
}

队列持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
   
return QueueBuilder.durable(“simple.queue”).build();
}

 

 

消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
       
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
       
.build();

消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理

总结:

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,多次重试失败后将消息投递到异常交换机,交由人工处理