AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

Rabbitmq、Redis

 

概念说明:

  • Broker:简单来说就是消息队列服务器实体。 
    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 
    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 
    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 
    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 
    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 
    producer:消息生产者,就是投递消息的程序。 
    consumer:消息消费者,就是接受消息的程序。 
    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

Rabbitmq的安装

     linux
            安装配置epel源
           $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
            安装erlang
           $ yum -y install erlang
 
            安装RabbitMQ
           $ yum -y install rabbitmq-server       
注意:service rabbitmq-server start/stop 启动/关闭

  rabbitmq对于python开放的API接口为pika,安装如下

pip install pika
or
easy_install pika
or
源码
 
https://pypi.python.org/pypi/pika

 简单模式

************生产者*************************
import pika


username = 'xixi'  # 指定远程rabbitmq的用户名密码

pwd = '123456'

user_pwd = pika.PlainCredentials(username, pwd)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',credentials=user_pwd))     #封装socket逻辑,默认端口5672
channel = connection.channel()  #拿到操作rabbitmq的句柄,所有操作都需要句柄来完成

channel.queue_declare(queue='hello')  # 创建队列,参数为队列名,队列名唯一,不能重复

channel.basic_publish(exchange='',   #交换机,类似于路由,多队列情况下,生产者生产消息并不直接放到队列里,先交给exchange,由exchange决定放到哪个队列里,由于这里只有一个队列,exchange为空,此时为交换机不工作的模式
                      routing_key='hello',  #声明body里面的数据放到哪个队列里面
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

  

************************消费者***************
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.14.58'))  #封装socket逻辑
channel = connection.channel()

channel.queue_declare(queue='hello')  #创建队列,因为生产者跟消费者谁先启动并不一定,所以同时声明创建这个队列,有则取,无则生成,防止程序报错,生产者同理


def callback(ch, method, properties, body): #body为取到的内容
    print(" [x] Received %r" % body)


channel.basic_consume(callback,   #数据取出后,执行该回调函数,
                      queue='hello',  #取数据的对列名字
                      no_ack=True)  #写的话,如果接收消息,机器宕机消息就丢了
                                    # 一般不写。宕机则生产者检测到发给其他消费者

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

重要参数介绍:

(1)acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)   #有应答第二步

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)  #有应答第一步

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

(2)durable   针对生产者宕机消息不丢失,实质是想数据写到磁盘持久化,重启后仍然有数据

生产者:

 1 #!/usr/bin/env python
 2 import pika
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
 5 channel = connection.channel()
 6 
 7 # make message persistent
 8 channel.queue_declare(queue='hello', durable=True)
 9 
10 channel.basic_publish(exchange='',
11                       routing_key='hello',
12                       body='Hello World!',
13                       properties=pika.BasicProperties(
14                           delivery_mode=2, # make message persistent
15                       ))   #必须参数
16 print(" [x] Sent 'Hello World!'")
17 connection.close()
View Code

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-01-19
  • 2022-01-25
  • 2021-05-23
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-09-30
  • 2022-12-23
  • 2022-12-23
  • 2021-12-18
  • 2021-10-01
  • 2022-01-03
  • 2021-09-05
相关资源
相似解决方案