温馨提示×

RabbitMQ消息确认机制Ubuntu设置

小樊
43
2025-12-12 23:14:22
栏目: 智能运维

Ubuntu上配置RabbitMQ消息确认机制

一 环境准备与启用管理插件

  • 安装组件并启动服务(Ubuntu 20.04/22.04/24.04 通用)
    • 更新索引并安装:sudo apt update && sudo apt install -y erlang rabbitmq-server
    • 启动与自启:sudo systemctl start rabbitmq-server && sudo systemctl enable rabbitmq-server
    • 启用管理插件:sudo rabbitmq-plugins enable rabbitmq_management
    • 状态检查:sudo systemctl status rabbitmq-server
  • 创建管理员并开放访问
    • 新建用户:sudo rabbitmqctl add_user your_username your_password
    • 赋权管理员:sudo rabbitmqctl set_user_tags your_username administrator
    • 授予默认 vhost 权限:sudo rabbitmqctl set_permissions -p / your_username “." ".” “.*”
    • 访问管理界面:浏览器打开 http://服务器IP:15672,使用新建账号登录。

二 消费者确认机制设置

  • 核心要点
    • 关闭自动确认(autoAck),改为在处理完成后显式调用 ACK/NACK,避免消费者宕机导致消息丢失。
    • 处理成功调用 basic.ack(deliveryTag=method.delivery_tag);处理失败可调用 basic.nack(deliveryTag=…, requeue=true/false) 决定是否重回队列。
  • Python pika 示例(手动确认)
    • 安装客户端:pip install pika
    • 示例代码:
      • import pika
        
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='task_queue', durable=True)
        
        def callback(ch, method, properties, body):
            try:
                # TODO: 业务处理
                print(f"处理消息: {body}")
                ch.basic_ack(delivery_tag=method.delivery_tag)  # 显式确认
            except Exception:
                # 处理失败:requeue=True 重回队列;False 丢弃或进入死信
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        
        # 关闭自动确认
        channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
        print('等待消息中...')
        channel.start_consuming()
        
    • 说明:上述示例关闭了 auto_ack,并在业务完成后手动 ACK;异常时调用 NACK 并选择是否 requeue

三 生产者确认与持久化

  • 可靠投递的常用组合
    • 事务(较少用):channel.txSelect() → basicPublish → txCommit()/txRollback(),保证消息到达服务器,但性能较低。
    • 发布者确认 Publisher Confirms(推荐):channel.confirmSelect(),配合 waitForConfirms()add_confirm_listener() 确认消息被服务器接收并(配合持久化)落盘。
  • Java 示例(发布者确认 + 持久化)
    • 依赖(Maven):
      • <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.20.0</version>
        </dependency>
        
    • 示例代码:
      • import com.rabbitmq.client.*;
        import java.nio.charset.StandardCharsets;
        
        public class Producer {
            public static void main(String[] args) throws Exception {
                ConnectionFactory f = new ConnectionFactory();
                f.setHost("192.168.187.133"); // 服务器地址
                f.setUsername("rabbit");
                f.setPassword("rabbit");
        
                try (Connection c = f.newConnection(); Channel ch = c.createChannel()) {
                    // 队列持久化
                    ch.queueDeclare("my_queue", true, false, false, null);
                    // 开启发布者确认
                    ch.confirmSelect();
        
                    String msg = "Hello, confirm!";
                    // 消息持久化
                    ch.basicPublish("", "my_queue",
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msg.getBytes(StandardCharsets.UTF_8));
        
                    // 同步等待确认(生产环境可按需批量/异步确认)
                    if (ch.waitForConfirms()) {
                        System.out.println("消息已确认(到达并持久化)");
                    }
                }
            }
        }
        
    • 说明:通过 confirmSelect() 开启确认;队列与消息均设为持久化,可显著降低消息丢失风险。

四 常见问题与最佳实践

  • 必须同时具备的条件
    • 消费者侧关闭 autoAck,在业务处理完成后显式 ACK/NACK,否则消息可能在处理前被删除。
    • 需要“不丢消息”的场景,务必使用持久化(队列 durable=true、消息 delivery_mode=2/PERSISTENT_TEXT_PLAIN)并开启 Publisher Confirms
  • 无法路由的消息
    • 发布时将 mandatory=true 并注册 ReturnListener,对无法路由的消息进行处理或告警,避免静默丢弃。
  • 连接与权限
    • 远程访问管理界面需确保防火墙放行 15672 端口,并为应用创建具备相应 vhost 权限的用户。

0