
| 1. 生产者和消费者 RabbitMQ在应用程序和服务器之间扮演者路由器的角色.所以当应用程序连接RabbitMQ时,它会决定是发送还是接收 1. 生产者生产消息,然后发布到代理服务器(RabbitMQ). 消息分为两部分: * 有效载荷:你要传输的数据 * 标签:它描述了有效载荷,并且RabbitMQ用它来决定谁将获得消息的拷贝 2. 消费者消费消息,先连接到代理服务器(RabbitMQ),并订阅到队列上 当消费者接收到消息时,它只得到消息的一部分:有效载荷.在消息的路由过程中,消息的标签并没有随有效载荷一起传递 生产者创建消息,消费者接收消息.在发布消息或接收消息之前必须先连接RabbitMQ,建立一条信道(channel) 2. 队列 AMPQ消息路由必须包含三部分: * 路由器: 生产者把消息发送到路由器上 * 队列: 消息最终到达队列,并被消费者接收 * 绑定: 决定了消息如何从路由器路由到特定的队列上
消费者通过两种方式从特定的队列中接收消息: * 通过AMQP的base.consume命令订阅(持续订阅) * 通过AMQP的base.get命令(单条订阅) 当至少一个消费者订阅了队列, 消息会立即发送给这些订阅的消费者;如果消息到达了无人订阅的队列,消息会在队列中等待,一旦有消费者订阅该队列,那么队列上的消息会立即发送给消费者.当有多个消费者订阅到同一个队列时,队列收到的消息将以循环的方式发送给消费者,**每条消息只会发送给一个订阅消费者**. 例如:消费者A和B都订阅了队列send 1. Message_A发送到队列send 2. RabbitMQ将消息Message_A发送给消费者A 3. 消费者A确认收到了消息Message_A 4. RabbitMQ将消息Message_A从队列send中删除 5. Message_B发送到队列send 6. RabbitMQ将消息Message_B发送给消费者B 7. 消费者B确认收到了消息Message_B 8. RabbitMQ将消息Message_B从队列send中删除 注意: * 消费者接收到的每一条消息都必须进行确认(通过AMQP的base.ack命令或在订阅队列的时候将auto_ack设置为true). * 消费者对消息的确认和告诉生产者消息已经被接收了这两件事情毫无相关.因此,消费者通过确认命令告诉RabbitMQ它已经正确的接收到了消息,同时RabbitMQ才能安全的把消息从队列中移除 * 如果消费者接收一条消息,然后确认之前从Rabbit处断开连接,RabbitMQ会认为这条消息没有分发,它会从新分发给下一个订阅的消费者 * 如果你的程序崩溃了,这样做一方面保证消息会被发送给下一个消费者处理;另一方面,如果应用程序有Bug而忘记确认消息,RabbitMQ不会给该消费者发送更多消息,这是因为在上一条消息被确认之前,RabbitMQ会认为这个消费者还没有准备好接收下一条消息
在收到消息后,如果你想明确拒绝而不是确认收到该消息,你有两个选择: * 把消费者从RabbitMQ服务器断开连接,这会导致RabbitMQ会重新将消息发送给另一个消费者 优点:所有RabbitMQ都支持 缺点:这样连接/断开的连接方式会增加RabbitMQ的负担 * RabbitMQ >= 2.0.0版本,使用AMPQ的base.reject命令 如果把reject的requeue参数设置为true,RabbitMQ会把消息发送给下一个消费者 如果把reject的requeue参数设置为false,RabbitMQ会立即把消息从队列中移除,而不会把它发送给新的消费者
创建队列: 生产者和消费者都能使用AMPQ的queue.declare命令创建队列 如果消费者在同一条信道上订阅了另一个队列,就无法声明队列了.必须首先取消订阅,将信道设置为"传输"模式 创建队列时需要指定队列名称,消费者订阅时需要队列名称,在创建绑定时也需要队列名称.如果在创建队列时没有指定队列名称,RabbitMQ会分配一个随机名称并在queue.declare命令的响应中返回 创建队列有用参数: * exclusive 设置true,队列将变为私有的,此时只有你的应用程序才能够消费队列消息.当你限制一个队列只有一个消费者时有用 * auto-delete 当最后一个消费者取消订阅时,队列会自动移除 * 如果你只想检测队列是否存在,可设置queue.declare的passive选项为true.在该设置下,如果队列存在返回成功;如果队列不存在返回错误
队列是AMPQ的基础模块: * 为消息提供了处所,消息在此等待消费 * 对负载均衡来说,队列是绝佳方案.只需附加一堆消费者,并让RabbitMQ循环的方式均匀分发消息 * 队列是RabbitMQ消息的最终点(除非消息进入了黑洞)
3. 交换器和绑定 将你想将消息投递给队列时,你通过把消息发送给交换器来完成.然后,根据确定的规则,RabbitMQ会决定消息该投递到哪个队列.这些规则成为路由键 **队列通过路由键来绑定到交换器.** 当你把消息发送到代理服务器时,消息将有一个路由键(即便是空的),RabbitMQ也会将其和绑定的路由键进行匹配.如果匹配成功,那么消息将投递到该队列;如果匹配不成功,消息将进入黑洞
服务器会根据路由键将消息从交换器路由到队列,如何处理投递多个队列呢? 协议中定义的不通类型交换器发挥了作用: * direct * fanout * topic * headers 每一类实现了不通的路由算法.
1. headers交换器允许你匹配AMQP消息的header而非路由键,除此之外,headers交换器和direct交换器完全一致,但性能会差很多 2. direct **如果路由键匹配,消息就会被投递到对应的队列** 服务器必须实现direct类型交换器,包含一个空白字符串名称的默认交换器 当声明一个队列时,它会自动绑定到默认交换器,并以队列名称作为路由键. 例如: $channel->baseic_publish($msg,'','queue-name') 第一个参数表示你想发送的内容 第二个参数为空,表示指定了默认的交换器 第三个参数就是路由键,即队列名字 当默认的direct交换器无法满足应用程序的需求时,你可以声明自己的交换器.只需发送exchange.declare命令并设置合适的参数 3. fanout **会将收到的消息广播到绑定的队列上** 当你发送一条消息到fanout交换器时,它会把消息投递给所有附加在此交换机的队列 4. topic **允许来自不同源头的消息能够到达同一个队列**
声明队列msg-inbox-errors,并将其绑定到交换器上 $channel->queue_bind('msg-inbox-errors','logs-exchange','error.msg-inbox') error.msg-inbox表示绑定规则 4. 虚拟主机和隔离 每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称为虚拟主机vhost. 每一个vhost本质上都一个mini版的RabbitMQ服务器,拥有自己的队列,交换器和绑定,更重要的是拥有自己的权限机制 vhost之于RabbitMQ就像虚拟机之于物理服务器一样:它们通过在各个实例间提供逻辑上的分离,允许你为不同应用程序安全保密的运行数据 vhost是AMQP概念的基础,你必须在连接时进行制定.默认的vhost:"/"
AMQP它并没有指定权限控制是在vhost级别还是在服务器端级别实现 当你在RabbitMQ里面创建一个用户时,用户通常会被指派至少一个vhost,并且只能访问被指派vhost内的队列,交换器和绑定 注意:vhost之间是绝对隔离的 vhost和权限控制非常独特,它们是AMQP中唯一无法通过AMQP协议创建的基元.需要通过rabbitmqctl工具创建 例如: * rabbitmqctl add_vhost vhost-name * rabbitmqctl delete_vhost vhost-name * rabbitmqctl list_vhost 5. 持久化和你的策略 默认情况下,重启RabbitMQ服务器后,那些队列和交换器就消失了.原因在于每个队列和交换器的durable属性,该属性为false,它决定了RabbitMQ是否在崩溃或者重启之后重建队列.该属性为true,你就不需要在服务器断电重启后重新创建队列和交换器了
在发布消息之前,通过把它的"投递模式"(delivery mode)选项设置为2来把消息标记成持久化.到目前为止,消息还只是被表示为持久化的,但是它还必须被发布到持久化交换器中并到达持久化队列中才行. 如果消息想从RabbitMQ崩溃中恢复,那么消息必须: * 把它的投递模式设置为2(持久) * 发送到持久化交换器 * 到达持久化队列
RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件.当发布一条持久性消息到交换器时,RabbitMQ会在消息提交到日志文件后才发送响应.记住,之后这条消息如果路由到了非持久队列,它会自动从持久性日志中移除,并且无法从服务器重启中恢复.如果你使用持久性消息,则确保之前提到的持久性消息的那三点必须做到位.一旦你从持久化队列中消费了一条持久性消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集.在你消费持久化消息之前,如果RabbitMQ重启的话,服务器会自动重建交换器和队列,绑定,重播持久性日志文件中消息到合适的队列或者交换器上
持久化消息能保证RabbitMQ重启后数据的恢复,同时你也要为此付出性能代价.
和持久化相关的一个概念是AMQP事务. 在AMQP中,在把信道设置成事务模式后,你通过信道发送那些想要确认的消息,之后还有多个其他AMQP命令.这些命令是执行还是忽略,取决于第一条消息发送是否成功.一旦你发送完所有命令,就可以提交事务了. 事务填补了生产者发布消息以及RabbitMQ将它们提交到磁盘上这两者之间"最有1英里"的差距
RabbitMQ事务消耗RabbitMQ性能,RabbitMQ团队拿出最好的方案来保证消息投递:发送方确认模式 你需要告诉RabbitMQ将信道设置成confirm模式,而且你只能通过重新创建信道来关闭该设置.一旦信道进入confirm模式,所有在信道上发布的消息都会被指派一个唯一的ID号.一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序.这使得生产者知晓消息已经安全到达队列了.如果消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出.发送方确认模式的最大好处是它们是异步的.一旦发布了一条消息,生产者应用程序就可以在等待确认的同时继续发送下一条.当确认消息最终收到的时候,生产者应用的回调方法就会被触发来处理该确认消息.如果RabbitMQ内部错误从而导致了消息的丢失,RabbitMQ会发送一条nack消息.就像发送方确认消息那样,只不过这次说明的消息已经丢失了.同时,由于没有消息回滚的概念,因此发送方确认模式更加轻量级,同时对RabbitMQ代理服务器的性能影响几乎可以忽略不计 6. 一条消息的一生 发布者: * 连接到RabbitMQ * 获取信道 * 声明交换器 * 创建消息 * 发布消息 * 关闭信道 * 关闭连接
脚本: esay_install pika # -*- coding:utf-8 -*- import pika,sys # 建立连接 credentials = pika.PlainCredentials("guest","guest") conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) # 获取信道 channel = conn_broker.channel() # 声明交换器 channel.exchange_declare(exchange="hello-exchange",type="direct",passive=False,durable=True,auto_delete=False) # 创建消息 msg = sys.argv[1] msg_props = pika.baseicProperties() msg_props.content_type = "text/plain" # 发布消息 channel.basic_publish(body=msg,exchange="hello-exchange",properties=msg_props,routing_key="hola")
消费者: * 连接到RabbitMQ * 获取信道 * 声明交换器 * 声明队列 * 把队列和交换器绑定起来 * 消费消息 * 关闭信道 * 关闭连接
脚本: esay_install pika # -*- coding:utf-8 -*- import pika,sys # 建立连接 credentials = pika.PlainCredentials("guest","guest") conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) # 获取信道 channel = conn_broker.channel() # 声明交换器 channel.exchange_declare(exchange="hello-exchange",type="direct",passive=False,durable=True,auto_delete=False) # 声明队列 channel.queue_declare(queue="hello-queue") # 通过routing键将队列和交换器绑定起来 channel.queue_bind(queue="hello-queue",exchange="hello-exchange",routing_key="hola") # 定义消息处理函数 def msg_consumer(channel,method,header,body): # 消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) # 停止消费并退出 if body == "quit": channel.basic_cancel(consumer_tag="hello-consumer") channel.stop_consuming() else: print body return # 订阅消费者 channel.basic_consume(msg_consumer,queue="hello-queue",consumer_tag="hello-consumer") channel.start_consuming()
7. 使用发送方确认模式来确认投递 带有确认功能的生产者
# -*- coding:utf-8 -*- import pika,sys from pika import spec
credentials = pika.PlainCredentials("guest","guest") conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel() #发送方确认模式处理器 def confirm_handler(frame): if type(frame.method) == spec.Confirm.SelectOK: print "Channel in 'confirm' mode." elif type(frame.method) == spec.Basic.Nack: if frame.method.delivery_tag in msg_ids: print "Message lost!" elif type(frame.method) == spec.Basic.Ack: if frame.method.delivery_tag in msg_ids: print "Confirm received!" msg_ids.remove(frame.method.delivery_tag) # 将信道设置为confirm模式 channel.confirm_delivery(callback=confirm_handler) # 重设消息ID追踪器 msg = sys.argv[1] msg_props = pika.baseicProperties() msg_props.content_type = "text/plain" msg_ids = [] # 发布消息 channel.basic_publish(body=msg,exchange="hello-exchange",properties=msg_props,routing_key="hola") msg_ids.append(len(msg_ids) + 1) channel.close()
|