温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RocketMQ基本概念及原理是什么

发布时间:2021-12-03 17:35:47 来源:亿速云 阅读:127 作者:柒染 栏目:云计算

这篇文章将为大家详细讲解有关RocketMQ基本概念及原理是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

RocketMQ使用
基本概念
ProducerGroup
    通常具有同样属性(处理的消息种类-topic、以及消息处理逻辑流程—分布式多个客户端)的一些producer可以归为同一个group。在事务消息机制中,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查 同一个group的其他producer,确认这条消息应该commit还是rollback。
ConsumerGroup
    具有同样逻辑消费同样消息的consumer,可以归并为一个group。同一个group内的消费者,可以共同消费(CLUSTERING)对应topic的消息,达到分布式并行处理的功能。
Topoic
    消息的逻辑管理单位。
Queue
    消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息存储可以分布式集群化,具有了水平扩展的能力。
消费进度管理
    RocketMQ的broker端,不负责推送消息,无论消费者是否消费消息,都将消息存储起来。谁要消费消息,就向broker发请求获取消息,消费记录由consumer来维护。RocketMQ提供了两种存储方式来保留消费记录:一种是保留在consumer所在的服务器上;另一种是保存在broker服务器上。用户还可以自己实现相应的消费进度存储接口。
    默认情况下,采用集群消费(CLUSTERING),会将记录保存在broker端;而采用广播消费(BROADCASTING)则会将消费记录保存在本地。
顺序消息
    用户实现MessageQueueSelector为某一批消息(通常是有同样的唯一的标示ID),选择同一个Queue,则这一批消息的消费将是顺序消费(并由同一个consumer完成消费)。
事务消息
    这样的消息有多个状态,并且其发送是两阶段的。第一个阶段发送PREPARED状态的消息,此时consumer是看不见这种状态的消息的,发送完毕后回调用户的TransactionExecutor接口,执行相应的事务操作(如数据库),当事务操作成功时,则对此条消息返回commit,让broker对该消息执行commit操作,成为commit状态的消息对consumer是可见的。

基本原理
    总览
RocketMQ以Topic来管理不同应用的消息。对于生产者而言,发送消息是,需要指定消息的Topic,对于消费者而言,在启动后,需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。
    RocketMQ中,producer将消息发送给Broker时,需要制定发送到哪一个队列中,默认情况下,producer会轮询的将消息发送到每个队列中(所有broker下的Queue合并成一个List去轮询)。
    对于consumer而言,会为每个consumer分配固定的队列(如果队列总数没有发生变化),consumer从固定的队列中去拉取没有消费的消息进行处理。
 
Producer
Producer端(属于client)的逻辑概述:
 
producer端的逻辑都比较简单,将消息发送到某个Queue中即可,具体发送到那个Queue可以由用户控制(MessageQueueSelector接口),默认情况下,将轮询方式选择Queue。在producer端,会从NameServer将所有Broker的Topic及对应的Queue信息(即:TopicRoute信息)拉取到本地,然后根据<brokerName, queueId>组建成一个List。因此在MessageQueueSelector,可以看到所有的Queue信息。
    RocketMQ将topic的消息以多个Queue来管理,使得其较为容易的就可以进行水平扩展,提供系统吞吐力。这样分布带来的问题,就是从全局上不能做到顺序性(很多时候也并不需要全局上的顺序性)。
RocketMQ提到支持顺序消息,实际上是指基于Queue级别的顺序。用户将某些需要满足顺序的一批消息(比如电商某个订单号的一系列后续操作、比如数据库的某个主键的insert、delete、update等操作)发送到固定的某个Queue中,则从这个Queue消费消息的consumer,针对这一批消息是顺序消费。
问题1:针对顺序消息的队列,是否可以做到不停服务下的集群动态扩展?
Consumer
    consumer逻辑稍微复杂一点。初步思考,consumer端至少需要处理:
1、    消息的获取
2、    offset(消费进度)的管理与存储
3、    集群消费模式下,Queue的分配问题(rebalance)
RocketMQ对外提供了两种不同形式的Consumer:PushConsumer和PullConsumer。顾名思义,对于PullConsumer而言,用户需要主动调用相应的接口去拉取未消费的消息。对于PushConsumer而言,用户提供消息处理的CallBack,有未曾消费的消息时,会主动回调这个CallBack来处理消息。虽从用户角度而言,Consumer存在主动(pull)和被动(push),但RocketMQ本身的broker端仅仅保存所有的消息,并不负责push消息,因此PushConsumer的底层实现也是有一个长连接主动去broker上拉取未消费的消息,然后回调用户的callback逻辑。

    PushConsumer端逻辑概述:
        自己使用PushConsumer的代码非常简单:
        
1
2
3
4
5
6
7
8
9
10
11    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“groupName”);
consumer.subscribe(“TopicName”, “*”); // a | b | c
consumer.registerMessaeListener( new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                                  ConsumeConcurrentlyContext context) {
  System.out.println(“Consume Message Num: ” + msgs.size());
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// add shutdown hook to execute consumer.shutdown();
第2行的subscribe即定于某个topic下,符合某些标签(tag)的消息。这个过滤会在服务端过滤(其实在consumer端也有过滤逻辑)。tag之间用”|”分割开。这些tag会被解析成SubscriptionData来保存信息,主要存储tag的字符串集合,以及这些tag对应的hashcode集合(在broker端的存储和过滤其实都是tag值对应的hashcode,可能是为了加速过滤以及节约存储空间)。
    主要的逻辑在第10行调用start函数之后开始。Consumer的主要实现是DefaultMQPushConsumerImpl,其包含的对象关系简单如下图:
 
DefaultMQPushConsumerImpl中各个对象的主要功能如下:
RebalancePushImpl:    主要负责决定,当前的consumer应该从哪些Queue中消费消息;
PullAPIWrapper:        长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑;
ConsumeMessageService:    实现所谓的“Push-被动”消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息;
OffsetStore:            维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;
MQClientFactory:    大杂烩,负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成;


使用
Producer返回值
发送消息时,只有抛出异常,才是发送失败,其他情况下,根据如下返回值,应用层做相应取舍处理逻辑:
SendStatus
返回值    解释
SEND_OK    发送成功
FLUSH_DISK_TIMEOUT    发送成功,但broker刷盘失败,此时如若服务器宕机,消息会丢失;
FLUSH_SLAVE_TIMEOUT    写从失败;如果主宕机,消息丢失;
SLAVE_NOT_AVAILABLE    从不可用;
注意:当配置多master无slave的集群时,若master的brokerRole为SYNC_MASTER,则发送消息会一直返回这个值;最新版本(3.1.14以上)事务消息将一直发送失败(事务消息中处理了返回值不为SEND_OK,则直接进行ROLL_BACK);
当应用方明确指出,producer发送成功为SEND_OK状态的消息对consumer才是可见的。可以采用事务消息来完成这个功能,RocketMQ 从3.0.14版本开始,对于事务消息,开始检查SendStatus,如果不为SEND_OK,则直接执行事务消息的回滚。
Consumer返回值
当使用PushConsumer(使用callBack回调执行应用消费逻辑)
非顺序消息(ConsumeConcurrentlyStatus)
返回值    解释
CONSUME_SUCCESS    消费成功
RECONSUME_LATER    消费失败,稍后重新消费这一批消息
RECONSUMER_LATER的解释:
    这一批消息均会sendBack到broker上,稍后会重新消费这一批消息。可以通过设置参数,使得批量消费的“批量”为一条,这样可以一定程度避免重复消费。但这样设置后,可能效率较低。另外一种方法是在用户指定的CallBack(MessageListenerConcurrently)中,通过对应的ConsumeConcurrentlyContext参数来控制本批消息从哪一条之后重复消费。
具体方法是控制context的ackIndex变量。这个变量的意思是对于这一批消息(List<MessageExt>),[0, ackIndex]内的消息是成功消费的,而(ackIndex, Lst.size)内的消息是消费失败的,如果返回值为RECONSUME_LATER,则对于失败范围的消息将调用sendBack回发到broker上(从代码看来,这个功能只对CLUSTERING消费模型的consumer生效,BROADCASTING的直接丢弃)。这里还有个小的tips,在调用SendBack失败后,会在consumer本地去尝试重复消费这些回发失败的消息(构造相应的ConsumeRequest)。这个处理模式(先消费,消费失败的消息尝试回发给broker,回发给broker失败的消息尝试在consumer端重新消费)一直尝试,直到消费成功或者回发到broker成功。
顺序消息(ConsumerOrderlyStatus)
返回值    解释
SUCCESS    消息处理成功
ROLLBACK    回滚消息—似乎用在了事务消息中
COMMIT    提交消息—似乎用在了事务消息中
SUSPEND_CURRENT_QUEUE_A_MOMENT    当前队列挂起一段时间
问题:消费端的ROLLBACK、COMMIT如何理解???
普通消息
    
使用TIPS
集群搭建
1.    基本配置
使用./bin/mqbroker –p >conf/broker.conf,查看所有参数的默认取值;根据自己集群的需要修改对应的配置;
2.    集群选择
RocketMQ集群支持如下一些模式的配置:
集群模式    特点    适用场景
单Master    一个Broker实例;
或者多个Broker实例,但Topic只在某一个Broker上配置了;    测试
多Master 无Slave    多个Broker实例构成集群,且brokerID均为0(即角色都为Master)    
master挂掉后,这个master上未被消费的消息,暂时不能被消费;
可以容忍消息丢失(未被consumer消费)的应用场景;如日志收集

多Master多Slave    每个master有一个backup的slave    HA高可用;当master-slave采用同步双写时(采用异步时,任可能存在部分未写入slave的消息丢失),master挂掉,也可以从slave处消费消息;但当master挂掉后,目前不支持自动的Failover(因此不支持producer的写);

疑问: 手动如何切换?是否需要修改Slave的配置为Master,然后重启broker实例?代价有点高,支持发送命令切换否?

当使用多master无slave的集群搭建方式时,master的brokerRole配置必须为ASYNC_MASTER。如果配置为SYNC_MASTER,则producer发送消息时,返回值的SendStatus会一直是SLAVE_NOT_AVAILABLE。
3.    系统参数优化
参考bin/os.sh中的参数,重点注意物理内存预留参数(vm.min_free_kbytes)。
4.    broker的启动
采用集群模式时,启动broker时,需要制定nameserver 地址的list,这里一定要注意,要将所有的nameserver 地址都包含进去;因为rocketmq的nameserver之间并不会同步,均需要broker主动汇报;如果有3个nameserver: A B C,启动时只制定了A,忘记制定了B C,那么客户端如果刚好链接了B或C去获取broker的信息,则会获取失败。
消息使用
    
问题整理
问题:
1、服务器磁盘配置大概是怎样(重点想了解磁盘配置、磁盘总容量,例如: N * 4T SATA 7200 )?
2、服务器的磁盘做raid了吗?做的哪一种raid?刷磁盘模式是SYNC还是ASYNC?
3、rocketmq采用的哪种方式搭建(多master无slave、多master多slave)?如果是master-slave方式,主从同步是SYNC还是ASYNC方式?

解答:
1、一般是3T磁盘,实际是12个600,12* 600G的sas 15000转的磁盘做了raid10
2、刷盘模式通常是异步方式
3、大部分集群是不开启slave的, 有少部分集群会开启 sync方式的slave 

关于RocketMQ基本概念及原理是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI