rabbitMQ常用命令:
rabbitmq-server (启动rabbitmq, 必须进入rabbitmq安装sbin目录下执行)
rabbitmqctl list_queues (查看所有队列信息)
rabbitmqctl stop_app (关闭应用)
rabbitmqctl start_app (开启应用,启动应用,和上述关闭命令配合使用,达到清空队列的目的)
rabbitmqctl reset (清除所有队列)
rabbitmqctl status (查看运行信息)
rabbitmqctl list_exchanges (查看 exchange(过滤) 模式)
rabbitmqctl list_bindings (查看 捆绑 信息 )
如何保证数据不丢失?
-
1. 在队列里,设置durable=true #代表 队列持久化,就算断电队列也不会消失,但是消息会丢失
2. 在生产者端,
properties = pika.BasicProperties(
delivery_mode=2, #确保生产者生产的消息持久化到队列里面,就算断电消息也不会消失
)
3. 在消费者端 设置auto_ack
channel.basic_consume(
#auto_ack=False, 确认报消费者取到消息,每次消费者获取消息的时候都会和生产这进行确认; #auto_ack=True,不进行确认,
queue=\'test\', on_message_callback=callback, auto_ack=False)
ps:白话 关于auto_ack的设置,为True时 负责任的态度,生产者会关注消费者是否拿到消息(消费者拿取得时候,会告诉生产者已经拿到),为False时,不关注消费者是否消费,(不负责任的态度,只管生产不管你是否消费)
exchange的三种模式:
fanout : 广播 -----所有捆绑了得客户都能收到消息。 direct : 组播 -------只有同一组的客户才能收到。 topic : 规则波 ------ 满足某一要求的所有可客户都能收到。
关于公司月活两等问题
QPS: 每秒访问的次数 DAU: 日活跃用户数 MAU: 月活跃用户数
关于:QPS 、DAU、这些统计图表都是运维做的,我们不做,他们做好后有一个链接,我们点进去就能直接看到了。
1、linux ----- docker
2、shell 脚本语言
3、numpy,pandas
4、后端面试常问: mysql、队列相关、缓存相关、服务器相关
常规使用队列:
producer.py
import pika
#建立连接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
#实例化一个对象
channel = connection.channel()
### 声明队列 ,durable=True 开启队列持久化,就算断电队列也不会消失
channel.queue_declare(queue=\'test\',durable=True)
#发送的信息
channel.basic_publish(exchange=\'\',
routing_key=\'test\',
body=\'Hello World!\',
#确保生产者生产的消息持久化到队列里面,就算断电消息也不会消失
properties = pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent \'Hello World!\'")
#回收资源
connection.close()
consumer.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
#建立连接
channel.queue_declare(queue=\'test\', durable=True)
#定义回调方法,
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
#配置参数
channel.basic_consume(
#auto_ack=False, 确认报消费者取到消息,每次消费者获取消息的时候都会和生产这进行确认。auto_ack=True,不进行确认,
queue=\'test\', on_message_callback=callback, auto_ack=False)
print(\' [*] Waiting for messages. To exit press CTRL+C\')
#开始接收
channel.start_consuming()
exchange 的三种方式实现:
第一种:fanout
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
#设置消息过滤条件与 发送级别 条件logs 级别fanout---广播
channel.exchange_declare(exchange=\'logs\', exchange_type=\'fanout\')
#会自动创建一个连接着 logs 的 唯一的队列:queue_name
result = channel.queue_declare(\'\', exclusive=True) ### exclusive 排他的 唯一的
queue_name = result.method.queue
print("queue_name:", queue_name)
channel.queue_bind(exchange=\'logs\', queue=queue_name)
print(\' [*] Waiting for logs. To exit press CTRL+C\')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
#打包发送
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
consumer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
#确定接收消息 条件与logs级别fanout
channel.exchange_declare(exchange=\'logs\',exchange_type=\'fanout\')
message = \' \'.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=\'logs\', routing_key=\'\', body=message)
print(" [x] Sent %r" % message)
connection.close()
第二种:direct --- 组播
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')
#设置 过滤 级别
log_levels = sys.argv[1] if len(sys.argv) > 1 else \'info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(
exchange=\'direct_logs\', routing_key=log_levels, body=message)
print(" [x] Sent %r:%r" % (log_levels, message))
connection.close()
consumer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')
result = channel.queue_declare(\'\', exclusive=True)
queue_name = result.method.queue
#拿到接收的 级别
log_levels = sys.argv[1:]
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
#可能有多个级别
for severity in log_levels:
channel.queue_bind(
exchange=\'direct_logs\', queue=queue_name, routing_key=severity)
print(\' [*] Waiting for logs. To exit press CTRL+C\')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
第三种:topic - ------规则播
PS: 过滤条件用 正则表达式 表示
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\')
#这里上面不同,这里是用 点 距隔
routing_key = sys.argv[1] if len(sys.argv) > 2 else \'anonymous.info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(
exchange=\'topic_logs\', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
consumer.py
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
channel.exchange_declare(exchange=\'topic_logs\', exchange_type=\'topic\')
result = channel.queue_declare(\'\', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange=\'topic_logs\', queue=queue_name, routing_key=binding_key)
print(\' [*] Waiting for logs. To exit press CTRL+C\')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()