温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

rabbitmq学习笔记

发布时间:2020-07-07 11:17:24 来源:网络 阅读:472 作者:xrb_jurgen 栏目:建站服务器

一、消息确认机制

rabbitmq在发送消息后立即从内存中删除消息,因此如果消费者处理消息耗时较长,在处理过程中消费者被kill,则处理中的消息、以及其他发往该消费者的消息都将丢失。


为了保证消息不丢失,rabbitmq支持消息确认机制,消费者可以发送ack告诉rabbitm指定消息已经收到并处理,因此rabbitmq可以删除该消息。


如果消费者死掉(channel关闭、connection关闭、或者TCP connection丢失),导致rabbitmq没有收到ack,rabbitmq将把消息重入队列。


不存在消息超时,这意味着处理一个消息非常长的时间也是ok的。


消息确认机制默认是开启的,通过在channel.basic_consume中设置no_ack=True关闭。


注意消费者在处理消息后,不要忘记调用channel.basic_ack进行消息确认,否则rabbitmq将不断消耗内存把消息重入队列。


二、队列/消息持久化

为了防止rabbitmq服务终止导致队列和消息丢失,需要将队列和消息标记为持久化的:

  1. 确保rabbitmq永远不丢失队列,需要将队列 声明为持久化的:


  2. channel.queue_declare(queue='task_queue', durable=True)
  3. 将消息声明为持久化的:

  4. channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))

注意:尽管已经很健壮了,但是仍然无法完全保证消息不会丢失,例如rabbitmq接收消息但是还没有保存到硬盘的情况。


三、exchange

简单的说,exchange的一端接收消息,另一端把消息放进队列。

在rabbitmq中生产者不会将请求直接发送给消费者,生产值只会把消息发给exchange,exchange收到消息后需要知道怎么做:添加到特定队列、添加到多个队列、还是丢弃。

exchange的类型包括direct,topic,headers,fanout


四、绑定

exchange和queue之间的联系被称为绑定(binding),可以简单的看:队列对于特定exchange上的消息感兴趣

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

此时'logs' exchange将添加消息到指定queue


绑定可以使用一个额外的routing_key参数,例如:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

对于fanout类型的exchange来说,routing_key参数是被忽略的


五、topic exchange

发往topic exchange的消息不能携带任意的routing_key,必须是以点隔开的一串字符,最大255个字节

binding key也必须是相同的形式,注意binding key有两个重要的特殊情况:

* 可以替代一个单词

#可以替代零个或多个单词

例如,如果binding key是*.orange.*,则可以匹配所有<one word>.orange.<one word>的key,但是如果key不是*.*.*的形式,例如orange,或者quick.orange.male.rabbit,则消息将被丢弃。

如果binding key是lazy.#,则类似于带有lazy.orange.male.rabbit的key的消息可以匹配。


topic exchange非常强大,通过匹配routing_key可以表现的像存在多个exchange


六、RPC

为了接收响应,客户端需要在发送请求时附加发送回调队列地址:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
                      
# ... and some code to read a response message from the callback_queue ...


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI