温馨提示×

RabbitMQ消息确认Ubuntu如何设置

小樊
34
2026-01-03 12:11:58
栏目: 智能运维

在 Ubuntu 上设置 RabbitMQ 消息确认

一 环境准备与启用管理插件

  • 安装并启动服务(Ubuntu 24.04 示例):
    • sudo apt update
    • sudo apt install -y erlang rabbitmq-server
    • sudo systemctl start rabbitmq-server && sudo systemctl enable rabbitmq-server
  • 启用管理插件并创建管理员用户(默认 guest/guest 仅允许本机登录,远程需自建用户):
    • sudo rabbitmq-plugins enable rabbitmq_management
    • sudo rabbitmqctl add_user admin StrongPass!
    • sudo rabbitmqctl set_user_tags admin administrator
    • sudo rabbitmqctl set_permissions -p / admin “." ".” “.*”
  • 访问管理界面:打开浏览器访问 http://服务器IP:15672,使用新建账号登录,便于观察队列的 Ready/Unacked 等指标。

二 消费者确认设置

  • 核心概念
    • 自动确认(autoAck=true):消息一旦投递即被认为成功,网络闪断或处理异常会导致消息丢失,适合可靠性要求不高的场景。
    • 手动确认(autoAck=false):消费者处理完成后显式调用确认,RabbitMQ 才会删除消息;消费者断开且未确认的消息会重新入队。管理界面可查看 Ready(待投递)与 Unacked(已投递未确认)数量。
  • 原生 Java 客户端示例(手动 Ack)
    • 关键步骤:关闭自动确认、消费完成后调用 basicAck;处理失败可调用 basicNack/basicReject 决定是否重入队。
    • 代码片段:
      • channel.basicConsume(queue, false, (consumerTag, delivery) -> { long tag = delivery.getEnvelope().getDeliveryTag(); try { // 1) 业务处理 // 2) 成功后确认(multiple 为 true 时可批量确认) channel.basicAck(tag, false); } catch (Exception e) { // 失败:true=重新入队;false=丢弃/进入死信(需有死信配置) channel.basicNack(tag, false, true); } }, consumerTag -> {});
    • 要点:
      • deliveryTag 按 Channel 单调递增,必须在收到消息的同一 Channel 上确认。
      • basicReject 不支持批量basicNack 支持批量(multiple=true)。

三 生产者确认设置

  • 事务机制(性能较低,不推荐):
    • channel.txSelect(); channel.basicPublish(…); channel.txCommit(); // 失败则 txRollback()
  • 发布者确认 Publisher Confirm(推荐)
    • 开启 Confirm:channel.confirmSelect()
    • 三种常用方式:
      • 同步逐条等待:channel.waitForConfirms()(简单但吞吐低)
      • 同步批量等待:累积到 N 条或超时统一等待
      • 异步监听(常用):
        • channel.addConfirmListener((seq, multiple) -> { /* ack / }, (seq, multiple) -> { / nack */ });
    • 路由失败回执 Return(需设置 mandatory=true):
      • channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> { // 无法路由到队列时处理(如落库/告警/重发) });
      • channel.basicPublish(exchange, routingKey, true, props, body); // mandatory=true
    • 可靠性增强建议:队列与消息持久化(队列 durable、消息 deliveryMode=PERSISTENT)与 Confirm 配合使用,可显著降低消息丢失风险。

四 Spring Boot 配置示例

  • application.yml(生产者与消费者均可用)
    • spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: StrongPass! virtual-host: /

      生产者确认

      publisher-confirm-type: correlated # 新版使用 publisher-confirm-type publisher-returns: true template: mandatory: true

      消费者确认

      listener: simple: acknowledge-mode: manual # 可选:none/auto/manual
  • 消费者手动确认片段
    • @RabbitListener(queues = “ack_queue”) public void listen(Message message, Channel channel) throws IOException { long tag = message.getMessageProperties().getDeliveryTag(); try { // TODO 业务处理 channel.basicAck(tag, false); } catch (Exception e) { // 失败重入队(或转入死信) channel.basicNack(tag, false, true); } }
  • 说明:Spring AMQP 提供 AcknowledgeMode.NONE/AUTO/MANUAL 三种策略;处理异常且未确认时,消息会保持 Unacked 状态,必要时会重新投递。

五 验证与常见问题

  • 验证要点
    • 管理界面观察队列的 Ready/Unacked 变化是否符合预期(手动 Ack 应在处理完成后下降)。
    • 生产者开启 Confirm/Return 后,断网或路由失败时能在回调中捕获并处理。
  • 常见问题与建议
    • 不要混合使用事务与 Confirm;优先选择 Publisher Confirm
    • 消费者异常导致 Unacked 积压时,先排查处理耗时与异常路径,必要时引入 死信队列(DLQ) 与重试策略,避免无限重投递。

0