RabbitMQ消息丢失的排查与解决方案
一、先定位丢失环节
二、生产者端确保不丢
spring.rabbitmq.publisher-confirms=true、spring.rabbitmq.publisher-returns=true、spring.rabbitmq.publisher-confirm-type=correlated、rabbitTemplate.setMandatory(true)。new CorrelationData(UUID);实现 ConfirmCallback 与 ReturnCallback 处理 ack/nack 与 return。三、Broker端确保不丢
durable=true;durable=true;deliveryMode=2(如 MessageProperties.PERSISTENT_TEXT_PLAIN)。rabbitmqctl set_vm_memory_high_watermark 0.6,避免因内存不足触发策略性丢弃。四、消费者端确保不丢
basicAck(deliveryTag, false),未 ack 的消息会继续被其他消费者处理。basicNack/reject 进入 DLQ,避免无限循环。spring.rabbitmq.listener.simple.acknowledge-mode=manual;spring.rabbitmq.listener.simple.retry.enabled=true、max-attempts、initial-interval,以及 default-requeue-rejected=false(配合 DLQ 使用更安全)。五、关键配置与代码清单
spring:
rabbitmq:
addresses: localhost:5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-confirms: true
publisher-returns: true
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
initial-interval: 3000ms
default-requeue-rejected: false
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) log.warn("消息未确认,correlationId={}, cause={}", correlationData.getId(), cause);
// 失败入补偿库/重发队列
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("消息无法路由:exchange={}, routingKey={}, code={}, text={}", exchange, routingKey, replyCode, replyText);
// 记录并补偿投递
});
rabbitTemplate.setMandatory(true);
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("ex.direct", "rk", "payload", cd);
@RabbitListener(queues = "q.biz")
public void handle(Message msg, Channel ch) throws IOException {
try {
// 1) 业务处理
// 2) 成功后确认
ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 重试已达上限或不可重试:进入DLQ,不再重回原队列
ch.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}
channel.exchangeDeclare("ex.direct", "direct", true);
channel.queueDeclare("q.biz", true, false, false, null);
channel.queueBind("q.biz", "ex.direct", "rk");
channel.basicPublish("ex.direct", "rk",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"important".getBytes(StandardCharsets.UTF_8));
以上配置与代码覆盖了发送确认、退回回调、手动确认、重试与 DLQ 的关键落地路径。