本文小编为大家详细介绍“RocketMQ消息发送流程源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“RocketMQ消息发送流程源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
这里看看这几个参数,
communicationMode 是通信模式,同步异步还是单向
sendCallback 是针对异步模式的,异步模式需要设置发送完成后的回调。
这里消息孩子进到第一个卡口,先要检查送孩子来的家长是否还能联系上,若是能联系到,就继续。要是联系不到,这孩子岂不是被抛弃了,不敢接不敢接,送到孤儿院吧。
然后需要检查消息孩子了,首先是检查孩子还在不在,别扔个衣服跑了。
然后看看孩子指定的这个topic,不能说我想去内个topic哈,必须是实实在在的名字。而且上头也规定了,这个topic的名字也不能太长,也不能包含特殊字符。已有的一些领导定过的也不能用哈。
接下来就是检查孩子的body了,之前说body就是孩子的技能,首先,技能为空,不行不行,啥都不会是不行的。再者太长也不行,你唱首歌两年,这没法玩。
检查message不为null
检查topic
topic不能为空
topic不能太长
不能包含特殊字符
检查话题的名字是否被系统已占用
检查body
检查是否为空
检查长度是否过长,最大为4MB 这样
下边我们看看sendDefaultImpl这个方法。给他拆成一段一段的看。
//校验生产者服务是ok的,可以联系到的 this.makeSureStateOK(); //校验消息的参数 Validators.checkMessage(msg, this.defaultMQProducer);
第一个检查,检查生产者服务是否是正常工作的,若是不正常工作,就抛出异常。
private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
第二个检查,检查消息本身是否为空,检查topic,检查消息的body
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // 这里校验Topic的时候,校验了不能为空,长度和特殊字符 Validators.checkTopic(msg.getTopic()); //这里则校验了一些不允许使用的topic名字 Validators.isNotAllowedSendTopic(msg.getTopic()); // body不为空 if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } // body长度不为0 if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } // body 长度不能过长 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
嗯,这里孩子终于通过了检查,服务人员开始带着他去找自己指定的topic区域,指定是自己指定,划分还是工作人员划分的。咱总得知道这个topic区域在哪吧。
先去缓存笔记里找,有没有这个区域的信息,若是没有这个topic,就新建一个,然后更新到缓存笔记里边。若有topic但是不知道在哪,就找name server
大脑去申请这个topic在哪的信息。
执行tryToFindTopicPublishInfo方法去获取Topic的路由信息,若是不存在就新建,若是有topic但是缓存中没有路由信息,则通过name server获取路由信息。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //获取topic信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //不存在 if (null == topicPublishInfo || !topicPublishInfo.ok()) { //新建 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //修改topic的路由信息并更新到本地 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } //包含路由信息就直接返回 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //不包含路由信息则向name server申请,修改topic的路由信息并更新到本地 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
这就是计算消息孩子可以尝试去找地方坐几次,没坐上,欸,我又来了,没坐上,欸,我又来了。
这行代码就是计算重试次数的,根据communicationMode
传入的值,同步异步还是单向的来决定重试次数是几次。 很明显,若是同步的,就会尝试三次。若是异步的或者单向的就只发送一次。
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
我们之前说了,Broker类似于候船大厅,为了均分压力,每次都要进与上次不同的候船大厅。
执行selectOneMessageQueue
方法通过Queue将消息发送到与上次不同的一个Broker。也可以通过 sendLatencyFaultEnable判断是否启用延迟容错开关
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
这就是走过巷道坐到属于自己的座位上了
然后就通过sendKernelImpl
发送消息了,这是发送消息的核心方法。会准备通信层的入参,并将请求发送给通信层,内部实现是基于Netty
的。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
读到这里,这篇“RocketMQ消息发送流程源码分析”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。