RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue,消息队列是一种应用程序对应程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接他们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ的安装:
1、安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
2、安装erlang $ yum -y install erlang
3、安装RabbitMQ $ yum -y install rabbitmq-server
4、启动与停止:service rabbitmq-server start/stop
python安装API:
pip install pika 或者 easy_install pika 或者源码安装:
生产者
消费者
https://pypi.python.org/pypi/pika
对于RabbitMQ来说,生产和消费不再针对内存里的一个queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.254.129')) channel = connection.channel() # 创建频道 channel.queue_declare(queue='hello') # 创建队列hello,若存在则忽略 # 向队列hello中发消息, body为发的消息内容 channel.basic_publish(exchange='', routing_key='hello', body='Hello World') print('saf') connection.close()
import pika conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.254.129')) chanel = conn.channel() chanel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(body) # ch为channel, method为函数名字, property为属性, body为取到的消息 chanel.basic_consume(callback, queue='hello', no_ack=True) # no_ack = False为,如果消费者遇到情况(its channel is closed,connection # is closed,or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中 print('hehehehe') chanel.start_consuming()
1、acknowledgment 消息不丢失:no_ack = False,如果消费者遇到情况(its channel is closed,connection is closed,or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
import pika, time conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.254.129')) chanel = conn.channel() chanel.queue_declare(queue='hello') def callback(ch, method, properties, body): print('消息:', body) time.sleep(10) print('ok') ch.basic_ack(delivery_tag=method.delivery_tag) chanel.basic_consume(callback, queue='hello', no_ack=False) print('我是消费者,等待消息') chanel.start_consuming()
2、durable(持久化) 消息不丢失:当RabbitMQ服务宕机,后也不用担心消息的丢失。
import pika conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.254.129')) channel = conn.channel() channel.queue_declare(queue='hello1', durable=True) # delivery_mode=2意思是做持久化 channel.basic_publish(exchange='', routing_key='hello1', body='hello world', properties=pika.BasicProperties( delivery_mode=2, )) print('发送消息') conn.close()