Linux上实现RabbitMQ消息路由的完整步骤
一 环境准备与安装
- 准备:选择较新的发行版(如 Ubuntu 20.04/22.04、CentOS 7/8、RHEL 8),生产建议内存≥4GB、/var 预留≥10GB、内核≥3.10+。安装依赖与 Erlang(RabbitMQ 3.12+ 要求 Erlang 25.x)。
- Ubuntu/Debian:
- 安装 Erlang:
curl -fsSL https://packages.erlang-solutions.com/erlang-solutions_2.0_all.deb -o erlang.deb
sudo dpkg -i erlang.deb && sudo apt update && sudo apt install -y esl-erlang
- 安装 RabbitMQ(使用官方仓库脚本):
curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh’ | sudo -E bash
sudo apt install -y rabbitmq-server
- CentOS/RHEL:
- 安装 Erlang:
yum install -y https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
yum install -y esl-erlang
- 安装 RabbitMQ(使用官方仓库脚本):
curl -1sLf ‘https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.rpm.sh’ | sudo -E bash
yum install -y rabbitmq-server
- 启动与开机自启:
sudo systemctl start rabbitmq-server && sudo systemctl enable rabbitmq-server
sudo systemctl status rabbitmq-server(应显示 active (running))
- 管理插件与访问:
sudo rabbitmq-plugins enable rabbitmq_management
访问 http://服务器IP:15672,默认账号 guest/guest(默认仅本地访问)。
二 核心概念与路由类型
- 核心概念:消息先发送到 Exchange(交换机),通过 Binding(绑定) 与 Queue(队列) 关联;投递依据 Routing Key(路由键) 与绑定规则匹配。
- 路由类型与匹配规则:
- Direct:精确匹配 Routing Key = Binding Key。
- Fanout:忽略路由键,广播到所有绑定队列。
- Topic:按模式匹配路由键,支持通配符 “*”(一个单词) 与 “#”(零个或多个单词)。
- Headers:按消息头部属性匹配(较少使用)。
三 命令行快速配置路由
- 创建 vhost 与用户并赋权(示例:vhost=/myvhost,用户=admin):
sudo rabbitmqctl add_vhost /myvhost
sudo rabbitmqctl add_user admin StrongPass!
sudo rabbitmqctl set_permissions -p /myvhost admin “." ".” “.*”
sudo rabbitmqctl set_user_tags admin administrator
- 声明交换机、队列与绑定(示例:交换机 logs,队列 q.info、q.error,路由键 info、error):
sudo rabbitmqadmin declare exchange name=logs type=direct durable=true
sudo rabbitmqadmin declare queue name=q.info durable=true
sudo rabbitmqadmin declare queue name=q.error durable=true
sudo rabbitmqadmin declare binding source=logs destination=q.info routing_key=info
sudo rabbitmqadmin declare binding source=logs destination=q.error routing_key=error
- 说明:也可在管理界面(15672)完成上述对象与绑定创建。
四 代码示例 Python实现直连与主题路由
- 直连路由 Direct(按级别投递到不同队列)
- 生产者 send_direct.py:
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’)))
ch = conn.channel()
ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True)
for level in (‘info’, ‘error’, ‘debug’):
ch.basic_publish(exchange=‘logs’, routing_key=level, body=f’[{level}] Hello Direct’)
conn.close()
- 消费者 recv_direct.py(消费 error 队列):
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’)))
ch = conn.channel()
ch.exchange_declare(exchange=‘logs’, exchange_type=‘direct’, durable=True)
q = ch.queue_declare(queue=‘q.error’, durable=True)
ch.queue_bind(exchange=‘logs’, queue=q.method.queue, routing_key=‘error’)
def cb(ch, method, props, body):
print(f" [x] {method.routing_key} => {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False)
print(’ [*] Waiting for error logs…')
ch.start_consuming()
- 主题路由 Topic(按模式匹配)
- 生产者 send_topic.py:
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘StrongPass!’)))
ch = conn.channel()
ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True)
keys = [‘app.error’, ‘app.warning’, ‘audit.info’, ‘audit.security’]
for k in keys:
ch.basic_publish(exchange=‘topic_logs’, routing_key=k, body=f’[{k}] Topic message’)
conn.close()
- 消费者 recv_topic.py(接收 app.* 与 audit.#):
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’, 5672, ‘/myvhost’, credentials=pika.PlainCredentials(‘admin’, ‘admin’)))
ch = conn.channel()
ch.exchange_declare(exchange=‘topic_logs’, exchange_type=‘topic’, durable=True)
q = ch.queue_declare(queue=‘’, exclusive=True)
for b in (‘app.‘, ‘audit.#’):
ch.queue_bind(exchange=‘topic_logs’, queue=q.method.queue, routing_key=b)
def cb(ch, method, props, body):
print(f" [x] {method.routing_key} => {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(queue=q.method.queue, on_message_callback=cb, auto_ack=False)
print(’ [] Waiting for topic logs…’)
ch.start_consuming()
- 运行要点:先启动消费者,再运行生产者;Topic 中 “*” 匹配一个单词、“#” 匹配零个或多个单词。
五 生产环境与安全加固
- 远程访问与账户:默认 guest/guest 仅本地可用;创建专用管理员并赋权,必要时在配置中调整 loopback_users(示例见下)。
- 配置文件与目录(示例):
- /etc/rabbitmq/rabbitmq.config(允许远程管理员登录):
[
{rabbit, [
{loopback_users, []}
]}
].
- /etc/rabbitmq/rabbitmq-env.conf(数据与日志目录):
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/data/rabbitmq/log
- 目录与权限:
mkdir -pv /data/rabbitmq/{mnesia,log}
chown -R rabbitmq:rabbitmq /data/rabbitmq
- 防火墙:开放 5672(AMQP)、15672(管理界面)
- Ubuntu/Debian:sudo ufw allow 5672/tcp && sudo ufw allow 15672/tcp
- 资源与高可用:
- 资源阈值(/etc/rabbitmq/rabbitmq.conf):
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
- 建议:开启镜像队列/仲裁队列、监控磁盘与内存、按业务设置队列与消息 TTL、开启确认机制与重试策略。