温馨提示×

RabbitMQ消息路由Linux如何实现

小樊
38
2025-12-04 20:17:02
栏目: 智能运维

Linux上实现RabbitMQ消息路由的完整步骤

一 环境准备与安装

  • 准备:选择较新的发行版(如 Ubuntu 20.04/22.04CentOS 7/8RHEL 8),生产建议内存≥4GB、/var 预留≥10GB、内核≥3.10+。安装依赖与 Erlang(RabbitMQ 3.12+ 要求 Erlang 25.x)。
  • Ubuntu/Debian:
    • 安装 Erlang:
      curl -fsSL https://packages.erlang-solutions.com/erlang-solutions_2.0_all.deb -o erlang.deb
      sudo dpkg -i erlang.deb && sudo apt update && sudo apt install -y esl-erlang
    • 安装 RabbitMQ(使用官方仓库脚本):
      curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh’ | sudo -E bash
      sudo apt install -y rabbitmq-server
  • CentOS/RHEL:
    • 安装 Erlang:
      yum install -y https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
      yum install -y esl-erlang
    • 安装 RabbitMQ(使用官方仓库脚本):
      curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.rpm.sh’ | sudo -E bash
      yum install -y rabbitmq-server
  • 启动与开机自启:
    sudo systemctl start rabbitmq-server && sudo systemctl enable rabbitmq-server
    sudo systemctl status rabbitmq-server(应显示 active (running))
  • 管理插件与访问:
    sudo rabbitmq-plugins enable rabbitmq_management
    访问 http://服务器IP:15672,默认账号 guest/guest(默认仅本地访问)。

二 核心概念与路由类型

  • 核心概念:消息先发送到 Exchange(交换机),通过 Binding(绑定)Queue(队列) 关联;投递依据 Routing Key(路由键) 与绑定规则匹配。
  • 路由类型与匹配规则:
    • Direct:精确匹配 Routing Key = Binding Key
    • Fanout:忽略路由键,广播到所有绑定队列。
    • Topic:按模式匹配路由键,支持通配符 “*”(一个单词)“#”(零个或多个单词)
    • Headers:按消息头部属性匹配(较少使用)。

三 命令行快速配置路由

  • 创建 vhost 与用户并赋权(示例:vhost=/myvhost,用户=admin):
    sudo rabbitmqctl add_vhost /myvhost
    sudo rabbitmqctl add_user admin StrongPass!
    sudo rabbitmqctl set_permissions -p /myvhost admin “." ".” “.*”
    sudo rabbitmqctl set_user_tags admin administrator
  • 声明交换机、队列与绑定(示例:交换机 logs,队列 q.infoq.error,路由键 infoerror):
    sudo rabbitmqadmin declare exchange name=logs type=direct durable=true
    sudo rabbitmqadmin declare queue name=q.info durable=true
    sudo rabbitmqadmin declare queue name=q.error durable=true
    sudo rabbitmqadmin declare binding source=logs destination=q.info routing_key=info
    sudo rabbitmqadmin declare binding source=logs destination=q.error routing_key=error
  • 说明:也可在管理界面(15672)完成上述对象与绑定创建。

四 代码示例 Python实现直连与主题路由

  • 直连路由 Direct(按级别投递到不同队列)
    • 生产者 send_direct.py: import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True) for level in (‘info’, ‘error’, ‘debug’): ch.basic_publish(exchange=‘logs’, routing_key=level, body=f’[{level}] Hello Direct’) conn.close()
    • 消费者 recv_direct.py(消费 error 队列): import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True) q = ch.queue_declare(queue=‘q.error’, durable=True) ch.queue_bind(exchange=‘logs’, queue=q.method.queue, routing_key=‘error’) def cb(ch, method, props, body): print(f" [x] {method.routing_key} => {body.decode()}") ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False) print(’ [*] Waiting for error logs…') ch.start_consuming()
  • 主题路由 Topic(按模式匹配)
    • 生产者 send_topic.py: import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’))) ch = conn.channel() ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True) keys = [‘app.error’, ‘app.warning’, ‘audit.info’, ‘audit.security’] for k in keys: ch.basic_publish(exchange=‘topic_logs’, routing_key=k, body=f’[{k}] Topic message’) conn.close()
    • 消费者 recv_topic.py(接收 app.* 与 audit.#): import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘admin’))) ch = conn.channel() ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True) q = ch.queue_declare(queue=‘’, exclusive=True) for b in (‘app.‘, ‘audit.#’): ch.queue_bind(exchange=‘topic_logs’, queue=q.method.queue, routing_key=b) def cb(ch, method, props, body): print(f" [x] {method.routing_key} => {body.decode()}") ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False) print(’ [] Waiting for topic logs…’) ch.start_consuming()
  • 运行要点:先启动消费者,再运行生产者;Topic 中 “*” 匹配一个单词、“#” 匹配零个或多个单词。

五 生产环境与安全加固

  • 远程访问与账户:默认 guest/guest 仅本地可用;创建专用管理员并赋权,必要时在配置中调整 loopback_users(示例见下)。
  • 配置文件与目录(示例):
    • /etc/rabbitmq/rabbitmq.config(允许远程管理员登录):
      [
      {rabbit, [
      {loopback_users, []}
      ]}
      ].
    • /etc/rabbitmq/rabbitmq-env.conf(数据与日志目录):
      RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
      RABBITMQ_LOG_BASE=/data/rabbitmq/log
    • 目录与权限:
      mkdir -pv /data/rabbitmq/{mnesia,log}
      chown -R rabbitmq:rabbitmq /data/rabbitmq
  • 防火墙:开放 5672(AMQP)、15672(管理界面)
    • Ubuntu/Debian:sudo ufw allow 5672/tcp && sudo ufw allow 15672/tcp
  • 资源与高可用:
    • 资源阈值(/etc/rabbitmq/rabbitmq.conf):
      vm_memory_high_watermark.relative = 0.6
      disk_free_limit.absolute = 2GB
    • 建议:开启镜像队列/仲裁队列、监控磁盘与内存、按业务设置队列与消息 TTL、开启确认机制与重试策略。

0