目录:
1、RabbitMQ
2、Redis
内容:
1、RabbitMQ
实现简单的队列通信
send端
import pika
credentials = pika.PlainCredentials('admin','admin')
parameters = pika.ConnectionParameters(credentials=credentials)
parameters = pika.ConnectionParameters('192.168.52.155',5672,'/',credentials)
connection = pika.BlockingConnection(parameters) #建立原生socket
channel = connection.channel() #建立一个管道,在管道中发送消息
channel.queue_declare(queue='hello') #声明queue
channel.basic_publish(
exchange='',
routing_key='hello', #就是queue名字
body='Hellow World!' #消息内容
) #通过basic_publish来发消息
print('[x] Sent "Hello World!"')
connection.close()
receive端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
Work Queues
在这种模式下,RabbitMQ会默认把P发的消息依次发给各个消费者C,跟负载均衡差不多
producer
import pika credentials = pika.PlainCredentials('admin','admin') parameters = pika.ConnectionParameters(credentials=credentials) parameters = pika.ConnectionParameters('192.168.52.157',5672,'/',credentials) connection = pika.BlockingConnection(parameters) #建立原生socket channel = connection.channel() #建立一个管道,在管道中发送消息 channel.queue_declare(queue='hello') #声明queue channel.basic_publish( exchange='', routing_key='hello', #就是queue名字 body='Hellow World!' #消息内容 ) #通过basic_publish来发消息 print('[x] Sent "Hello World!"') connection.close()
consumer
import pika credentials = pika.PlainCredentials('admin','admin') parameters = pika.ConnectionParameters(credentials=credentials) parameters = pika.ConnectionParameters('192.168.52.157',5672,'/',credentials) connection = pika.BlockingConnection(parameters) #建立原生socket channel = connection.channel() #建立一个管道,在管道中发送消息 channel.queue_declare(queue='hello') #消费者声明队列是为了避免消费者开始运行而生产者没有运行 def callback(ch, method, properties, body): print('-->',ch,method,properties) print('[x] Recevied %r' % body) ch.basic_ack(delivery_tag=method.delivery_tag) #添加收到消息确认机制,收到后将queue中消息删除 channel.basic_consume(#消费消息 callback, #如果收到消息就调用这个函数来处理消息 queue='hello', #no_ack=True 不管处不处理都不会给服务器端发确认,一般不开 ) #在没有no_ack情况下,如果一个consumer断了就会发给另一个consumer,只有收到consumer的确认才会删除 print('[*] Waiting for messages. To exit press Crlt + C') channel.start_consuming() #启动start会开始收,一直运行,没有就卡住,单进程阻塞式,没有消息就卡住
消息持久化
We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:队列持久化需要加durable=True
channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:
channel.queue_declare(queue='task_queue', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))#加delivery_mode = 2 消息持久化
消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码
生产者端
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
Publish\Subscribe(消息发布\订阅)
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
消息publisher
#广播消息是实时的 import pika credentials = pika.PlainCredentials('admin','admin') parameters = pika.ConnectionParameters(credentials=credentials) parameters = pika.ConnectionParameters('192.168.52.157',5672,'/',credentials) connection = pika.BlockingConnection(parameters) #建立原生socket channel = connection.channel() #建立一个管道,在管道中发送消息 channel.exchange_declare( exchange='logs', type='fanout' ) message = 'info: Hello World!' channel.basic_publish( exchange='logs', routing_key='', body=message ) print('[x] Sent %s' %message) connection.close()