RabbitMQ消息队列(入门)
一、RabbitMQ队列
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。RabbitMQ可以,多个程序同时使用RabbitMQ ,但是必须队列名称不一样。采用erlang语言,属于爱立信公司开发的。
二、python的queue与RabbitMQ的比较
关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯。
Rabbitmq并不是python内置的模块,而是一个需要你额外安装(ubunto可直接apt-get其余请自行百度。)的程序,安装完毕后可通过python中内置的pika模块来调用MQ发送或接收队列请求。接下来我们就看几种python调用MQ的模式(作者自定义中文形象的模式名称)与方法。
三、python调用Rabbitmq的模式
1.轮询消费模式
此模式下,发送队列的一方把消息存入mq的指定队列后,若有消费者端联入相应队列,即会获取到消息,并且队列中的消息会被消费掉。
若有多个消费端同时连接着队列,则会默认把队列的消息依次分发给各个消费者(c),跟负载均衡差不多。
接下来是代码实例:
producer生产者
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\',credentials=credentials)) channel = connection.channel() #建立了rabbit协议的通道 # 声明queue channel.queue_declare(queue=\'hello\') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange=\'\', routing_key=\'hello\', body=\'Hello World!\') print(" [x] Sent \'Hello World!\'") connection.close()
发送过队列后,可在MQ服务器中查看队列状态
consumer消费者
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\',credentials=credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we\'re not yet sure which program to run first. In such cases it\'s a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue=\'hello\') def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) #当消费者接受完消息后,返回标识符.此处是为了消息安全,防止消费端未成功接受消息导致消息丢失。 #收到消息就调用这个 channel.basic_consume(\'hello\', callback) print(\' [*] Waiting for messages. To exit press CTRL+C\') #开始消费 channel.start_consuming()
接收队列后,查看一下队列状态
这里一定要显式的发送确认消息`ch.basic_ack(delivery_tag=method.delivery_tag)明确的告诉服务器消息被处理了.
运行以上代码,可保证消息的安全性,防止消费端宕机未成功接受消息,从而导致消息丢失。
2.队列持久化
当rabbitMQ意外宕机时,可能会有持久化保存队列的需求(队列中的消息不消失)。
可以通过持久化(durable)来确保通道和消息都被保存到磁盘中进行持久化,但是由于从内存写入磁盘也需要时间,如果这段时间出现故障,则这些消息也是会丢失的.所以durable是一种弱的持久化.
增加以下红色部分的代码,可实现队列持久化。
producer
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() #建立了rabbit协议的通道 # 声明queue channel.queue_declare(queue=\'durable_queue\',durable=True) #durable=True保证队列持久化 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange=\'\', routing_key=\'durable_queue\', body=\'Hello World!\', properties=pika.BasicProperties( delivery_mode=2, # 保证消息持久化 ) ) print(" [x] Sent \'Hello World!\'") connection.close()
执行后查看队列,记下队列名字与队列中所含消息的数量。生产者在发送消息时,将消息的类型定义为delivery_mode=2,用来将消息持久化.
Rabbitmq服务器重启
rabbitmq-server restart
执行后查看队列,查看队列名字与队列中所含消息的数量
执行消费者代码
cunsumer
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue=\'durable_queue\', durable=True) # 因为生产端已经声明durable=True,若此处durable=True去掉,则报错 def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) #收到消息就调用这个 channel.basic_consume(\'durable_queue\', callback) print(\' [*] Waiting for messages. To exit press CTRL+C\') #开始消费 channel.start_consuming()
可正确接收到信息。
再次查看队列的情况。
3.消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
消息持久化+公平分发的完整代码
生产者
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() #建立了rabbit协议的通道 # 声明queue channel.queue_declare(queue=\'durable_queue\',durable=True) #durable=True保证队列持久化 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange=\'\', routing_key=\'durable_queue\', body=\'Hello World!\', properties=pika.BasicProperties( delivery_mode=2, # 保证消息持久化 ) ) print(" [x] Sent \'Hello World!\'") connection.close()
消费者
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue=\'durable_queue\', durable=True) # 因为生产端已经声明durable=True,若此处durable=True去掉,则报错 def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) #收到消息就调用这个 channel.basic_consume(\'durable_queue\', callback) channel.basic_qos(prefetch_count=1) #公平分发 print(\' [*] Waiting for messages. To exit press CTRL+C\') #开始消费 channel.start_consuming()
4.Publish\Subscribe(消息发布\订阅)
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
交换是件很简单的事。在一端从生产者那里收消息,并将它们推送到queue中。Exchange必须非常清楚的知道。他从生产者那里收到的消息,要发给谁? 他是应该被追加到一个具体的queue里,还是发送到多个queue里,或者它应该被丢弃。该规则由Exchange类型定义。
Exchange的作用就是转发消息,给订阅者发消息。
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。(一共有四种类型)
a、fanout: 所有bind到此exchange的queue都可以接收消息 (给所有人发消息)
b、direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 (给指定的一些queue发消息)
c、topic(话题):所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 (给订阅话题的人发消息)
topics支持通配符,如下:
表达式符号说明:#代表一个或多个字符,*代表0个或一个字符
示例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
备注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
4、headers: 通过headers 来决定把消息发给哪些queue (通过消息头,决定发送给哪些队列)
a、fanout方式(给所有人发消息)
应用场景:
例如:视频直播
例如:新浪微博
一个明星,他有几千万的订阅用户,粉丝们想要收到他发送的微博消息(这里指:微博订阅的在线用户发送消息,不发给不在线的用户,发送消息)
生产者
import pika import sys credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\') message = \' \'.join(sys.argv[1:]) or "info: Hello World!" ##如果等于空,就输出hello world! channel.basic_publish(exchange=\'logs\', routing_key=\'\', body=message) print(" [x] Sent %r" % message) connection.close()
消费者
import pika credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() #指定发送类型 channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\') result = channel.queue_declare(queue=\'\', exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue #随机生成的Q,绑定到exchange上面。 channel.queue_bind(exchange=\'logs\', queue=queue_name) print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r" % body) #收到消息就调用这个 channel.basic_consume(queue_name, callback) channel.start_consuming()
b、有选择的接收消息(exchange type=direct)
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
示例:
生产者
import pika import sys credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\') severity = sys.argv[1] if len(sys.argv) > 1 else \'info\' #严重程序,级别;判定条件到底是info,还是空,后面接消息 message = \' \'.join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange=\'direct_logs\', routing_key=severity, #绑定的是:error 指定关键字(哪些队列绑定了,这个级别,那些队列就可以收到这个消息) body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者
import pika import sys credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() #指定发送类型 channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\') result = channel.queue_declare(queue=\'\', exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue severities = sys.argv[1:] #接收那些消息(指info,还是空),没写就报错 if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) #定义了三种接收消息方式info,warning,error sys.exit(1) for severity in severities: channel.queue_bind(exchange=\'direct_logs\', queue=queue_name, routing_key=severity) #循环绑定关键字 print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) #收到消息就调用这个 channel.basic_consume(queue_name, callback) channel.start_consuming()
执行结果:
首先,设置接收类型为:info、warning、 error 三个中的其中一种或多种类型,再从发送端指定发送给那种类型,后面再接要发送的消息。 #发送端 (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_send.py info [x] Sent \'info\':\'Hello World!\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_send.py error send error [x] Sent \'error\':\'send error\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_send.py warnning send warnning [x] Sent \'warnning\':\'send warnning\' #接收端 (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_receive.py info #指定接收类型为info [*] Waiting for logs. To exit press CTRL+C [x] \'info\':b\'Hello World!\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_receive.py error #指定接收类型为error [*] Waiting for logs. To exit press CTRL+C [x] \'error\':b\'send error\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 direct_receive.py warnning #指定接收类型为warnning [*] Waiting for logs. To exit press CTRL+C [x] \'warnning\':b\'send warnning\'
c、更细致的消息过滤(exchange type=topic)
生产者
import pika import sys credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\') routing_key = sys.argv[1] if len(sys.argv) > 1 else \'info\' #严重程序,级别;判定条件到底是info,还是空,后面接消息 message = \' \'.join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange=\'topic_logs\', routing_key=routing_key, #绑定的是:error 指定关键字(哪些队列绑定了,这个级别,那些队列就可以收到这个消息) body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者
import pika import sys credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() #指定发送类型 channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\') result = channel.queue_declare(queue=\'\', exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue binding_keys = sys.argv[1:] #接收那些消息(指info,还是空),没写就报错 if not binding_keys: sys.stderr.write("Usage: %s [binding_keys]\n" % sys.argv[0]) #定义了三种接收消息方式info,warning,error sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=\'topic_logs\', queue=queue_name, routing_key=binding_key) #循环绑定关键字 print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) #收到消息就调用这个 channel.basic_consume(queue_name, callback) channel.start_consuming()
执行结果:
#发送端 (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_send.py error "mysql has error" [x] Sent \'error\':\'mysql has error\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_send.py mysql.error "mysql has error" [x] Sent \'mysql.error\':\'mysql has error\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_send.py mysql.error. "mysql has error" [x] Sent \'mysql.error.\':\'mysql has error\' (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_send.py mysql.info "mysql has error" [x] Sent \'mysql.info\':\'mysql has error\' #接收端 (venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_receive.py # #指定接收类型绑定#号,就是收所有消息,相当于广播,#代表一个或多个字符 [*] Waiting for logs. To exit press CTRL+C [x] \'error\':b\'mysql has error\' [x] \'mysql.error\':b\'mysql has error\' [x] \'mysql.error.\':b\'mysql has error\'
[x] \'mysql.info\':b\'mysql has error\'
(venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_receive.py *.warning mysql.* #指定接收类型是*.warning和mysql.*,*代表匹配0个或者一个任何字符 [*] Waiting for logs. To exit press CTRL+C [x] \'mysql.error\':b\'mysql has error\'
[x] \'mysql.info\':b\'mysql has error\'
(venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_receive.py mysql.info "mysql has error" [*] Waiting for logs. To exit press CTRL+C [x] \'mysql.info\':b\'mysql has error\' venv) F:\learnpython\learn_django\learn_rabbitMQ>python3 topic_receive.py *.error.* [*] Waiting for logs. To exit press CTRL+C [x] \'mysql.error.\':b\'mysql has error\'
d、Remote procedure call (RPC) 双向的
应用场景:
示例:实现RPC服务功能
生产者
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') self.connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=self.credentials)) self.channel = self.connection.channel() # 接收命令结果 result = self.channel.queue_declare(queue=\'\',exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.callback_queue, #准备接受命令结果 self.on_response) def on_response(self, ch, method, props, body): """"callback方法""" if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #唯一标识符 #发送命令 self.channel.basic_publish(exchange=\'\', routing_key=\'rpc_queue\', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() #检查队列里有没有新消息,但不会阻塞 return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
消费者
import pika import time credentials = pika.PlainCredentials(\'edwin\', \'edwin123\') connection = pika.BlockingConnection(pika.ConnectionParameters( \'192.168.190.128\', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue=\'rpc_queue\') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): """"callback方法""" n = int(body) print(" [.] fib(%s)" % n) response = fib(n) #发送命令结果 ch.basic_publish(exchange=\'\', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(\'rpc_queue\', on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
执行结果
#生产端 [x] Requesting fib(30) [.] Got 832040 #消费端 [x] Requesting fib(30) [.] Got 832040