Ubuntu上配置RabbitMQ消息确认机制
一 环境准备与启用管理插件
二 消费者确认机制设置
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
try:
# TODO: 业务处理
print(f"处理消息: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 显式确认
except Exception:
# 处理失败:requeue=True 重回队列;False 丢弃或进入死信
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# 关闭自动确认
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
print('等待消息中...')
channel.start_consuming()
三 生产者确认与持久化
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.187.133"); // 服务器地址
f.setUsername("rabbit");
f.setPassword("rabbit");
try (Connection c = f.newConnection(); Channel ch = c.createChannel()) {
// 队列持久化
ch.queueDeclare("my_queue", true, false, false, null);
// 开启发布者确认
ch.confirmSelect();
String msg = "Hello, confirm!";
// 消息持久化
ch.basicPublish("", "my_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes(StandardCharsets.UTF_8));
// 同步等待确认(生产环境可按需批量/异步确认)
if (ch.waitForConfirms()) {
System.out.println("消息已确认(到达并持久化)");
}
}
}
}
四 常见问题与最佳实践