RabbitMQ Linux客户端使用指南
在使用Linux客户端前,需先确保Linux系统已安装并运行RabbitMQ服务端。以下是通用安装步骤(以Ubuntu为例):
sudo apt update
sudo apt install -y erlang
sudo apt install -y rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo systemctl status rabbitmq-server # 查看状态(显示"active (running)"即为正常)
以上步骤确保Linux系统具备RabbitMQ服务端能力,可作为客户端连接自身服务,或供其他机器的客户端连接。
Linux环境下,RabbitMQ客户端主要通过命令行工具(如rabbitmqadmin)或编程语言客户端库(如Python的pika、Java的amqp-client)实现。以下是常用工具的安装方法:
rabbitmqadmin是RabbitMQ官方提供的命令行工具,用于管理队列、交换机、发送/接收消息等。
# 下载rabbitmqadmin(替换为你的RabbitMQ服务器IP)
wget http://<服务器IP>:15672/cli/rabbitmqadmin
# 添加执行权限
chmod +x rabbitmqadmin
# 移动到系统路径(可选)
sudo mv rabbitmqadmin /usr/local/bin/
使用前需配置认证信息(默认用户guest,密码guest,仅限本地访问):
./rabbitmqadmin -u admin -p admin123 list queues # 替换为你的用户名密码
以Python为例,使用pika库实现消息收发:
pip install pika # 安装pika库
Java项目需在pom.xml中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version> <!-- 使用最新版本 -->
</dependency>
使用rabbitmqadmin向指定队列发送消息:
./rabbitmqadmin -u admin -p admin123 publish exchange= amqp.default routing_key=test_queue payload="Hello, RabbitMQ!"
exchange=:使用默认交换机(空字符串)。routing_key=test_queue:目标队列名称。payload:消息内容。使用rabbitmqadmin监听队列并接收消息:
./rabbitmqadmin -u admin -p admin123 get queue=test_queue no_ack=true
no_ack=true:表示无需手动确认消息(简化示例,生产环境建议设置为false)。./rabbitmqadmin -u admin -p admin123 declare queue name=test_queue durable=true
durable=true:队列持久化(服务器重启后仍存在)。./rabbitmqadmin -u admin -p admin123 delete queue name=test_queue
./rabbitmqadmin -u admin -p admin123 list queues
以上命令适用于快速测试,生产环境建议使用编程语言客户端实现更复杂的逻辑(如消息确认、持久化、错误处理)。
以下是Python客户端完整的“生产者-消费者”模型代码:
import pika
# 连接RabbitMQ服务器(替换为你的服务器IP、用户名、密码)
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters('192.168.1.100', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 声明队列(确保存在)
channel.queue_declare(queue='test_queue', durable=True)
# 发送消息
message = "Hello from Python producer!"
channel.basic_publish(
exchange='',
routing_key='test_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化(1为非持久化,2为持久化)
)
)
print(f" [x] Sent '{message}'")
# 关闭连接
connection.close()
import pika
# 连接RabbitMQ服务器
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters('192.168.1.100', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 声明队列(确保存在)
channel.queue_declare(queue='test_queue', durable=True)
# 定义消息处理回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 手动确认消息(避免消息丢失)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消费消息(自动应答设置为False)
channel.basic_consume(
queue='test_queue',
on_message_callback=callback,
auto_ack=False # 关键:手动确认消息
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 进入阻塞监听状态
关键说明:
delivery_mode=2(消息持久化)、durable=True(队列持久化)确保服务器重启后消息不丢失。auto_ack=False配合basic_ack避免消息未处理完就被删除(生产环境必用)。try-except捕获连接异常、通道异常等。sudo systemctl status rabbitmq-server。sudo firewall-cmd --list-ports(确保包含5672、15672)。guest用户仅限本地访问,远程连接需创建新用户并授权。queue_declare名称需相同)。sudo tail -f /var/log/rabbitmq/rabbitmq.log(定位具体错误)。