admin管理员组

文章数量:1642347

RabbitMQ如何保证消息不丢失

消息传输中的三个过程

生产端到RabbitMQ、RabbitMQ到消费端、消费端消费掉消息

在这三个过程中,任意一过程都将可能导致消息传输处理失败

生产端到RabbitMQ

  • 事务消息机制:会严重降低性能
  • confirm消息确认机制:生产端投递消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端

​ 开启消息确认的方式:

channel.confirmSelect();// 开启发送方确认模式

异步监听确认和未确认的消息

channel.addConfirmListener(new ConfirmListener() {
    //消息正确到达broker
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("已收到消息");
        //做一些其他处理
    }
    //RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
        //做一些其他处理,比如消息重发等
    }
});

此时,无论RabbitMQ是否确认消息,生产端都可以感知到消息是否处理成功。

问题:但是如果RabbitMQ接收到消息,并进行处理,但还未来的及发送确认消息,或者生产端由于网络故障没有接收到消息,,生产端就不知道消息有没有发送成功?

  • 消息持久化

消息持久化解决的是RabbitMQ突然挂掉了导致内存中消息丢失的情况。为了让重启后的RabbitMQ 重启后,能够恢复硬盘中的数据,我们需要将exchange、queue和message都进行持久化

//exchange持久化
//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

//queue持久化
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//message 持久化
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

问题:RabbitMQ接收到消息,还未来得及持久化到硬盘的时候,RabbitMQ挂了,消息丢失

* 消息入库

为了保证消息传输过程中,消息不被丢失,将发送消息前,先将消息保存到数据库,增加status状态字段、超时时间以及重发次数。其中状态字段表示当时消息传输是否传输成功。超时时间表示当超过一定时间后,重新发送消息。(可以通过定时任务,定时检测status为0,且时间大于超时时间的消息),重发次数表示最大的重发次数,如果发送一直失败,超过一定次数,就对做单独处理

问题:当目前为止,还存在一个问题,如果消息到了RabbitMQ,但是确认消息接收失败,这种,如果通过重复发送消息的方式可能导致消息的冗余,目前可想到的方式但不一定是最优的方案是,在消息体中,增加一个唯一ID,确定消息的唯一性。

消费端消息不丢失

消费端的话,可以认为有两种情况,消息传输过程中和消息处理过程中,导致消息处理失败。

如果消费端和生产端一样,通过ack机制确认,那么这种问题可以避免。默认RabbitMQ在消息发出后就立即删除这条消息,导致无论消费端是否接受到消息,是否处理完,消息都会丢失

所以,需要将自动ack机制改为手动ack机制

消费端手动确认消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        //接收到消息,做处理
        //手动确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        //出错处理,这里可以让消息重回队列重新发送或直接丢弃消息
    }
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

autoAck设置为FALSE,对rabbitMQ而言,如果一直没有接收到消费端的确认信号,并且消费此消息的消费端已经断开或者宕机,RabbitMQ 会自动将该消息重新放入消息队列的头部,等地投递给下一个消息的消费者。

同样,消费端消息和生产的消息都需要保证幂等性。

参考资料:

引入RabbitMQ后,你如何保证全链路数据100%不丢失?

本文标签: 消息RabbitMQ