本篇内容介绍了“RocketMQ普通消息同步发送怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息。这个方式可以确保消息发送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //实例化消息生产者对象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //设置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //启动Producer实例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8)); //同步发送方式 SendResult send = producer.send(msg); //确认返回 System.out.println(send); } //关闭producer producer.shutdown(); }
异步消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
public static void main(String[] args) throws Exception { //实例化消息生产者对象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //设置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //启动Producer实例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8)); //SendCallback会接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是过早关闭producer,会抛出The producer service state not OK, SHUTDOWN_ALREADY的错 Thread.sleep(10000); //关闭producer producer.shutdown(); }
单项发送不关心发送的结果,只发送请求不等待应答。发送消息耗时极短。
public static void main(String[] args) throws Exception { //实例化消息生产者对象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //设置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //启动Producer实例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8)); //同步发送方式 producer.sendOneway(msg); } //关闭producer producer.shutdown(); }
消费者采用负载均衡的方式消费消息,同一个Group下的多个Consumer共同消费Queue里的Message,每个Consumer处理的消息不同。
一个Consumer Group中的各个Consumer实例分共同消费消息,即一条消息只会投递到一个Group下面的一个实例,并且只消费一遍。
例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。
public static void main(String[] args) throws Exception { //实例化消息消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //订阅topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); }
广播消费模式中把消息对一个Group下的各个Consumer实例都投递一遍。也就是说消息也会被 Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
public static void main(String[] args) throws Exception { //实例化消息消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //订阅topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); }
“RocketMQ普通消息同步发送怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。