一、解释
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
二、安装
pip install pika
三、简单队列
1、使用API操作RabbitMQ
基于Queue实现生产者消费者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
message = Queue.Queue(10)
def producer(i):
while True:
message.put(i)
def consumer(i):
while True:
msg = message.get()
for i in range(12):
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
t.start()
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4')) # 封装socket逻辑部分
channel = connection.channel() # 拿到操作句柄
channel.queue_declare(queue='hello') # 通过channel创建一个队列,再给给队列取名字
channel.basic_publish(exchange='', # 通过句柄给
routing_key='hello', # 把body的数据放到名为hello的队列里去
body='Hello World!',
))
print("[x] Sent 'Hello World!")
connection.close()
消费者
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello') # 创建队列
def callback(ch, method, properties, body): # 就是个回调函数
print(" [x] Received %r" % body)
channel.basic_consume(callback, # 函数名;取出数据就执行这个函数
queue='hello', # 队列名
no_ack=Ture) # 无应答是(Ture);有应答(False)
print(' [*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()
2、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
# !/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() channel.queue_declare(queue='hello') # 创建队列 def callback(ch, method, properties, body): # 就是个回调函数 print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag=method.delivery_tag) # 调为有应答要加上的(下面的要改no_ack=False) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, # 函数名;取出数据就执行这个函数 queue='hello', # 队列名 no_ack=False) # 无应答是(Ture);有应答(False) print(' [*] Waiting for messages.To exit press CTRL+C') channel.start_consuming()