温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》
  • 首页 > 
  • 教程 > 
  • 开发技术 > 
  • mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

发布时间:2020-07-28 07:41:51 来源:网络 阅读:3173 作者:xingyuntian 栏目:开发技术

本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理

先说UNSUBSCRIBE,代码比较简单

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topics = msg.payload().topics();
    String clientID = NettyUtils.clientID(channel);

    LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    for (String t : topics) {
        Topic topic = new Topic(t);
        boolean validTopic = topic.isValid();
        if (!validTopic) {
            // close the connection, not valid topicFilter is a protocol violation
            channel.close();
            LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
            return;
        }
        if(LOG.isDebugEnabled()){
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
        }
        subscriptions.removeSubscription(topic, clientID);
        clientSession.unsubscribeFrom(topic);
        String username = NettyUtils.userName(channel);
        m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
    }

    // ack the client
    int messageID = msg.variableHeader().messageId();
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
    MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));

    LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
    channel.writeAndFlush(ackMessage);
}

主要分为以下几步
1.从目录树下,移除该client的订阅,这个移除过程有点复杂,后面单独一篇专门讲解topic树
2.清除ClientSession里面的订阅,包括Set<Subscription> subscriptions,同时还得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.唤醒拦截器
4.返回UNSUBACK ,这里注意UNSUBACK 是没有payload的。

再说DISCONNECT的处理

public void processDisconnect(Channel channel) throws InterruptedException {
    final String clientID = NettyUtils.clientID(channel);
    LOG.info("Processing DISCONNECT message. CId={}", clientID);
    channel.flush();
    final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
    if (existingDescriptor == null) {
        // another client with same ID removed the descriptor, we must exit
        channel.close();
        return;
    }

    if (existingDescriptor.doesNotUseChannel(channel)) {
        // another client saved it's descriptor, exit
        LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!removeSubscriptions(existingDescriptor, clientID)) {
        LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!dropStoredMessages(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!existingDescriptor.close()) {
        LOG.info("The connection has been closed. CId={}", clientID);
        return;
    }

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}

1.检查连接描述符是否还存在,如果不存在,说明之前已经有客户端删除了它,直接关闭通道
2.判断这个client的连接描述符是不是,是不是还是当前使用这个通道的client?作者要先防止这种情况呢?先卖个关子,后面的第6条会说明
3.清除订阅请求,这里面好像只清楚了不要求保存会话信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并没有清除ClientSession里面的Set<Subscription> subscriptions和topic树里面的订阅,这能够解释https://blog.51cto.com/13579730/2073914 这篇文章结尾讨论的问题了,只有Map<Topic, Subscription> subscriptions的订阅才是最准确的。
4.丢弃存储的消息,这里面也只是会丢弃不要去保存会话信息的消息
5.清除遗愿消息,对于遗愿消息,这里稍微啰嗦一点,遗愿消息是在初次连接的存储到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore这里面的,那么什么时候发送给订阅者呢?看下面

    io.moquette.server.netty.NettyMQTTHandler#channelInactive
    @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    String clientID = NettyUtils.clientID(ctx.channel());
    if (clientID != null && !clientID.isEmpty()) {
        LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
        m_processor.processConnectionLost(clientID, ctx.channel());
    }
    ctx.close();
}
    说明是当netty检测到通道不活跃的时候通知ProtocolProcessor处理ConnectionLost事件的。
    public void processConnectionLost(String clientID, Channel channel) {
    LOG.info("Processing connection lost event. CId={}", clientID);
    ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
    connectionDescriptors.removeConnection(oldConnDescr);//移除连接描述符
    // publish the Will message (if any) for the clientID
    if (m_willStore.containsKey(clientID)) {
        WillMessage will = m_willStore.get(clientID);
        forwardPublishWill(will, clientID);//发布遗愿消息
        m_willStore.remove(clientID);//移除遗愿消息存储
    }

    String username = NettyUtils.userName(channel);
    m_interceptor.notifyClientConnectionLost(clientID, username);//唤醒拦截器
}
    在以下这种情况下会发布遗愿消息
    遗嘱消息发布的条件,包括但不限于:
    服务端检测到了一个I/O错误或者网络故障。
    客户端在保持连接(Keep Alive)的时间内未能通讯。
    客户端没有先发送DISCONNECT报文直接关闭了网络连接。
    由于协议错误服务端关闭了网络连接。

    另外说明一下,遗愿消息是可以设置消息等级的,而且可以被设置成retain消息

6.连接描述符集合里面清除该通道对应的连接描述符,这里有一点很容易误解,强调一下

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    作者调用的是ConcurrentMap里面的boolean remove(Object key, Object value);这个方法要求key存在,且value 与预期的一样才会删除,也就说,是有可能存在的,key一样而value不一样的情况的,什么时候会出现?答案是client在两个设备上先后登陆,这个时候由于是存在一个map里面的所以后面的登陆所创建的连接描述符会覆盖前面的一个。当然这里面,也可以在覆盖之前强制断开之前那个连接,但是moquette并没有这么做,具体看源码io.moquette.server.ConnectionDescriptorStore#addConnection

也就说说moquette是允许存在一个账号多设备登陆的。将入client先后在A,B两个设备上建立连接,B连接会覆盖A连接,这个时候A连接虽然还在,但其实是永远也收不到消息的,因为发送消息的时候,会以ConnectionDescriptorStore里面存储的为准,具体看源码
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是说A连接会无谓的占用broker的资源,个人觉得这样并不好,也非常没有必要,大家可以自行改进。
现在大家就能够理解上面的第2步了,因为这个就是为双登陆的情况下,被覆盖的那个连接准备的。

moquette-broker还要处理以下的报文,包括
1.PINGREQ,心跳报文 
2.PUBACK,当broker向client发送qos1消息的时候,client需要回复PUBACK消息,消息存储在
io.moquette.spi.ClientSession.OutboundFlightZone outboundFlightZone里面(底层使用map存储的),
消息是io.moquette.spi.impl.MessagesPublisher#publish3Subscribers(io.moquette.spi.IMessagesStore.StoredMessage, io.moquette.spi.impl.subscriptions.Topic)
这里被存储进去的,这是一个临时的存储,存储完之后消息会被删除掉
3.PUBREC 这个是当broker向client发送qos2消息之后,client需要向broker作的第一个返回报文,
这里面有个动作是将消息从inboundFlightMessages转移到secondPhaseStore和outboundFlightMessages,具体看这
里io.moquette.persistence.memory.MemorySessionStore#moveInFlightToSecondPhaseAckWaiting
4.PUBCOMP 当broker收到这个报文的时候会负责从内存里面删除飞行窗口的消息,具体怎么删除的详见下篇,moquette拦截器
5.PUBREL。当client向broker发送qos2消息的时候,broker会回复PUBREC,告诉client已经记录下来了,
client收到PUBREC之后会发送PUBREL,告诉broker,我知道你已经记录了消息,既然你记录了,那这边
就释放消息了(确保只要broker才会该消息,避免client重发),当broker收到PUBREL报文的时候,就知道
client那边已经把该消息释放了,然后消息的主导权到了他这边,他开始发送消息。当消息发送完成了,
会向client发送PUBCOMP报文。
关于qos2消息的介绍可以看一下这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md 的4.3条
向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI