在 Linux 上配置与验证 RabbitMQ 消息确认机制
一 核心概念与适用场景
二 在 Linux 上的操作步骤
sudo apt update && sudo apt install rabbitmq-serversudo systemctl start rabbitmq-serversudo systemctl status rabbitmq-serversudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmqctl eval 'rabbit_channel:call(rabbit_connection:info(self(), [pid]), list_to_binary("OK")).'(示例仅用于连通性验证;实际 confirm 需在客户端代码中开启)sudo rabbitmqctl declare queue name=test_queue durable=truesudo rabbitmqctl list_queues name durable auto_delete argumentssudo rabbitmqctl set_policy ha-all "^myQueue$" '{"ha-mode":"all","ha-sync-mode":"automatic"}'三 生产者侧确认与持久化示例 Python pika
import pika, time, uuid
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
# 1) 开启发布确认
ch.confirm_delivery()
# 2) 队列与消息持久化
ch.queue_declare(queue='task_queue', durable=True)
# 3) 发送并关联唯一ID(CorrelationId)
corr_id = str(uuid.uuid4())
ch.basic_publish(
exchange='',
routing_key='task_queue',
body=f'Hello-{corr_id}'.encode(),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
correlation_id=corr_id # 便于定位 ack/nack
)
)
print(f'Sent: {corr_id}')
# 4) 同步等待确认(演示用;生产可用异步回调)
try:
ch.wait_for_confirms(timeout=5)
print('Broker confirmed delivery')
except pika.exceptions.TimeoutError:
print('Confirm timeout')
conn.close()
confirm_delivery() 开启 Confirm;wait_for_confirms() 可同步等待;生产环境更常用异步监听 ack/nack。durable=True,消息需 delivery_mode=2,否则 Broker 重启可能丢消息。mandatory=True 并注册 add_on_return_callback。四 消费者侧手动确认与重试处理示例 Python pika
import pika, time
def on_message(ch, method, properties, body):
try:
# TODO: 业务处理
print(f'Processing: {body}')
# 模拟处理耗时
time.sleep(1)
# 5) 显式确认(处理成功再 ack)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f'Error: {e}, requeue...')
# 6) 处理失败:可选择 nack 并重新入队(或转入死信)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
# 3) 关闭自动确认,开启手动 ack
ch.queue_declare(queue='task_queue', durable=True)
ch.basic_consume(queue='task_queue', on_message_callback=on_message, auto_ack=False)
print('Waiting for messages. To exit press CTRL+C')
try:
ch.start_consuming()
except KeyboardInterrupt:
ch.stop_consuming()
conn.close()
basic_nack(requeue=True) 重试或转入 死信队列(DLQ) 做后续分析。五 常见问题与排查要点
delivery_mode=2;同时确认 Broker 确实完成磁盘写入(依赖 confirm 的 ack 时机)。