RabbitMQ 工作原理
RabbitMQ 简介
1
2
3
4
5
6* 开源AMQP实现,Erlang语言编写,支持多种客户端
* 分布式,高可用,持久化,可靠,安全
* 支持多种协议: AMQP,STOMP,MQTT,HTTP
* RabbitMQ主要概念对象: 生产者,消费者,交换机,队列
* 业务解耦: 解决多系统,易购系统间的数据交换,解耦生产者和消费者
* 适用场景: 批量数据异步处理,并行任务串行化,高负载任务负载均衡RabbitMQ 核心概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34* 颗粒度
* Broker
接收和分发消息的应用,RabbitMQ Server就是Message Broker
* Virtual host
虚拟主机(虚拟组),一个Broker可以开设多个vhost.因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制.
当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange和queue等
* Exchange 消息交换机
message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去
常用的类型有:direct(point-to-point),topic(publish-subscribe),fanout(multicast)
* Queue 消息队列载体
消息最终被送到这里等待consumer取走.一个message可以被同时拷贝到多个queue中
* 消息流转
* Binding 绑定,根据路由规则绑定Exchange和Queue
exchange和queue之间的虚拟连接,binding中可以包含routing key
Binding信息被保存到exchange中的查询表中,用于message的分发依据
* Routing Key 路由关键字
* Connection 连接
Publisher/Consumer和broker之间的TCP连接
断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题
* Channel 消息通道,每个连接可建立多个channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低.Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了"channel id"帮助客户端和"message broker"识别channel,所以channel之间是完全隔离的.Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
* 关联对象
* Producter 消息生产者
* Consumer 消息消费者RabbitMQ典型生产消费消息模型
1
生产者发送消息到broker server(RabbitMQ).在Broker内部,用户创建Exchange/Queue,通过Binding规则将两者联系在一起.Exchange分发消息,根据类型/binding的不同,分发策略有所区别.消息最后来到Queue中,等待消费者取走
- RabbitMQ 交换机类型- Direct Exchange
1
Direct Exchange 路由机制: 通过精确匹配消息的路由关键字,将消息路由到零个或者多个队列中,绑定关键字用来将队列和交换器绑定在一起
- RabbitMQ 交换机类型- Topic Exchange
1
Topic Exchange 路由机制: 通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中.这种路由器类型可以被用来支持经典的发布/订阅消息传输模式
- RabbitMQ 交换机类型- Fanout Exchange
1
Fanout Exchange 路由机制: 不论消息的路由关键字是什么,这条信息都会被路由到所有与该交换机绑定的队列中
- RabbitMQ 交换机类型- Headers Exchange
1
Headers Exchange 路由机制: 键值对匹配路由
操作流程
1
2
3
4
5
6
7
8
9
10(1) 消费者创建消息队列
(2) 消费者定义消息队列
(3) 消费者定义特定类型的交换机
(4) 消费者设定绑定规则
(5) 等待消息
(6) 生产者创建消息
(7) 生产者将消息投递至信息通道中
(8) 交换机获取消息
(9) 消费者获取并处理消息,并发送反馈
(10) 结束,关闭通道和连接实践保障和规划
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//保障
* 复用connection,复用channel
* 如果需要可靠业务,需要持久化和ack机制
* 如果希望高吞吐,可以采取非持久化,noack,自动删除机制
* 稳定性保障: 生产者异常保障,消费者异常保障
//规划
* 生产者面对Exchange,消费者面对queue
* 一个queue只有一个消费者(可多个副本)进行处理
* 命名规划:
- exchange: ex_{业务对象}_{业务场景}
- routekey: {事件,某个业务动作}
- queue: ex_{业务对象}_{业务场景}_{事件}
* 串行或并行业务方案
- 并行方案: 一个事件发生后,多个消费者相互间没有依赖关系,可由Exchange分发消息到多个队列,由各队列的消费者并行进行处理
- 串行方案: 一个事件发生后,多个消费者有先后依赖关系,可以由先执行的消费者处理事情,处理完成后再次发送消息到exchange,由后续的队列进行处理应用场景-异步处理
场景说明: 用户注册后,需要发注册邮件和注册短信,传统的做法有两种:- 串行的方式
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端
这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西. - 并行的方式
将注册信息写入数据库后,发送邮件的同时发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间 - 引入消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理.用户的响应时间就等于写入数据库的时间+写入消息队列的时间,响应时间是串行的3倍,是并行的2倍
- 串行的方式
应用场景-应用解耦
场景说明: 双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.这种做法有一个缺点,当库存系统出现故障时,订单就会失败
引入消息队列,订单系统(用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功),库存系统(订阅下单的消息,获取下单消息,进行库操作),就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失应用场景-流量削峰
场景说明: 流量削峰一般在秒杀活动中应用广泛.秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列.一方面可以控制活动人数,超过此一定阀值的订单直接丢弃;另一方面可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
引入消息队列后,用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.秒杀业务根据消息队列中的请求信息,再做后续处理.RabbitMQ 系统架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48* Round-robin dispathching 循环分发
RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理
* Message acknowledgment 消息确认
为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理,而不仅仅是被Consumer收到,那么我们不能采用no-ack,而应该是在处理完数据之后发送ack.
在处理完数据之后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以安全的删除它了.如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出情况下数据也不会丢失.
RabbitMQ它没有用到超时机制.RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有正确处理,也就是说RabbitMQ给了Consumer足够长的时间做数据处理.如果忘记ack,那么当Consumer退出时,Mesage会重新分发,然后RabbitMQ会占用越来越多的内存.
* Message durability 消息持久化
要持久化队列(queue)的持久化需要在声明时指定"durable=True;"
这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复
消息持久化包括3部分:
1.exchange持久化,在声明时指定"durable => true"
2.queue持久化,在声明时指定"durable => true"
3.消息持久化,在投递时指定"delivery_mode => 2" (1是非持久化)
注意:
* 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的
* 如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定
* 一旦创建了队列和交换机,就不能修改其标志了.例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建
* Fair dispath 公平分发
分发机制默认状态下不是那么优雅: RabbitMQ将第N个Message分发给第N个Consumer.N是取余后的,它不管Consumer是否还有"unacked Message",只是按照这个默认的机制进行分发.如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会
解决方法:
公平分发: 通过basic.qos方法设置"prefetch_count=1",这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它
注意: 这种方法可能会导致queue爆满.当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtual Host来细化你的设计
* 分发到多个Consumer
重温交换机类型:
* Direct Exchange
直接匹配,通过Exchange名称+Rounting Key来发送与接收消息
* Fanout Exchange
广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key
* Topic Exchange
主题匹配订阅,这里的主题指的是Routing Key,Routing Key可以采用通配符,如:*或#,RoutingKey命名采用"."来分隔多个词,只有消息这将队列绑定到该路由器且指定Routing Key符合匹配规则时才能收到消息
* Headers Exchange
消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
* 默认的exchange
如果用空字符串去声明一个exchange,那么系统就会使用"amq.direct"这个exchange.我们创建一个queue时,默认的都会有一个和新建queue同名的routing Key绑定到这个默认的exchange上去
如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推
如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每个程序都会收到这个消息的副本.行为相当于fanout类型的
* 消息序列化
RabbitMQ使用ProtoBuf序列化消息,它可作为RabbitMQ的Message的数据格式进行传输,由于是结构化的数据,这样就极大的方便了Consumer的数据高效处理,当然也可以使用XML,与XML相比,ProtoBuf有优势.