AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
概念说明:
-
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
Rabbitmq的安装
linux
安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
安装erlang
$ yum -y install erlang
安装RabbitMQ
$ yum -y install rabbitmq-server
注意:service rabbitmq-server start/stop 启动/关闭
rabbitmq对于python开放的API接口为pika,安装如下
pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
简单模式
************生产者*************************
import pika
username = 'xixi' # 指定远程rabbitmq的用户名密码
pwd = '123456'
user_pwd = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',credentials=user_pwd)) #封装socket逻辑,默认端口5672
channel = connection.channel() #拿到操作rabbitmq的句柄,所有操作都需要句柄来完成
channel.queue_declare(queue='hello') # 创建队列,参数为队列名,队列名唯一,不能重复
channel.basic_publish(exchange='', #交换机,类似于路由,多队列情况下,生产者生产消息并不直接放到队列里,先交给exchange,由exchange决定放到哪个队列里,由于这里只有一个队列,exchange为空,此时为交换机不工作的模式
routing_key='hello', #声明body里面的数据放到哪个队列里面
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
************************消费者***************
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.14.58')) #封装socket逻辑
channel = connection.channel()
channel.queue_declare(queue='hello') #创建队列,因为生产者跟消费者谁先启动并不一定,所以同时声明创建这个队列,有则取,无则生成,防止程序报错,生产者同理
def callback(ch, method, properties, body): #body为取到的内容
print(" [x] Received %r" % body)
channel.basic_consume(callback, #数据取出后,执行该回调函数,
queue='hello', #取数据的对列名字
no_ack=True) #写的话,如果接收消息,机器宕机消息就丢了
# 一般不写。宕机则生产者检测到发给其他消费者
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
重要参数介绍:
(1)acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) #有应答第二步
channel.basic_consume(callback,
queue='hello',
no_ack=False) #有应答第一步
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
(2)durable 针对生产者宕机消息不丢失,实质是想数据写到磁盘持久化,重启后仍然有数据
生产者:
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 5 channel = connection.channel() 6 7 # make message persistent 8 channel.queue_declare(queue='hello', durable=True) 9 10 channel.basic_publish(exchange='', 11 routing_key='hello', 12 body='Hello World!', 13 properties=pika.BasicProperties( 14 delivery_mode=2, # make message persistent 15 )) #必须参数 16 print(" [x] Sent 'Hello World!'") 17 connection.close()