1.RabbitMQ 消息队列
python 中包含threading queue 和 multiprocessing queue 这两种队列只能在一个程序中进行交互。
RabbitMQ 可以在不同类型的程序之间进行交互。
2. 安装RabbitMQ
(1).安装Erlang
(2).安装RabbitMQ
http://www.rabbitmq.com/install-standalone-mac.html
(3).安装pika
pip install pika
3.简单实现队列通信
send端
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 \'localhost\')) #建立一个socket 6 channel = connection.channel() #声明一个管道 7 8 # 声明queue 9 channel.queue_declare(queue=\'hello\') 10 11 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 12 channel.basic_publish(exchange=\'\', 13 routing_key=\'hello\', #queue名 14 body=\'Hello World!\') #消息内容 15 print(" [x] Sent \'Hello World!\'") 16 connection.close()
receive端
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'localhost\')) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ‒ we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we\'re not yet sure which program to run first. In such cases it\'s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=\'hello\') 14 15 16 def callback(ch, method, properties, body): 17 \'\'\' 18 ch:管道内存对象地址 19 method:包含接收发送对象信息 20 properties: 21 \'\'\' 22 print(" [x] Received %r" % body) 23 24 #接收hello队列消息 25 channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息 26 queue=\'hello\',#指定收消息的队列 27 no_ack=True) 28 29 print(\' [*] Waiting for messages. To exit press CTRL+C\') 30 channel.start_consuming()
4.rabbitmq 消息轮巡
当有多个接收消息者时,每个接收者将轮流接收消息。
channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息
queue=\'hello\',#指定收消息的队列
no_ack=True) # 消息处理完毕后不向发送端返回信号
注: 当no_ack 缺省时,发送端必须要接收到接收返回信号后,才会把发送的消息从队列中删除。
在callback中加入,手动返回接收信号:ch.basic_ack(delivery_tag=method.delivery_tag) #手动和发送端进行确认
5.rabbitmq 消息持久化
H:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.10\sbin 目录下包含RabbitMQ的一些管理命令。
为了确保RabbitMQ down机时所有消息丢失,因此要进行消息队列持久化:
channel.queue_declare(queue=\'hello\', durable=True) #队列持久化,队列中的消息仍然丢失
channel.basic_publish(exchange=\'\',
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # 消息持久化 ))
send端
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 \'localhost\')) #建立一个socket 6 channel = connection.channel() #声明一个管道 7 8 # 声明queue 9 channel.queue_declare(queue=\'hello2\',durable=True) #队列持久化 10 11 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 12 channel.basic_publish(exchange=\'\', 13 routing_key=\'hello2\', #queue名 14 body=\'Hello World!\', 15 properties = pika.BasicProperties(delivery_mode=2, ) # 消息持久化 16 ) #消息内容 17 print(" [x] Sent \'Hello World!\'") 18 connection.close()
receive端
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika,time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'localhost\')) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ‒ we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we\'re not yet sure which program to run first. In such cases it\'s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=\'hello2\',durable=True) #队列持久化 14 15 16 def callback(ch, method, properties, body): 17 \'\'\' 18 ch:管道内存对象地址 19 method:包含接收发送对象信息 20 properties: 21 \'\'\' 22 time.sleep(3) 23 print(" [x] Received %r" % body) 24 #ch.basic_ack(delivery_tag=method.delivery_tag) # 手动和发送端进行确认 25 26 #接收hello队列消息 27 channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息 28 queue=\'hello2\', 29 )#指定收消息的队列) # 30 31 print(\' [*] Waiting for messages. To exit press CTRL+C\') 32 channel.start_consuming()
6.RabbitMQ fanout广播模式
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
发送端
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 \'localhost\')) #建立一个socket 6 channel = connection.channel() #声明一个管道 7 8 # 声明queue 9 channel.queue_declare(queue=\'hello2\',durable=True) #队列持久化 10 11 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 12 channel.basic_publish(exchange=\'\', 13 routing_key=\'hello2\', #queue名 14 body=\'Hello World!\', 15 properties = pika.BasicProperties(delivery_mode=2, ) # 消息持久化 16 ) #消息内容 17 print(" [x] Sent \'Hello World!\'") 18 connection.close()
接收端1
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika,time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'localhost\')) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ‒ we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we\'re not yet sure which program to run first. In such cases it\'s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=\'hello2\',durable=True) #队列持久化 14 15 16 def callback(ch, method, properties, body): 17 \'\'\' 18 ch:管道内存对象地址 19 method:包含接收发送对象信息 20 properties: 21 \'\'\' 22 #time.sleep(20) 23 print(" [x] Received %r" % body) 24 ch.basic_ack(delivery_tag=method.delivery_tag) # 手动和发送端进行确认 25 26 # 当接收者正在处理消息时,不再向其发送新消息 27 channel.basic_qos(prefetch_count=1) 28 #接收hello队列消息 29 channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息 30 queue=\'hello2\', 31 )#指定收消息的队列) # 32 33 print(\' [*] Waiting for messages. To exit press CTRL+C\') 34 channel.start_consuming()
接收端2
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika,time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'localhost\')) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ‒ we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we\'re not yet sure which program to run first. In such cases it\'s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=\'hello2\',durable=True) #队列持久化 14 15 16 def callback(ch, method, properties, body): 17 \'\'\' 18 ch:管道内存对象地址 19 method:包含接收发送对象信息 20 properties: 21 \'\'\' 22 time.sleep(20) 23 print(" [x] Received %r" % body) 24 ch.basic_ack(delivery_tag=method.delivery_tag) # 手动和发送端进行确认 25 26 # 当接收者正在处理消息时,不再向其发送新消息 27 channel.basic_qos(prefetch_count=1) 28 #接收hello队列消息 29 channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息 30 queue=\'hello2\', 31 )#指定收消息的队列) # 32 33 print(\' [*] Waiting for messages. To exit press CTRL+C\') 34 channel.start_consuming()
1对多广播发送消息
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。
Exchange 类型
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
fanout 广播发送
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=\'localhost\')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=\'logs\', 9 type=\'fanout\') 10 11 message = \' \'.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange=\'logs\', 13 routing_key=\'\', 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
fanout 广播接收
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host=\'localhost\')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange=\'logs\', 10 type=\'fanout\') 11 12 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 13 queue_name = result.method.queue #获得上面随机生成的queue名 14 15 channel.queue_bind(exchange=\'logs\', 16 queue=queue_name) 17 18 print(\' [*] Waiting for logs. To exit press CTRL+C\') 19 20 21 def callback(ch, method, properties, body): 22 print(" [x] %r" % body) 23 24 25 channel.basic_consume(callback, 26 queue=queue_name, 27 no_ack=True) 28 29 channel.start_consuming()
direct 广播发送
direct send 端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=\'localhost\')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=\'direct_logs\', 9 type=\'direct\') 10 11 severity = sys.argv[1] if len(sys.argv) > 1 else \'info\' 12 message = \' \'.join(sys.argv[2:]) or \'Hello World!\' 13 channel.basic_publish(exchange=\'direct_logs\', 14 routing_key=severity, 15 body=message) 16 print(" [x] Sent %r:%r" % (severity, message)) 17 connection.close()
direct receive 端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=\'localhost\')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=\'direct_logs\', 9 type=\'direct\') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 17 sys.exit(1) 18 19 for severity in severities: 20 channel.queue_bind(exchange=\'direct_logs\', 21 queue=queue_name, 22 routing_key=severity) 23 24 print(\' [*] Waiting for logs. To exit press CTRL+C\') 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True) 34 35 channel.start_consuming()
topic 广播发送
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
topic send端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=\'localhost\')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=\'topic_logs\', 9 type=\'topic\') 10 11 routing_key = sys.argv[1] if len(sys.argv) > 1 else \'anonymous.info\' 12 message = \' \'.join(sys.argv[2:]) or \'Hello World!\' 13 channel.basic_publish(exchange=\'topic_logs\', 14 routing_key=routing_key, 15 body=message) 16 print(" [x] Sent %r:%r" % (routing_key, message)) 17 connection.close()
topic receive端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=\'localhost\')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=\'topic_logs\', 9 type=\'topic\') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 binding_keys = sys.argv[1:] 15 if not binding_keys: 16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 17 sys.exit(1) 18 19 for binding_key in binding_keys: 20 channel.queue_bind(exchange=\'topic_logs\', 21 queue=queue_name, 22 routing_key=binding_key) 23 24 print(\' [*] Waiting for logs. To exit press CTRL+C\') 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True) 34 35 channel.start_consuming()
7.RabbitMQ rpc
server端
1 # _*_coding:utf-8_*_ 2 __author__ = \'Alex Li\' 3 import pika 4 import time 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host=\'localhost\')) 8 9 channel = connection.channel() 10 11 channel.queue_declare(queue=\'rpc_queue\') 12 13 14 def fib(n): 15 if n == 0: 16 return 0 17 elif n == 1: 18 return 1 19 else: 20 return fib(n - 1) + fib(n - 2) 21 22 23 def on_request(ch, method, props, body): 24 n = int(body) 25 26 print(" [.] fib(%s)" % n) 27 response = fib(n) 28 29 ch.basic_publish(exchange=\'\', 30 routing_key=props.reply_to, 31 properties=pika.BasicProperties(correlation_id= \ 32 props.correlation_id), 33 body=str(response)) 34 ch.basic_ack(delivery_tag=method.delivery_tag) 35 36 37 channel.basic_qos(prefetch_count=1) 38 channel.basic_consume(on_request, queue=\'rpc_queue\') 39 40 print(" [x] Awaiting RPC requests") 41 channel.start_consuming()
client端
1 import pika 2 import uuid 3 4 5 class FibonacciRpcClient(object): 6 def __init__(self): 7 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 8 host=\'localhost\')) 9 10 self.channel = self.connection.channel() 11 12 result = self.channel.queue_declare(exclusive=True) 13 self.callback_queue = result.method.queue 14 15 self.channel.basic_consume(self.on_response, no_ack=True, 16 queue=self.callback_queue) 17 18 def on_response(self, ch, method, props, body): 19 if self.corr_id == props.correlation_id: 20 self.response = body 21 22 def call(self, n): 23 self.response = None 24 self.corr_id = str(uuid.uuid4()) 25 self.channel.basic_publish(exchange=\'\', 26 routing_key=\'rpc_queue\', 27 properties=pika.BasicProperties( 28 reply_to=self.callback_queue, 29 correlation_id=self.corr_id, 30 ), 31 body=str(n)) 32 while self.response is None: 33 self.connection.process_data_events() #非阻塞版的start_consuming() 34 return int(self.response) 35 36 37 fibonacci_rpc = FibonacciRpcClient() 38 39 print(" [x] Requesting fib(30)") 40 response = fibonacci_rpc.call(30) 41 print(" [.] Got %r" % response)
8.缓存系统
redis: 可以持久化
http://www.cnblogs.com/alex3714/articles/6217453.html
memcached: 轻量级的缓存系统,不能持久化。
http://www.cnblogs.com/wupeiqi/articles/5132791.html
9.redis
建立redis连接
1 import redis 2 3 r = redis.Redis(host=\'127.0.0.1\', port=6379) 4 r.set(\'foo\', \'Bar\') 5 print(r.get(\'foo\'))
连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
redis String 操作
redis中的String在在内存中按照一个name对应一个value来存储。如图:
set(name, value, ex=None, px=None, nx=False, xx=False)
1 在Redis中设置值,默认,不存在则创建,存在则修改 2 参数: 3 ex,过期时间(秒) 4 px,过期时间(毫秒) 5 nx,如果设置为True,则只有name不存在时,当前set操作才执行 6 xx,如果设置为True,则只有name存在时,岗前set操作才执行
setnx(name, value)
1 设置值,只有name不存在时,执行设置操作(添加)
setex(name, value, time)
1 # 设置值 2 # 参数: 3 # time,过期时间(数字秒 或 timedelta对象)
psetex(name, time_ms, value)
1 # 设置值 2 # 参数: 3 # time_ms,过期时间(数字毫秒 或 timedelta对象)
mset(*args, **kwargs)
1 批量设置值 2 如: 3 mset(k1=\'v1\', k2=\'v2\') 4 或 5 mget({\'k1\': \'v1\', \'k2\': \'v2\'})
get(name)#获取值
mget(keys, *args)
1 批量获取 2 如: 3 mget(\'ylr\', \'wupeiqi\') 4 或 5 r.mget([\'ylr\', \'wupeiqi\'])
getset(name, value)#设置新值并获取原来的值
getrange(key, start, end)
1 # 获取子序列(根据字节获取,非字符) 2 # 参数: 3 # name,Redis 的 name 4 # start,起始位置(字节) 5 # end,结束位置(字节) 6 # 如: "武沛齐" ,0-3表示 "武"
setrange(name, offset, value)
1 # 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 2 # 参数: 3 # offset,字符串的索引,字节(一个汉字三个字节) 4 # value,要设置的值
setbit(name, offset, value)
1 # 对name对应值的二进制表示的位进行操作 2 3 # 参数: 4 # name,redis的name 5 # offset,位的索引(将值变换成二进制后再进行索引) 6 # value,值只能是 1 或 0 7 8 # 注:如果在Redis中有一个对应: n1 = "foo", 9 那么字符串foo的二进制表示为:01100110 01101111 01101111 10 所以,如果执行 setbit(\'n1\', 7, 1),则就会将第7位设置为1, 11 那么最终二进制则变成 01100111 01101111 01101111,即:"goo" 12 13 # 扩展,转换二进制表示: 14 15 # source = "武沛齐" 16 source = "foo" 17 18 for i in source: 19 num = ord(i) 20 print bin(num).replace(\'b\',\'\') 21 22 特别的,如果source是汉字 "武沛齐"怎么办? 23 答:对于utf-8,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9个字节 24 对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制 25 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 26 -------------------------- ----------------------------- ----------------------------- 27 武 沛 齐
用途举例,用最省空间的方式,存储在线用户数及分别是哪些用户在线
getbit(name, offset)# 获取name对应的值的二进制表示中的某位的值 (0或1)
bitcount(key, start=None, end=None)
1 # 获取name对应的值的二进制表示中 1 的个数 2 # 参数: 3 # key,Redis的name 4 # start,位起始位置 5 # end,位结束位置
strlen(name)# 返回name对应值的字节长度(一个汉字3个字节)
incr(self, name, amount=1)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 2 3 # 参数: 4 # name,Redis的name 5 # amount,自增数(必须是整数) 6 7 # 注:同incrby
incrbyfloat(self, name, amount=1.0)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 2 3 # 参数: 4 # name,Redis的name 5 # amount,自增数(浮点型)
decr(self, name, amount=1)
1 # 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。 2 3 # 参数: 4 # name,Redis的name 5 # amount,自减数(整数)
append(key, value)
1 # 在redis name对应的值后面追加内容 2 3 # 参数: 4 key, redis的name 5 value, 要追加的字符串
redis hash操作
hash表现形式上有些像pyhton中的dict,可以存储一组关联性较强的数据 , redis中Hash在内存中的存储格式如下图:
hset(name, key, value)
1 # name对应的hash中设置一个键值对(不存在,则创建;否则,修改) 2 3 # 参数: 4 # name,redis的name 5 # key,name对应的hash中的key 6 # value,name对应的hash中的value 7 8 # 注: 9 # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)
hmset(name, mapping)
1 # 在name对应的hash中批量设置键值对 2 3 # 参数: 4 # name,redis的name 5 # mapping,字典,如:{\'k1\':\'v1\', \'k2\': \'v2\'} 6 7 # 如: 8 # r.hmset(\'xx\', {\'k1\':\'v1\', \'k2\': \'v2\'})
hget(name,key) ## 在name对应的hash中获取根据key获取value
hmget(name, keys, *args)
1 # 在name对应的hash中获取多个key的值 2 3 # 参数: 4 # name,reids对应的name 5 # keys,要获取key集合,如:[\'k1\', \'k2\', \'k3\'] 6 # *args,要获取的key,如:k1,k2,k3 7 8 # 如: 9 # r.mget(\'xx\', [\'k1\', \'k2\']) 10 # 或 11 # print r.hmget(\'xx\', \'k1\', \'k2\')
hgetall(name)#获取name对应hash的所有键值
hlen(name)#获取name对应的hash中键值对的个数
hkeys(name) # 获取name对应的hash中所有的key的值
hvals(name) #获取name对应的hash中所有的value的值
hexists(name, key) # 检查name对应的hash是否存在当前传入的key
hdel(name,*keys) # 将name对应的hash中指定key的键值对删除
hincrby(name, key, amount=1)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 # 参数: 3 # name,redis中的name 4 # key, hash对应的key 5 # amount,自增数(整数)
hincrbyfloat(name, key, amount=1.0)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 3 # 参数: 4 # name,redis中的name 5 # key, hash对应的key 6 # amount,自增数(浮点数) 7 8 # 自增name对应的hash中的指定key的值,不存在则创建key=amount
hscan(name, cursor=0, match=None, count=None)
1 # 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆 2 3 # 参数: 4 # name,redis的name 5 # cursor,游标(基于游标分批取获取数据) 6 # match,匹配指定key,默认None 表示所有的key 7 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 8 9 # 如: 10 # 第一次:cursor1, data1 = r.hscan(\'xx\', cursor=0, match=None, count=None) 11 # 第二次:cursor2, data1 = r.hscan(\'xx\', cursor=cursor1, match=None, count=None) 12 # ... 13 # 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
hscan_iter(name, match=None, count=None)
1 # 利用yield封装hscan创建生成器,实现分批去redis中获取数据 2 3 # 参数: 4 # match,匹配指定key,默认None 表示所有的key 5 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 6 7 # 如: 8 # for item in r.hscan_iter(\'xx\'): 9 # print item
redis list操作
List操作,redis中的List在在内存中按照一个name对应一个List来存储。如图:
lpush(name,values)
1 # 在name对应的list中添加元素,每个新的元素都添加到列表的最左边 2 3 # 如: 4 # r.lpush(\'oo\', 11,22,33) 5 # 保存顺序为: 33,22,11 6 7 # 扩展: 8 # rpush(name, values) 表示从右向左操作
lpushx(name,value)
1 # 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边 2 3 # 更多: 4 # rpushx(name, value) 表示从右向左操作
llen(name) # name对应的list元素的个数
linsert(name, where, refvalue, value))
1 # 在name对应的列表的某一个值前或后插入一个新值 2 3 # 参数: 4 # name,redis的name 5 # where,BEFORE或AFTER 6 # refvalue,标杆值,即:在它前后插入数据 7 # value,要插入的数据
lset(name, index, value)
1 # 对name对应的list中的某一个索引位置重新赋值 2 3 # 参数: 4 # name,redis的name 5 # index,list的索引位置 6 # value,要设置的值
lrem(name, value, num)
1 # 在name对应的list中删除指定的值 2 3 # 参数: 4 # name,redis的name 5 # value,要删除的值 6 # num, num=0,删除列表中所有的指定值; 7 # num=2,从前到后,删除2个; 8 # num=-2,从后向前,删除2个
lpop(name)
1 # 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素 2 3 # 更多: 4 # rpop(name) 表示从右向左操作
lindex(name, index) #在name对应的列表中根据索引获取列表元素
lrange(name, start, end)
1 # 在name对应的列表分片获取数据 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置
ltrim(name, start, end)
1 # 在name对应的列表中移除没有在start-end索引之间的值 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置
rpoplpush(src, dst)
1 # 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边 2 # 参数: 3 # src,要取数据的列表的name 4 # dst,要添加数据的列表的name
blpop(keys, timeout)
1 # 将多个列表排列,按照从左到右去pop对应列表的元素 2 3 # 参数: 4 # keys,redis的name的集合 5 # timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞 6 7 # 更多: 8 # r.brpop(keys, timeout),从右向左获取数据
brpoplpush(src, dst, timeout=0)
1 # 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧 2 3 # 参数: 4 # src,取出并要移除元素的列表对应的name 5 # dst,要插入元素的列表对应的name 6 # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞
redis set操作
set 集合就是不允许重复的列表
sadd(name,values) # name对应的集合中添加元素
scard(name) #获取name对应的集合中元素个数
sdiff(keys, *args) #在第一个name对应的集合中且不在其他name对应的集合的元素集合
sdiffstore(dest, keys, *args) #获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
sinter(keys, *args) #获取多一个name对应集合的并集
sinterstore(dest, keys, *args) # 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
sismember(name, value) # 检查value是否是name对应的集合的成员
smembers(name) # 获取name对应的集合的所有成员
smove(src, dst, value) # 将某个成员从一个集合中移动到另外一个集合
spop(name) # 从集合的右侧(尾部)移除一个成员,并将其返回
srandmember(name, numbers) # 从name对应的集合中随机获取 numbers 个元素
srem(name, values) # 在name对应的集合中删除某些值
sunion(keys, *args) # 获取多一个name对应的集合的并集
sunionstore(dest,keys, *args) # 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
# 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
有序SET操作
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
zadd(name, *args, **kwargs)
1 # 在name对应的有序集合中添加元素 2 # 如: 3 # zadd(\'zz\', \'n1\', 1, \'n2\', 2) 4 # 或 5 # zadd(\'zz\', n1=11, n2=22)
zcard(name) # 获取name对应的有序集合元素的数量
zcount(name, min, max) # 获取name对应的有序集合中分数 在 [min,max] 之间的个数
zincrby(name, value, amount) # 自增name对应的有序集合的 name 对应的分数
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
1 # 按照索引范围获取name对应的有序集合的元素 2 3 # 参数: 4 # name,redis的name 5 # start,有序集合索引起始位置(非分数) 6 # end,有序集合索引结束位置(非分数) 7 # desc,排序规则,默认按照分数从小到大排序 8 # withscores,是否获取元素的分数,默认只获取元素的值 9 # score_cast_func,对分数进行数据转换的函数 10 11 # 更多: 12 # 从大到小排序 13 # zrevrange(name, start, end, withscores=False, score_cast_func=float) 14 15 # 按照分数范围获取name对应的有序集合的元素 16 # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) 17 # 从大到小排序 18 # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value)
1 # 获取某个值在 name对应的有序集合中的排行(从 0 开始) 2 3 # 更多: 4 # zrevrank(name, value),从大到小排序
zrem(name, values)
1 # 删除name对应的有序集合中值是values的成员 2 3 # 如:zrem(\'zz\', [\'s1\', \'s2\'])
zremrangebyrank(name, min, max) # 根据排行范围删除
zremrangebyscore(name, min, max) # 根据分数范围删除
zscore(name, value) # 获取name对应有序集合中 value 对应的分数
zinterstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zunionstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
1 # 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作
其他常用操作
delete(*names) # 根据删除redis中的任意数据类型
exists(name) # 检测redis的name是否存在
keys(pattern=\'*\')
1 # 根据模型获取redis的name 2 3 # 更多: 4 # KEYS * 匹配数据库中所有 key 。 5 # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 6 # KEYS h*llo 匹配 hllo 和 heeeeello 等。 7 # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time)# 为某个redis的某个name设置超时时间
rename(src, dst) # 对redis的name重命名为
move(name, db)) # 将redis的某个值移动到指定的db下
randomkey() # 随机获取一个redis的name(不删除)
type(name) # 获取name对应值的类型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
# 同字符串操作,用于增量迭代获取key
管道(无卵用)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis,time 5 6 pool = redis.ConnectionPool(host=\'10.211.55.4\', port=6379 ,db = 5) 7 8 r = redis.Redis(connection_pool=pool) 9 10 # pipe = r.pipeline(transaction=False) 11 pipe = r.pipeline(transaction=True) 12 13 pipe.set(\'name\', \'alex\') 14 time.sleep(30) 15 pipe.set(\'role\', \'sb\') 16 17 pipe.execute()
广播订阅
类
1 import redis 2 3 4 class RedisHelper: 5 6 def __init__(self): 7 self.__conn = redis.Redis(host=\'127.0.0.1\') 8 self.chan_sub = \'fm104.5\' 9 self.chan_pub = \'fm104.5\' 10 11 def public(self, msg): 12 self.__conn.publish(self.chan_pub, msg) #发布消息 13 return True 14 15 def subscribe(self): 16 pub = self.__conn.pubsub() #打开收音机 17 pub.subscribe(self.chan_sub) #调频道 18 pub.parse_response() #准备接收 19 return pub
广播者
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 from 广播订阅1 import RedisHelper 5 6 obj = RedisHelper() 7 obj.public(\'hello\')
订阅者
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 from 广播订阅1 import RedisHelper 5 6 obj = RedisHelper() 7 redis_sub = obj.subscribe() 8 9 while True: 10 msg = redis_sub.parse_response() 11 print(msg)
10.数据库介绍
什么是数据库?
数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,
每个数据库都有一个或多个不同的API用于创建,访问,管理,搜索和复制所保存的数据。
我们也可以将数据存储在文件中,但是在文件中读写数据速度相对较慢。
所以,现在我们使用关系型数据库管理系统(RDBMS)来存储和管理的大数据量。所谓的关系型数据库,是建立在关系模型基础上的数据库,借助于集合代数等数学概念和方法来处理数据库中的数据。
RDBMS即关系数据库管理系统(Relational Database Management System)的特点:
-.数据以表格的形式出现
-.每行为各种记录名称
-.每列为记录名称所对应的数据域
-.许多的行和列组成一张表单
-.若干的表单组成database
关系型数据库: Oracle,Mysql,SqlServer,DB2,Postgresql,Sqlite
RDBMS 术语
在我们开始学习MySQL 数据库前,让我们先了解下RDBMS的一些术语:
- 数据库: 数据库是一些关联表的集合。.
- 数据表: 表是数据的矩阵。在一个数据库中的表看起来像一个简单的电子表格。
- 列: 一列(数据元素) 包含了相同的数据, 例如邮政编码的数据。
- 行:一行(=元组,或记录)是一组相关的数据,例如一条用户订阅的数据。
- 冗余:存储两倍数据,冗余可以使系统速度更快。(表的规范化程度越高,表与表之间的关系就越多;查询时可能经常需要在多个表之间进行连接查询;而进行连接操作会降低查询速度。例如,学生的信息存储在student表中,院系信息存储在department表中。通过student表中的dept_id字段与department表建立关联关系。如果要查询一个学生所在系的名称,必须从student表中查找学生所在院系的编号(dept_id),然后根据这个编号去department查找系的名称。如果经常需要进行这个操作时,连接查询会浪费很多的时间。因此可以在student表中增加一个冗余字段dept_name,该字段用来存储学生所在院系的名称。这样就不用每次都进行连接操作了。)
- 主键:主键是唯一的。一个数据表中只能包含一个主键。你可以使用主键来查询数据。
- 外键:外键用于关联两个表。
- 复合键:复合键(组合键)将多个列作为一个索引键,一般用于复合索引。
- 索引:使用索引可快速访问数据库表中的特定信息。索引是对数据库表中一列或多列的值进行排序的一种结构。类似于书籍的目录。
- 参照完整性: 参照的完整性要求关系中不允许引用不存在的实体。与实体完整性是关系模型必须满足的完整性约束条件,目的是保证数据的一致性。