RabbitMQ在Ubuntu上的消息确认机制主要包括以下几个方面:
设置手动确认模式:
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
其中,false 表示关闭自动确认。
处理消息并发送确认:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息
processMessage(message);
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 处理异常,可以选择拒绝消息并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
基本确认(Basic Acknowledgment):
basicAck 方法确认单个消息。基本拒绝(Basic Nack):
basicNack 方法拒绝消息。基本拒绝并重新排队(Basic Reject):
basicNack,但总是重新排队被拒绝的消息。以下是一个简单的Java示例,展示了如何在RabbitMQ中使用手动确认机制:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQManualAckExample {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
通过上述步骤和示例代码,你可以在Ubuntu上配置和使用RabbitMQ的消息确认机制,确保消息的可靠传递和处理。