温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RabbitMQ消息丢失问题和保证消息可靠性之消费端不丢消息和HA的示例分析

发布时间:2021-09-10 10:51:44 来源:亿速云 阅读:96 作者:柒染 栏目:大数据

RabbitMQ消息丢失问题和保证消息可靠性之消费端不丢消息和HA的示例分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储。

本文先从消费端讲起:

RabbitMQ Server到消费者消息如何不丢?

上面一篇文章也提到了,消费者获取到消息之后,没有来得及处理完毕,自己直接宕机了,因为消息者默认采用自动ack,此时RabbitMQ的自动ack机制会通知MQ Server这条消息已经处理好了,此时消息就丢了,并不是预期的。

那么我们采用手动ack机制来解决这个问题,消费端处理完逻辑之后再通知MQ Server,这样消费者没处理完消息不会发送ack,如果在消费者拿到消息,没来得及处理的情况下自己挂了,此时MQ集群会自动感知到,它就会自觉的重发消息给其他的消费者服务实例。

根据上面的思路你需要完成下面的两步操作:

第一:消费者监听设置手动ack

  this.channel = channelManager.getListenerChannel(namespace);
  this.queue = queue;
  this.channel.basicConsume(queue, false, consumerTag, this);
  this.disconnectedCallback.setChannel(channel);

核心代码: this.channel.basicConsume(queue, false, consumerTag, this); 第二个参数设置 false 代表不自动ack

第二:业务执行完成后手动ack

public static void ack(MessageContext context) {
        long deliveryTag = context.getEnvelope().getDeliveryTag();
        try {
            context.getChannel().basicAck(deliveryTag, false);
        } catch (IOException e) {
            throw new MqAckException("消息ack出错:连接异常或远端关闭", context, e);
        }
    }

核心代码: context.getChannel().basicAck(deliveryTag, false);

这里封装来,需要业务在执行完自己的业务代码后,调用对象channel 的ack方法通知MQServer,说我这边执行完了,你可以删除了。

注意这里有个问题: 如果忘记调用这个 context.getChannel().basicAck(deliveryTag, false);

或者因为代码异常,这个代码没被执行,会怎么样?后面找时间再写一篇文章讲这个问题。

RabbitMQ Server中存储的消息高可用

当我们解决了,生产端和消费端的问题后,基本保证消息的不丢问题,但是还有一个是消息的高可用问题,单节点问题,普通节点的问题都会影响消息的临时不可用,这个时候要用上我们的HA 镜像集群模式来保证。

上一篇文章 解决RabbitMQ消息丢失问题和保证消息可靠性(一) 已经提到过,服务端消息部署的三种模式的区别,今天就专门讲镜像模式的介绍。

镜像模式至少采用3节点,2个磁盘节点和1个内存节点来保证,架构图:

RabbitMQ消息丢失问题和保证消息可靠性之消费端不丢消息和HA的示例分析

设置镜像也有一些策略:

  • 同步至所有的,一般不这么做,性能会受到极大影响

  • 同步最多N个机器

  • 只同步至符合指定名称的nodes

> 命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

  1. 为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  1. 为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  1. 为每个以“node.”开头的队列分配指定的节点做镜像

rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

> 但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降

所以采用镜像模式,要根据具体的业务规则定制话处理,没那么重要的业务,消息丢了也没关系的场景,又要求必须高的性能的时候,镜像也可以不用设置。

关于RabbitMQ消息丢失问题和保证消息可靠性之消费端不丢消息和HA的示例分析问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI