在 Ubuntu 上设置 RabbitMQ 消息确认
一 环境准备与启用管理插件
- 安装并启动服务(Ubuntu 24.04 示例):
- sudo apt update
- sudo apt install -y erlang rabbitmq-server
- sudo systemctl start rabbitmq-server && sudo systemctl enable rabbitmq-server
- 启用管理插件并创建管理员用户(默认 guest/guest 仅允许本机登录,远程需自建用户):
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo rabbitmqctl add_user admin StrongPass!
- sudo rabbitmqctl set_user_tags admin administrator
- sudo rabbitmqctl set_permissions -p / admin “." ".” “.*”
- 访问管理界面:打开浏览器访问 http://服务器IP:15672,使用新建账号登录,便于观察队列的 Ready/Unacked 等指标。
二 消费者确认设置
- 核心概念
- 自动确认(autoAck=true):消息一旦投递即被认为成功,网络闪断或处理异常会导致消息丢失,适合可靠性要求不高的场景。
- 手动确认(autoAck=false):消费者处理完成后显式调用确认,RabbitMQ 才会删除消息;消费者断开且未确认的消息会重新入队。管理界面可查看 Ready(待投递)与 Unacked(已投递未确认)数量。
- 原生 Java 客户端示例(手动 Ack)
- 关键步骤:关闭自动确认、消费完成后调用 basicAck;处理失败可调用 basicNack/basicReject 决定是否重入队。
- 代码片段:
- channel.basicConsume(queue, false, (consumerTag, delivery) -> {
long tag = delivery.getEnvelope().getDeliveryTag();
try {
// 1) 业务处理
// 2) 成功后确认(multiple 为 true 时可批量确认)
channel.basicAck(tag, false);
} catch (Exception e) {
// 失败:true=重新入队;false=丢弃/进入死信(需有死信配置)
channel.basicNack(tag, false, true);
}
}, consumerTag -> {});
- 要点:
- deliveryTag 按 Channel 单调递增,必须在收到消息的同一 Channel 上确认。
- basicReject 不支持批量,basicNack 支持批量(multiple=true)。
三 生产者确认设置
- 事务机制(性能较低,不推荐):
- channel.txSelect(); channel.basicPublish(…); channel.txCommit(); // 失败则 txRollback()
- 发布者确认 Publisher Confirm(推荐)
- 开启 Confirm:channel.confirmSelect()
- 三种常用方式:
- 同步逐条等待:channel.waitForConfirms()(简单但吞吐低)
- 同步批量等待:累积到 N 条或超时统一等待
- 异步监听(常用):
- channel.addConfirmListener((seq, multiple) -> { /* ack / }, (seq, multiple) -> { / nack */ });
- 路由失败回执 Return(需设置 mandatory=true):
- channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
// 无法路由到队列时处理(如落库/告警/重发)
});
- channel.basicPublish(exchange, routingKey, true, props, body); // mandatory=true
- 可靠性增强建议:队列与消息持久化(队列 durable、消息 deliveryMode=PERSISTENT)与 Confirm 配合使用,可显著降低消息丢失风险。
四 Spring Boot 配置示例
- application.yml(生产者与消费者均可用)
- spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: StrongPass!
virtual-host: /
生产者确认
publisher-confirm-type: correlated # 新版使用 publisher-confirm-type
publisher-returns: true
template:
mandatory: true
消费者确认
listener:
simple:
acknowledge-mode: manual # 可选:none/auto/manual
- 消费者手动确认片段
- @RabbitListener(queues = “ack_queue”)
public void listen(Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
// TODO 业务处理
channel.basicAck(tag, false);
} catch (Exception e) {
// 失败重入队(或转入死信)
channel.basicNack(tag, false, true);
}
}
- 说明:Spring AMQP 提供 AcknowledgeMode.NONE/AUTO/MANUAL 三种策略;处理异常且未确认时,消息会保持 Unacked 状态,必要时会重新投递。
五 验证与常见问题
- 验证要点
- 管理界面观察队列的 Ready/Unacked 变化是否符合预期(手动 Ack 应在处理完成后下降)。
- 生产者开启 Confirm/Return 后,断网或路由失败时能在回调中捕获并处理。
- 常见问题与建议
- 不要混合使用事务与 Confirm;优先选择 Publisher Confirm。
- 消费者异常导致 Unacked 积压时,先排查处理耗时与异常路径,必要时引入 死信队列(DLQ) 与重试策略,避免无限重投递。