RabbitMQ python pika

RabbitMQ python pika

  1. RabbitMQ 安装和用户权限设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    * RabbitMQ 安装,此步骤忽略

    * 创建vhost
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl add_vhost /test
    Creating vhost "/test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_vhosts|grep test
    /test

    * 对创建的vhost设置高可用
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_policy -p /test ha-all "^" '{"ha-mode":"all"}'
    Setting policy "ha-all" for pattern "^" to "{\"ha-mode\":\"all\"}" with priority "0"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_policies -p /test Listing policies
    /test ha-all all ^ {"ha-mode":"all"} 0

    * 创建用户
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl add_user test test123
    Creating user "test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_users|grep test
    test []

    * 为用户设置角色
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_user_tags test administrator monitoring
    Setting tags for user "test" to [administrator,monitoring]
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_users|grep test test [administrator, monitoring]

    * 为用户设置权限
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_permissions -p /test test '.*' '.*' '.*'
    Setting permissions for user "test" in vhost "/test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_permissions -p /test Listing permissions in vhost "/test"
    test .* .* .*
  2. Python pika install

    1
    [root@rabbitmq188 ~]# pip install pika
  3. RabbitMQ 单向发送消息
    RabbitMQ 单向发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    //生产者
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_queues -p /test
    Listing queues

    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列(消息发送到队列)
    channel.queue_declare(queue='queue_hello')

    # 发布消息
    channel.basic_publish(exchange="",
    routing_key="queue_hello",
    body='Hello World!')

    print("msg_send 'Hello World!'")

    # 关闭与RabbitMQ建立连接
    conn_broker.close()
    EOF

    [wisdom@rabbitmq188 web]$ python msg_send.py
    msg_send 'Hello World!'
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_queues -p /test |grep hello
    queue_hello 1
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_exchanges -p /test |grep hello
    exchange_hello direct


    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列(消息发送到队列)
    channel.queue_declare(queue='queue_hello')

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[x] Received %r" % body)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_hello',
    no_ack=True)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF

    [wisdom@rabbitmq188 web]$ python msg_receive.py
    [*] Waiting for messages. To exit press CTRL+C


    //执行msg_send.py 脚本,然后观察 msg_receive.py
    [wisdom@rabbitmq188 web]$ python msg_receive.py
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'Hello World!'
    [x] Received 'Hello World!'
  4. RabbitMQ 工作队列

    1
    消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列

RabbitMQ 工作队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message)

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者(worker_1.py和worker_2 内容一致)
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(body.count('.'))
print("[x] Done")

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF


//启动worker_1.py和worker_2.py
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C

//执行 msg_send.py向队列写入数据
[wisdom@rabbitmq188 web]$ python msg_send.py First message.
[x] Sent 'First message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Second message.
[x] Sent 'Second message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Third message.
[x] Sent 'Third message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Fourth message.
[x] Sent 'Fourth message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Fifth message.
[x] Sent 'Fifth message.'

//worker_1.py返回状态
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message.'
[x] Done
[x] Received 'Fourth message.'
[x] Done

//worker_2.py返回状态
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Done
[x] Received 'Third message.'
[x] Done
[x] Received 'Fifth message.'
[x] Done

综上,每个工作者,都会依次分配到任务.如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理.所以应当有一种机制,当一个工作者完成任务时,会反馈消息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
消息确认就是当工作者完成任务后,会反馈给rabbitmq.修改worker.py中的回调函数:

[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag) #消息确认

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False) # no_ack 关闭

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

//启动work_1.py和worker_2.py
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C

//执行 msg_send.py向队列写入数据
[wisdom@rabbitmq188 web]$ python msg_send.py First message.
[x] Sent 'First message.'

//执行向队列发送消息后,我发现woker_1.py收到消息,我立即中断woker_1.py程序
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
^CTraceback (most recent call last): "Ctrl+C 退出"

//在5秒中内停止worker_1.py后,worker_2.py接收消息并继续处理
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Done

综上,通过消息确认,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
虽然有了消息反馈机制,正在执行的任务不会丢失,但是如果rabbitmq本身服务挂掉的话,那么任务还是会丢失.所以需要将任务持久化存储起来.声明持久化存储:

//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True) #创建队列,声明持久化

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # 持久化参数
))

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True) # 声明持久化队列

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

注意:
声明队列的时候,需要添加"durable=True"参数,如果队列已经存在,程序执行会报错.rabbitmq不允许使用不同的参数来重新定义存在的队列,所以需要定义一个新的队列.
channel.queue_declare(queue='queue_hello',durable=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
公平调度:
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样.可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短.如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务.

//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True)

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,
))

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)

# 公平调度
channel.basic_qos(prefetch_count=1) # 公平调度
# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

注意,公平调度主要是在消费者上进行设置:
channel.basic_qos(prefetch_count=1)
  1. RabbitMQ 交换器
    1
    2
    3
    工作队列,每次消息都只会发送给其中一个接收端,如果需要将消息广播出去,让每个接收端都能收到,那么就要使用交换机

    交换机的工作原理: 消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列,而后每个接收端都能从各自的消息队列里接收到信息

RabbitMQ 工作队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange="exchange_hello",type='fanout') #创建交换机

# 发布消息
channel.basic_publish(exchange="exchange_hello", #发布消息到交换机
routing_key="",
body='Hello World!')

print("[x] Sent 'Hello World!'")

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

注意:
1.生产者要将消息发送到交换机,而不是队列.
2.basic_publish方法的参数exchange被设定为相应交换机,因为是要广播出去,发送到所有队列,所以routing_key就不需要设定了
3.exchange如果为空,表示是使用匿名的交换机(例如:amq.*这样的交换机,就是系统默认的交换机).routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列的意思


//消费者
[wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange='exchange_hello', type='fanout') # 创建交换机

# 创建随机队列,并绑定到交换机上
# "exclusive=True"表示当接收端退出时,销毁临时产生的队列
queue_random = channel.queue_declare(exclusive=True) # 创建随机队列,并绑定到交换机.
queue_name = queue_random.method.queue
channel.queue_bind(exchange='exchange_hello',queue=queue_name)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))

# 订阅消息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

  1. RabbitMQ 路由键

    1
    2
    3
    交换机已经能实现给所有接收端发送消息,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,这种情况就需要用到路由键了

    路由键的工作原理: 每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键.发送端通过交换机发送信息时,可以指明路由键,交换机会根据路由键把消息发送到相应的消息队列,这样接收端就能接收到消息了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建交换机
    channel.exchange_declare(exchange="exchange_hello",type='direct')

    # 定义三个路由键
    routings = ['info','warning','error']

    # 将消息发布到交换机,并设置路由键
    for routing in routings:
    message = '%s message.' % routing
    channel.basic_publish(exchange="exchange_hello",
    routing_key=routing,
    body=message)
    print(message)

    # 关闭与RabbitMQ建立连接
    conn_broker.close()
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建交换机
    channel.exchange_declare(exchange='exchange_hello', type='direct')

    # 从命令行获取路由键参数,如果没有,默认为info
    routings = sys.argv[1:]
    if not routings:
    routings = ['info']

    # 创建随机队列,并绑定到交换机上,设置路由键
    queue_random = channel.queue_declare(exclusive=True)
    queue_name = queue_random.method.queue
    for routing in routings:
    channel.queue_bind(exchange='exchange_hello',
    queue=queue_name,
    routing_key=routing)

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[x] Received %r" % (body,))

    # 订阅消息
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF

    //执行msg_receive.py
    [wisdom@rabbitmq188 web]$ python msg_receive.py warning error
    [*] Waiting for messages. To exit press CTRL+C
  2. RabbitMQ 路由键模糊匹配

    1
    2
    通过设置路由键,可以将消息发送到相应的队列,这里的路由键是要完全匹配,比如: info消息的只能发到路由键为info的消息队列
    路由键模糊匹配,就是可以使用正则表达式.和常用的正则表示式不同,这里的话"#"表示所有,全部的意思;"*"只匹配到一个词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange="exchange_hello",type='topic') # 设置为topic类型

# 定义三个路由键
routings = ['happy.work','happy.life','sad.work','sad.life']

# 将消息发布到交换机,并设置路由键
for routing in routings:
message = '%s message.' % routing
channel.basic_publish(exchange="exchange_hello",
routing_key=routing,
body=message)
print(message)

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange='exchange_hello', type='topic')

# 从命令行获取路由键参数,如果没有,则报错退出
routings = sys.argv[1:]
if not routings:
print >> sys.stderr,"Usage: %s [routing_key]..." % (sys.argv[0])
sys.exit()

# 创建随机队列,并绑定到交换机上,设置路由键
queue_random = channel.queue_declare(exclusive=True)
queue_name = queue_random.method.queue
for routing in routings:
channel.queue_bind(exchange='exchange_hello',
queue=queue_name,
routing_key=routing)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))

# 订阅消息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

//运行接收端
[wisdom@rabbitmq188 web]$ python msg_receive.py "*.work"
[wisdom@rabbitmq188 web]$ python msg_receive.py "happy.*"


补充说明:
1.发送信息时,如果不设置路由键,那么路由键设置为"*"的接收端是否能接收到消息?
发送信息时,如果不设置路由键,默认是表示广播出去,理论上所有接收端都可以收到消息,但经测试,路由键设置为"*"的接收端收不到任何消息.只有发送消息时,设置路由键为一个词,路由键设置为"*"的接收端才能收到消息.在这里,每个词使用"."符号分开的

2.发送消息时,如果路由键设置为"..",那么路由键设置为"#."的接收端是否能接收到消息?如果发送消息时,路由键设置为一个词呢?
两种情况,测试都可以

3."a.*.#""a.#"的区别
"a.#"只要字符串开头的一个词是a就可以了,比如a,a.haha,a.haha.haha;而这样的词是不行的,如abs,abc,abc.haha
"a.*.#"必须要满足a.*的字符串才可以,比如a.,a.haha,a.haha.haha.而这样的词是不行的,如a
  1. RabbitMQ 远程结果返回

    1
    2
    3
    前面的例子都有个共同点,就是发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了.但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端.

    处理方法描述: 发送端在发送信息前,产生一个接收消息的临时队列,该队列用来接收返回的结果.其实在这里接收端,发送端的概念已经比较模糊了,因为发送端也同样要接收消息,接收端同样也要发送消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    示例内容: 假设有一个控制中心和一个计算节点,控制中心会将一个自然数N发送给计算节点,计算节点将N值加1后,返回给控制中心.这里用center.py模拟控制中心,compute.py模拟计算节点


    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika


    class Center(object):
    def __init__(self):
    # 建立与RabbitMQ建立连接
    self.credentials = pika.PlainCredentials("test","test123")
    self.conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = self.credentials)
    self.conn_broker = pika.BlockingConnection(self.conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    self.channel = self.conn_broker.channel()

    # 定义接收到返回消息处理方法
    def on_response(self,ch,method,props,body):
    self.response = body

    def request(self,n):
    self.response = None
    #发送计算请求,并声明返回队列
    self.channel.basic_publish(exchange='',
    routing_key='queue_compute',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    ),
    body=str(n))
    #接收返回的数据
    while self.response is None:
    self.conn_broker.process_data_events()
    return int(self.response)

    center = Center()

    print " [x] Requesting increase(30)"
    response = center.request(30)
    print " [.] Got %r" % (response,)
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列
    channel.queue_declare(queue='queue_compute')
    print('[*] Waiting for n')

    # 将n值加1
    def increase(n):
    return n + 1

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[.] increase(%s)" % (body,))
    response = increase(int(body))
    # 将计算结果返回生产者
    ch.basic_publish(exchange='',
    routing_key=properties.reply_to,
    body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_compute',
    no_ack=False)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF
  2. RabbitMQ 相互关联编号”correlation id”

    1
    2
    3
    4
    5
    远程结果中有一个没有提到,就是"correlation id",这是什么呢?

    假设有多个计算节点,控制中心开启多个线程,往这些计算节点发送数字,要求计算结果并返回,但是控制中心只开启了一个队列,所有线程都是从这个队列里获取消息,每个线程如何确定收到的消息就是该线程对应的呢?这个就是"correlation id"的用处了.correlation翻译成中文就是相互关联,也表达了这个意思

    "correlation id"运行原理: 控制中心发送计算请求时设置"correlation id",而后计算节点将计算结果,连同接收到的"correlation id"一起返回,这样控制中心就能通过"correlation id"来标识请求.其实"correlation id"也可以理解为请求的唯一标识码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    示例内容: 控制中心开启多个线程,每个线程都发起一次计算请求,通过"correlation id",每个线程都能准确收到相应的计算结果

    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import threading
    import uuid

    # 自定义线程类,继承threading.Thread
    class MyThread(threading.Thread):
    def __init__(self):
    super(MyThread,self).__init()
    self.func = func
    self.num = num
    def run(self):
    print("[x] Requesting increase(%d)" % self.num)
    response = self.func(self.num)
    print "[.] increase(%d)=%d" % (self.num, response)

    # 控制中心类
    class Center(object):
    def __init__(self):
    # 建立与RabbitMQ建立连接
    self.credentials = pika.PlainCredentials("test","test123")
    self.conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = self.credentials)
    self.conn_broker = pika.BlockingConnection(self.conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    self.channel = self.conn_broker.channel()

    # 定义接收返回消息队列
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response,
    no_ack=True,
    queue=self.callback_queue)
    #返回的结果都会存储在该字典里
    self.response = {}

    # 定义接收到返回消息处理方法
    def on_response(self,ch,method,props,body):
    self.response[props.correlation_id] = body

    def request(self,n):
    corr_id = str(uuid.uuid4())
    self.response[corr_id] = None
    #发送计算请求,并声明返回队列
    self.channel.basic_publish(exchange='',
    routing_key='queue_compute',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    correlation_id = corr_id,
    ),
    body=str(n))
    #接收返回的数据
    while self.response is None:
    self.conn_broker.process_data_events()
    return int(self.response[corr_id])

    center = Center()
    #发起5次计算请求
    nums= [10, 20, 30, 40 ,50]
    threads = []
    for num in nums:
    threads.append(MyThread(center.request, num))
    for thread in threads:
    thread.start()
    for thread in threads:
    thread.join()
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列
    channel.queue_declare(queue='queue_compute')
    print('[*] Waiting for n')

    # 将n值加1
    def increase(n):
    return n + 1

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[.] increase(%s)" % (body,))
    response = increase(int(body))
    # 将计算结果返回生产者,增加"correlation_id"的设定
    ch.basic_publish(exchange='',
    routing_key=properties.reply_to,
    properties=pika.BasicProperties(correlation_id= \
    props.correlation_id),
    body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_compute',
    no_ack=False)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF