TF511

运行rabbitmq服务必须先启动rabbitmq,服务夯住了才能进行 发送--接受 操作

rabbitMQ常用命令:

rabbitmq-server          (启动rabbitmq, 必须进入rabbitmq安装sbin目录下执行)

rabbitmqctl list_queues (查看所有队列信息)

rabbitmqctl stop_app     (关闭应用)

rabbitmqctl start_app   (开启应用,启动应用,和上述关闭命令配合使用,达到清空队列的目的)
 
rabbitmqctl reset       (清除所有队列)

rabbitmqctl status       (查看运行信息)

rabbitmqctl list_exchanges   (查看 exchange(过滤) 模式)

rabbitmqctl list_bindings   (查看 捆绑 信息 )

 

如何保证数据不丢失?

  1. 1. 在队列里,设置durable=true  #代表 队列持久化,就算断电队列也不会消失,但是消息会丢失
    2. 在生产者端,
      properties = pika.BasicProperties(
            delivery_mode=2, #确保生产者生产的消息持久化到队列里面,就算断电消息也不会消失
        )
    3. 在消费者端 设置auto_ack
    channel.basic_consume(
      #auto_ack=False, 确认报消费者取到消息,每次消费者获取消息的时候都会和生产这进行确认; #auto_ack=True,不进行确认,
      queue=\'test\', on_message_callback=callback, auto_ack=False)
       
    ps:白话 关于auto_ack的设置,为True时 负责任的态度,生产者会关注消费者是否拿到消息(消费者拿取得时候,会告诉生产者已经拿到),为False时,不关注消费者是否消费,(不负责任的态度,只管生产不管你是否消费)

exchange的三种模式:

fanout : 广播 -----所有捆绑了得客户都能收到消息。 direct : 组播 -------只有同一组的客户才能收到。 topic : 规则波 ------ 满足某一要求的所有可客户都能收到。

 

 

关于公司月活两等问题

QPS: 每秒访问的次数 DAU: 日活跃用户数 MAU: 月活跃用户数

关于:QPS  、DAU、这些统计图表都是运维做的,我们不做,他们做好后有一个链接,我们点进去就能直接看到了。 

 

1、linux ----- docker

2、shell 脚本语言

3、numpy,pandas

4、后端面试常问: mysql、队列相关、缓存相关、服务器相关

常规使用队列:

producer.py

import pika
#建立连接
connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))
#实例化一个对象
channel = connection.channel()

### 声明队列 ,durable=True 开启队列持久化,就算断电队列也不会消失
channel.queue_declare(queue=\'test\',durable=True)
#发送的信息
channel.basic_publish(exchange=\'\',
                     routing_key=\'test\',
                     body=\'Hello World!\',
                     #确保生产者生产的消息持久化到队列里面,就算断电消息也不会消失
                     properties = pika.BasicProperties(
                       delivery_mode=2,  # make message persistent
                    )
                    )

print(" [x] Sent \'Hello World!\'")
#回收资源
connection.close()

consumer.py

import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))

channel = connection.channel()
#建立连接
channel.queue_declare(queue=\'test\', durable=True)
#定义回调方法,
def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   ch.basic_ack(delivery_tag=method.delivery_tag)
#配置参数
channel.basic_consume(
   #auto_ack=False, 确认报消费者取到消息,每次消费者获取消息的时候都会和生产这进行确认。auto_ack=True,不进行确认,
   queue=\'test\', on_message_callback=callback, auto_ack=False)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
#开始接收
channel.start_consuming()

exchange 的三种方式实现:

第一种:fanout

producer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))

channel = connection.channel()
#设置消息过滤条件与 发送级别   条件logs 级别fanout---广播
channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\')

#会自动创建一个连接着 logs 的 唯一的队列:queue_name
result = channel.queue_declare(\'\', exclusive=True) ### exclusive 排他的 唯一的

queue_name = result.method.queue
print("queue_name:", queue_name)

channel.queue_bind(exchange=\'logs\', queue=queue_name)

print(\' [*] Waiting for logs. To exit press CTRL+C\')

def callback(ch, method, properties, body):
   print(" [x] %r" % body)
#打包发送
channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

consumer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))

channel = connection.channel()
#确定接收消息 条件与logs级别fanout
channel.exchange_declare(exchange=\'logs\',exchange_type=\'fanout\')

message = \' \'.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange=\'logs\', routing_key=\'\', body=message)

print(" [x] Sent %r" % message)

connection.close()
第二种:direct --- 组播

producer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')
#设置 过滤 级别
log_levels = sys.argv[1] if len(sys.argv) > 1 else \'info\'

message = \' \'.join(sys.argv[2:]) or \'Hello World!\'

channel.basic_publish(
   exchange=\'direct_logs\', routing_key=log_levels, body=message)
print(" [x] Sent %r:%r" % (log_levels, message))
connection.close()

consumer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')

result = channel.queue_declare(\'\', exclusive=True)
queue_name = result.method.queue

#拿到接收的 级别
log_levels = sys.argv[1:]
if not log_levels:
   sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
   sys.exit(1)
#可能有多个级别
for severity in log_levels:
   channel.queue_bind(
       exchange=\'direct_logs\', queue=queue_name, routing_key=severity)

print(\' [*] Waiting for logs. To exit press CTRL+C\')

def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
第三种:topic - ------规则播

PS: 过滤条件用 正则表达式 表示

producer.py

import sys
import pika
connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\')

#这里上面不同,这里是用 点 距隔
routing_key = sys.argv[1] if len(sys.argv) > 2 else \'anonymous.info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'

channel.basic_publish(
   exchange=\'topic_logs\', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer.py

import pika
import sys

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\')

result = channel.queue_declare(\'\', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
   sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
   sys.exit(1)

for binding_key in binding_keys:
   channel.queue_bind(
       exchange=\'topic_logs\', queue=queue_name, routing_key=binding_key)

print(\' [*] Waiting for logs. To exit press CTRL+C\')


def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

 

分类:

技术点:

相关文章: