RabbitMQ队列
基础
-
安装erlang: http://www.erlang.org/downloads
-
安装rabbitmq: http://www.rabbitmq.com
-
安装rabbitmq moudle: pip install pika
- RabbitMQ介绍:http://www.rabbitmq.com/tutorials/tutorial-six-python.html
- RabbitMQ web 登入: http://localhost:15672
RabbitMQ是一个消息代理:它接受和转发消息,类似邮局的角色。
实例一:
send端:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 建立连接 4 channel = connection.channel() # 声明一个管道 5 channel.queue_declare('hello') # 声明一个队列 6 channel.basic_publish( 7 exchange='', 8 routing_key='hello', 9 body= 'Hello Word') 10 # 发送消息 11 # routing_key : 队列名 12 # body: 消息内容 13 print('[x] sent "Hello World!"') 14 connection.close() # 关闭连接,channel不需要关闭
receive 端:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 建立连接 4 channel = connection.channel() # 定义管道 5 channel.queue_declare(queue='hello') 6 # 声明队列,send端和recv端只需声明一次就好,但是不确定send端是否声明过,所以在两边可以都声明一次,这并不冲突 7 8 def callback(ch, method,properties,body): 9 """ 10 收到消息,调用函数 11 :param ch: 管道 12 :param method: 13 :param properties: 14 :param body: 消息 15 :return: 16 """ 17 print('reeive %r' % body) 18 19 channel.basic_consume( 20 callback, 21 queue='hello', 22 no_ack=True 23 ) 24 print('wait for message.Press Ctrl+C to exit') 25 channel.start_consuming() # 持续接收消息,如果没有消息就阻塞
执行结果:
- send端发送一条消息就结束
- recv端持续接受消息,一收到消息就调用回调函数,否则一直阻塞
- 如果send端没有打开,每次send的消息会存在rabbitmq服务器上,直到有recv端接入接受消息。
实例二:
RabbitMQ server采用轮询机制发送消息。开启多个recv端,send端发送消息,轮询recv端接收消息(每一次send,只会有一个recv端接收消息)
recv端 : channel.basic_consume参数no_ack=True recv端是否在调用完成回调函数后给send端一个确认;
-
- False:默认值,需要接收确认
- True:执行后不确认,也就是服务端把一个消息分发出去后就不管了
情景模拟:no_ack=False,回调函数添加消息处理完毕的确认语句:ch.basic_ack(delivery_tag=method.delivery_tag);在回调函数中添加sleep(),在recv端没处理完消息就中断,观察其他recv端的接受情况。
1 import pika 2 import time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 'localhost')) 6 channel = connection.channel() 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(20) 12 print(" [x] Done") 13 print("method.delivery_tag", method.delivery_tag) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 16 17 channel.basic_consume(callback, 18 queue='task_queue', 19 ) 20 21 print(' [*] Waiting for messages. To exit press CTRL+C') 22 channel.start_consuming()