RabbitMQ队列

基础

  1. 安装erlang: http://www.erlang.org/downloads

  2. 安装rabbitmq:  http://www.rabbitmq.com

  3. 安装rabbitmq moudle: pip install pika

  4. RabbitMQ介绍:http://www.rabbitmq.com/tutorials/tutorial-six-python.html
  5. RabbitMQ web 登入:  http://localhost:15672

RabbitMQ是一个消息代理:它接受和转发消息,类似邮局的角色。

实例一:

send端:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 建立连接
 4 channel = connection.channel()   # 声明一个管道
 5 channel.queue_declare('hello')   # 声明一个队列
 6 channel.basic_publish(
 7                         exchange='',
 8                         routing_key='hello',
 9                         body= 'Hello Word')
10 # 发送消息
11 # routing_key : 队列名
12 # body: 消息内容
13 print('[x] sent "Hello World!"')
14 connection.close()   # 关闭连接,channel不需要关闭

 receive 端:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 建立连接
 4 channel = connection.channel()          # 定义管道
 5 channel.queue_declare(queue='hello')    
 6 # 声明队列,send端和recv端只需声明一次就好,但是不确定send端是否声明过,所以在两边可以都声明一次,这并不冲突
 7 
 8 def callback(ch, method,properties,body): 
 9     """
10     收到消息,调用函数
11     :param ch: 管道
12     :param method:  
13     :param properties: 
14     :param body: 消息
15     :return: 
16     """
17     print('reeive %r' % body)
18     
19 channel.basic_consume(
20                         callback,
21                         queue='hello',
22                         no_ack=True
23 )
24 print('wait for message.Press Ctrl+C to exit')
25 channel.start_consuming()   # 持续接收消息,如果没有消息就阻塞

 执行结果:

  • send端发送一条消息就结束
  • recv端持续接受消息,一收到消息就调用回调函数,否则一直阻塞
  • 如果send端没有打开,每次send的消息会存在rabbitmq服务器上,直到有recv端接入接受消息。

 实例二:

  RabbitMQ server采用轮询机制发送消息。开启多个recv端,send端发送消息,轮询recv端接收消息(每一次send,只会有一个recv端接收消息)

  recv端 : channel.basic_consume参数no_ack=True  recv端是否在调用完成回调函数后给send端一个确认;

    • False:默认值,需要接收确认
    • True:执行后不确认,也就是服务端把一个消息分发出去后就不管了

情景模拟:no_ack=False,回调函数添加消息处理完毕的确认语句:ch.basic_ack(delivery_tag=method.delivery_tag);在回调函数中添加sleep(),在recv端没处理完消息就中断,观察其他recv端的接受情况。

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     'localhost'))
 6 channel = connection.channel()
 7 
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % body)
11     time.sleep(20)
12     print(" [x] Done")
13     print("method.delivery_tag", method.delivery_tag)
14     ch.basic_ack(delivery_tag=method.delivery_tag)
15 
16 
17 channel.basic_consume(callback,
18                       queue='task_queue',
19                       )
20 
21 print(' [*] Waiting for messages. To exit press CTRL+C')
22 channel.start_consuming()
recv端

相关文章:

  • 2021-08-12
  • 2021-09-29
  • 2022-12-23
  • 2021-12-04
  • 2022-12-23
  • 2022-12-23
  • 2021-11-24
  • 2022-01-05
猜你喜欢
  • 2021-10-07
  • 2022-12-23
  • 2021-08-28
  • 2021-06-06
相关资源
相似解决方案