温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

rabbitMq中架构是怎么样的

发布时间:2021-12-24 09:21:25 来源:亿速云 阅读:93 作者:小新 栏目:大数据

这篇文章主要为大家展示了“rabbitMq中架构是怎么样的”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“rabbitMq中架构是怎么样的”这篇文章吧。

一:特点

优点:1、轻量,部署方便。 2、消息路由规则灵活。3、消息延迟在微秒级别。

缺点:1、性能相对较弱,单机每秒处理能力在 1W量级。 2、如果消息大量堆积,性能会极具下降。  3、有消息丢失的可能,虽然很低。

所以,如果系统量级不是很大,几万-几十万以下吞吐量,对性能也没有非常高的要求,选用rabbitmq还是很不错的, 使用方便灵活,路由丰富,延迟很低,可靠性也不错,即使有极小的数据丢失可能,也可以通过一些方法来追踪(后面会讲)。

二:逻辑架构

消息发送流程:

在交换器Exchange与队列Queue绑定时候,会指定一个绑定键bindingKey。生产者发送消息时,会在消息中指定一个路由键routingKey,消息到达Exchange后,对比bingdingKey和routingKey将消息路由到合适的Queue中,消费者可通过主动拉取或者监听的模式,消费队列中的消息。  (当然,并不是所有的消息都要经过交换器,下面会讲) rabbitMq中架构是怎么样的

三:工作流程

生产者发送消息:

  1. 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)

  2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等

  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

  4. 生产者通过 routingKey (路由Key)将交换器和队列绑定( binding )起来

  5. 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息

  6. 相应的交换器根据接收到的 routingKey 查找相匹配的队列。

  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。

  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

  9. 关闭信道。

  10. 关闭连接。

消费者接收消息:

  1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。

  2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及

做一些准备工作

  1. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

  2. 消费者确认( ack) 接收到的消息。

  3. RabbitMQ 从队列中删除相应己经被确认的消息。

  4. 关闭信道。

  5. 关闭连接。

 Connection 和Channel关系 :

生产者和消费者需要与broker(一个mq实例)建立tcp连接,也就是connection。 接着会在connection基础上创建一个AMQP信道channel,其实就是一个虚拟的连接,所有指令通过channel完成。

为什么要使用channel,因为多个channel可以复用同一connection来减少性能开销,便于管理(否则,如果有多个线程收发消息,就要频繁创建和销毁TCP连接)。  但是如果channel流量过大,就需要建立多个connection来分担。 rabbitMq中架构是怎么样的

四:交换器类型

前提说明: bingdingKey和routingKey就是一个字符串,以 "." 来分隔,如user.news

常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种

fanout: 此类交换器会把消息路由到所有与它绑定的队列中,才不管你是什么key。

direct : bingdingKey和routingKey一致的队列会收到消息。

topic:   direct的升级版,可以对bingdingKey使用通配符。通配符有两种: # 和 * ,其中 * 用于匹配一个单词,"#"用于匹配0到多个单词。 如:     bingdingKey为: user.# 。那么发送的消息中,routingKey第一个单词为user即可匹配。       bingdingKey为: user.*.name 。 那么routingKey第一个和第三个单词必须为 user 和 name,且只能是三个单词。

headers: headers这个交换器就神奇了,因为性能很差,一般不太用,但我们还是要了解下。 headers头交换器, 不需要bingdingKey和routingKey,而是依赖消息头中设置的属性值,是一个map。 在交换器和队列绑定时,也需要指定一个头属性,同样是一个map,  值得注意的是,绑定时候的map中需要put一个键值对:map.put("x-match","all") 或map.put("x-match","any"), 差别是: all: 消息中map的所有key和value,绑定的map中全部要能匹配上。 any:只要有一个键值对能匹配上就行。

其实headers和direct很相似,不过可以同时支持多个属性,类型也不只是字符串。

五:数据存储机制 RabbitMQ消息有两种类型: 持久化消息:在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。存内存是为了提高性能。 非持久化消息:一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ存储层包含两个部分:队列索引和消息存储 rabbitMq中架构是怎么样的

如上图:

   发送非持久化消息,会在index file维护索引,消息会存到内存的store file中,如果消息堆积内存不够,会把消息刷到磁盘的msg_store_transient文件(当然,索引的维护也会刷盘)。   

   发送持久化消息,先保存到磁盘msg_store_persistent,并在内存stroe file中存储提升性能。

队列索引 :rabbit_queue_index  (每个队列私有)         维护落盘消息信息,如存储地点(下面消息存储中,消息的具体位置),是否已被消费者接收,确认消费等。          文件形式: 使用后缀为.idx的顺序段文件,文件名从0开始,每个文件有N条记录(N = segment_entry_count参数的值,默认16384)。         每个index从磁盘读取消息时,至少要在内存中维护一个段文件。                为什么是至少呢?  因为,index所在的段文件都会加载到内存,另外,如果消息大小较小,也会直接存在索引文件中,所以可能加载的段文件中已经有需要的消息了。                 那么,多小的消息,会直接放到index文件中呢? 由参数 queue_index_embed_msgs_below 决定,默认4096B。 该值设置时候一定要谨慎再谨慎,哪怕设置再大一点点,内存都可能爆炸式增长。

消息存储:rabbit_msg_store    (一个虚拟机下所有队列共有)

        存储:键值对形式存储。         rabbit_msg_store分为持久化 msg_store_transient 和非持久 msg_store_persistent 两种,非持久化重启服务后丢失。          文件以.rdq后缀,文件名从0开始,超过file_size_limit 的大小后创建新文件,以追加方式写入消息。            存储消息时会在ETS(Erlang Term Storage)表记录 消息在文件中的位置映射和文件信息。

        读取:根据msg_id找到存储文件,如果存在且未被锁定,直接打开文件从指定位置读取,发送请求由rabbit_msg_store,让它自己来处理。

        删除:消息被消费后,不是直接删除,而是标识为垃圾数据,后续再删除。步骤如下:         1、删除ETS表中的记录,更新对应文件中的相关信息,如index文件中标识为以消费,store文件中此条信息标识为垃圾数据。         2、当一个文件中都是垃圾数据,即可删除该文件。   但是,什么时候触发回收删除呢?         3、当检测到逻辑上相邻的两个文件,他们的有效数据可以合并成一个文件时,并且,垃圾数据的大小超过总文件数据大小(至少3个文件)的n倍后(n是参数 GARBAGE_FACTORION的值,默认0.5),触发合并和垃圾回收。         4、合并时,先锁定两个文件,分别整理前后两个文件的有效数据,合并到前面那个文件中,删除后面的文件,最后更新ETS记录。 如图: rabbitMq中架构是怎么样的

队列结构:

        队列由rabbit_amqqueue_process和backing_queue这两部分组成。                rabbit_amqqueue_process 负责消息的接收,向消费者交付消息,处理消息的确认等。(处理逻辑需要调用接口)              backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口。         如果发送消息的目的队列为空,且该队列有消费者订阅,则直接发送给消费者,否则才需要发送到队列暂时缓存。

       队列有4种状态:依次为 a b g d       1. alpha:索引和消息内容都存内存,最耗内存也最快,很少消耗CPU       2. beta:索引存内存,消息内存存磁盘       3. gama:索引内存和磁盘都有,消息存磁盘       4. delta:索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作       解释:       消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发生变化。       持久化的消息,索引和内容都必须先保存在磁盘上,才会达到上面某种状态。 另外,第3种状态,持久化消息才会有。

      对于普通队列,没有设置优先级和镜像,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。 其中Q1、Q4只包含alpha状态的消息,Q2和Q3包含beta和gamma状态的消息,Delta只包含delta状态的消息。      常规情况消息发送到消费,按照Q1->Q2->Delta->Q3->Q4流动,经历内存到磁盘,磁盘又到内存的过程。 如下图:

rabbitMq中架构是怎么样的

但是,并不是每一条消息都是以上的过程,会根据当前负载而变化,如下图: 对于消息发送: 如果负载低内存充足,消息到来时,直接从Q1到Q4,不会经过磁盘(持久化消息肯定会走磁盘,但也是处于Q1->Q2->Q3->Q4) 如果负载增大,内存不够充足,新来的消息会经历Q1->Q2->Q3->Q4。 如果负载很大内存完全不足,新消息的索引和消息体完全存储到磁盘,即队列delta。

对于消息消费: 1、从Q4读取,如果有直接返回。 2、Q4为空,读取Q3,Q3为空返回队列为空。 3、Q3不为空,取出Q3的消息后,判断Q3和delta长度。       a、都为空,则认为Q2,delta,Q3,Q4全为空,将Q1的消息移动到Q4中。       b、Q3为空delta不为空,将delta消息转移到Q3中。       转移是按照索引文件分段读取,先读取第一段,判断此段消息个数是否=delta的长度,是则代表delta可以全部转移,此时把Q2和delta        中消息一并转移到Q3; 否则,只会转移delta中此次读取到的消息。

按以上步骤得到结论: 获取消息时候,如果Q3为空,则整个队列为空。 原因: 消息最终是从Q4出去的,Q4为空才会到Q3去取,Q3也为空,说明上一次取消息后,已经没有消息转移到Q3或者Q4了。 以上流程也能解释,为什么消息堆积会导致性能大幅度下降: 因为消息会写入到很深的队列中,如delta。 每次写入和读取都会经历磁盘io。

rabbitMq中架构是怎么样的

以上是“rabbitMq中架构是怎么样的”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI