一、解释

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

二、安装

pip install pika

三、简单队列

  1、使用API操作RabbitMQ 

基于Queue实现生产者消费者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

生产者

# !/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4'))  # 封装socket逻辑部分
channel = connection.channel()  # 拿到操作句柄

channel.queue_declare(queue='hello')  # 通过channel创建一个队列,再给给队列取名字

channel.basic_publish(exchange='',  # 通过句柄给
                      routing_key='hello',  # 把body的数据放到名为hello的队列里去
                      body='Hello World!',
                     
                      ))
print("[x] Sent 'Hello World!")
connection.close()

消费者  

# !/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')  # 创建队列

def callback(ch, method, properties, body):  # 就是个回调函数
    print(" [x] Received %r" % body)
   

channel.basic_consume(callback,  # 函数名;取出数据就执行这个函数
                      queue='hello',  # 队列名
                      no_ack=Ture)  # 无应答是(Ture);有应答(False)

print(' [*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()

  2、acknowledgment 消息不丢失  

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

# !/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BaseConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')  # 创建队列

def callback(ch, method, properties, body):  # 就是个回调函数
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 调为有应答要加上的(下面的要改no_ack=False)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,  # 函数名;取出数据就执行这个函数
                      queue='hello',  # 队列名
                      no_ack=False)  # 无应答是(Ture);有应答(False)
print(' [*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()
消费者

相关文章:

  • 2021-05-19
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-08-10
  • 2022-12-23
  • 2022-12-23
  • 2022-02-17
  • 2022-12-23
相关资源
相似解决方案