weiming-cheng

吴sir--Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Memcached

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。

Memcached安装和基本使用

Memcached安装:

1
2
3
4
5
6
7
8
wget http://memcached.org/latest
tar -zxvf memcached-1.x.x.tar.gz
cd memcached-1.x.x
./configure && make && make test && sudo make install
 
PS:依赖libevent
yum install libevent-devel
apt-get install libevent-dev

启动Memcached

1
2
3
4
5
6
7
8
9
10
memcached -d -m 10 -u root -l 10.211.55.4 -p 12000 -c 256 -P /tmp/memcached.pid
 
参数说明:
-d 是启动一个守护进程
-m 是分配给Memcache使用的内存数量,单位是MB
-u 是运行Memcache的用户
-l 是监听的服务器IP地址
-p 是设置Memcache监听的端口,最好是1024以上的端口
-c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定
-P 是设置保存Memcache的pid文件

Memcached命令

1
2
3
存储命令: set/add/replace/append/prepend/cas
获取命令: get/gets
其他命令: delete/stats..

Python操作Memcached

安装API

1
2
python操作Memcached使用Python-memcached模块
下载安装:https://pypi.python.org/pypi/python-memcached

1、第一次操作

1
2
3
4
5
6
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
mc.set("foo", "bar")
ret = mc.get(\'foo\')
print ret

Ps:debug = True 表示运行出现错误时,现实错误信息,上线后移除该参数。

2、天生支持集群

python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比

1
2
3
4
5
6
7
主机 权重
1.1.1.1 1
1.1.1.2 2
1.1.1.3 1
 
那么在内存中主机列表为:
host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]

如果用户根据如果要在内存中创建一个键值对(如:k1 = "v1"),那么要执行一下步骤:

  • 根据算法将 k1 转换成一个数字
  • 将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )
  • 在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]
  • 连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中

代码实现如下:

1
2
3
mc = memcache.Client([(\'1.1.1.1:12000\', 1), (\'1.1.1.2:12000\', 2), (\'1.1.1.3:12000\', 1)], debug=True)
 
mc.set(\'k1\', \'v1\')

3、add
添加一条键值对,如果已经存在的 key,重复执行add操作异常

1
2
3
4
5
6
7
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
mc.add(\'k1\', \'v1\')
# mc.add(\'k1\', \'v2\') # 报错,对已经存在的key重复添加,失败!!!

4、replace
replace 修改某个key的值,如果key不存在,则异常

1
2
3
4
5
6
7
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
# 如果memcache中存在kkkk,则替换成功,否则一场
mc.replace(\'kkkk\',\'999\')

5、set 和 set_multi

set 设置一个键值对,如果key不存在,则创建,如果key存在,则修改
set_multi 设置多个键值对,如果key不存在,则创建,如果key存在,则修改

1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
 
mc.set(\'key0\', \'wupeiqi\')
 
mc.set_multi({\'key1\': \'val1\', \'key2\': \'val2\'})

6、delete 和 delete_multi

delete 在Memcached中删除指定的一个键值对
delete_multi 在Memcached中删除指定的多个键值对

1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
 
mc.delete(\'key0\')
mc.delete_multi([\'key1\', \'key2\'])

7、get 和 get_multi

get 获取一个键值对
get_multi 获取多一个键值对

1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
 
val = mc.get(\'key0\')
item_dict = mc.get_multi(["key1", "key2", "key3"])

8、append 和 prepend

append 修改指定key的值,在该值 后面 追加内容
prepend 修改指定key的值,在该值 前面 插入内容

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
# k1 = "v1"
 
mc.append(\'k1\', \'after\')
# k1 = "v1after"
 
mc.prepend(\'k1\', \'before\')
# k1 = "beforev1after"

9、decr 和 incr  

incr 自增,将Memcached中的某一个值增加 N ( N默认为1 )
decr 自减,将Memcached中的某一个值减少 N ( N默认为1 )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True)
mc.set(\'k1\', \'777\')
 
mc.incr(\'k1\')
# k1 = 778
 
mc.incr(\'k1\', 10)
# k1 = 788
 
mc.decr(\'k1\')
# k1 = 787
 
mc.decr(\'k1\', 10)
# k1 = 777

10、gets 和 cas

如商城商品剩余个数,假设改值保存在memcache中,product_count = 900
A用户刷新页面从memcache中读取到product_count = 900
B用户刷新页面从memcache中读取到product_count = 900

如果A、B用户均购买商品

A用户修改商品剩余个数 product_count=899
B用户修改商品剩余个数 product_count=899

如此一来缓存内的数据便不在正确,两个用户购买商品后,商品剩余还是 899
如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!

如果想要避免此情况的发生,只要使用 gets 和 cas 即可,如:

1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc = memcache.Client([\'10.211.55.4:12000\'], debug=True, cache_cas=True)
 
v = mc.gets(\'product_count\')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
mc.cas(\'product_count\', "899")

Ps:本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出现非正常数据,则不允许修改。

Memcached 真的过时了吗?

Redis

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis安装和基本使用

1
2
3
4
wget http://download.redis.io/releases/redis-3.0.6.tar.gz
tar xzf redis-3.0.6.tar.gz
cd redis-3.0.6
make

启动服务端

1
src/redis-server

启动客户端

1
2
3
4
5
src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

Python操作Redis

安装API

1
2
3
4
5
6
7
sudo pip install redis
or
sudo easy_install redis
or
源码安装
 
详见:https://github.com/WoLpH/redis-py

常用操作

1、操作模式

redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
r = redis.Redis(host=\'10.211.55.4\', port=6379)
r.set(\'foo\', \'Bar\')
print r.get(\'foo\')

2、连接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
pool = redis.ConnectionPool(host=\'10.211.55.4\', port=6379)
 
r = redis.Redis(connection_pool=pool)
r.set(\'foo\', \'Bar\')
print r.get(\'foo\')

3、管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
pool = redis.ConnectionPool(host=\'10.211.55.4\', port=6379)
 
r = redis.Redis(connection_pool=pool)
 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
 
r.set(\'name\', \'alex\')
r.set(\'role\', \'sb\')
 
pipe.execute()

4、发布订阅

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host=\'10.211.55.4\')
        self.chan_sub = \'fm104.5\'
        self.chan_pub = \'fm104.5\'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub
RedisHelper

订阅者:

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
msg= redis_sub.parse_response()
print msg

发布者:

1
2
3
4
5
6
7
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public(\'hello\')

更多参见:https://github.com/andymccurdy/redis-py/

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ安装

1
2
3
4
5
6
7
8
安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安装erlang
$ yum -y install erlang
 
安装RabbitMQ
$ yum -y install rabbitmq-server

注意:service rabbitmq-server start/stop

安装API

1
2
3
4
5
6
7
pip install pika
or
easy_install pika
or
源码
 
https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
View Code

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
import pika
 
# ######################### 生产者 #########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=\'localhost\'))
channel = connection.channel()
 
channel.queue_declare(queue=\'hello\')
 
channel.basic_publish(exchange=\'\',
routing_key=\'hello\',
body=\'Hello World!\')
print(" [x] Sent \'Hello World!\'")
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
import pika
 
# ########################## 消费者 ##########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=\'localhost\'))
channel = connection.channel()
 
channel.queue_declare(queue=\'hello\')
 
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
 
channel.basic_consume(callback,
queue=\'hello\',
no_ack=True)
 
print(\' [*] Waiting for messages. To exit press CTRL+C\')
channel.start_consuming()

1、acknowledgment 消息不丢失

no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'10.211.55.4\'))
channel = connection.channel()

channel.queue_declare(queue=\'hello\')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print \'ok\'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=\'hello\',
                      no_ack=False)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
channel.start_consuming()
消费者

2、durable 消息不丢失

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=\'hello\', durable=True)

channel.basic_publish(exchange=\'\',
                      routing_key=\'hello\',
                      body=\'Hello World!\',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent \'Hello World!\'")
connection.close()
生产者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=\'hello\', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print \'ok\'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=\'hello\',
                      no_ack=False)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
channel.start_consuming()
消费者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue=\'hello\')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print \'ok\'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue=\'hello\',
                      no_ack=False)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
channel.start_consuming()
消费者

4、发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'logs\',
                         type=\'fanout\')

message = \' \'.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=\'logs\',
                      routing_key=\'\',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
发布者
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'logs\',
                         type=\'fanout\')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=\'logs\',
                   queue=queue_name)

print(\' [*] Waiting for logs. To exit press CTRL+C\')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
订阅者

5、关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'direct_logs\',
                         type=\'direct\')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=\'direct_logs\',
                       queue=queue_name,
                       routing_key=severity)

print(\' [*] Waiting for logs. To exit press CTRL+C\')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消费者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'direct_logs\',
                         type=\'direct\')

severity = sys.argv[1] if len(sys.argv) > 1 else \'info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(exchange=\'direct_logs\',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
生产者

6、模糊匹配

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • * 表示只能匹配 一个 单词
1
2
3
发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'topic_logs\',
                         type=\'topic\')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=\'topic_logs\',
                       queue=queue_name,
                       routing_key=binding_key)

print(\' [*] Waiting for logs. To exit press CTRL+C\')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消费者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()

channel.exchange_declare(exchange=\'topic_logs\',
                         type=\'topic\')

routing_key = sys.argv[1] if len(sys.argv) > 1 else \'anonymous.info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(exchange=\'topic_logs\',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生产者

 

SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
 
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
 
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
 
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
 
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

步骤一:

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy import create_engine
 
 
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
 
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (\'2\', \'v1\')"
)
 
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)",
((555, "v1"),(666, "v1"),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
id=999, name="v1"
)
 
result = engine.execute(\'select * from ts_test\')
result.fetchall()
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from sqlalchemy import create_engine


engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)


# 事务操作
with engine.begin() as conn:
    conn.execute("insert into table (x, y, z) values (1, 2, 3)")
    conn.execute("my_special_procedure(5)")
    
    
conn = engine.connect()
# 事务操作 
with conn.begin():
       conn.execute("some statement", {\'x\':5, \'y\':10})
事务操作

注:查看数据库连接:show status like \'Threads%\';

步骤二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
 
metadata = MetaData()
 
user = Table(\'user\', metadata,
Column(\'id\', Integer, primary_key=True),
Column(\'name\', String(20)),
)
 
color = Table(\'color\', metadata,
Column(\'id\', Integer, primary_key=True),
Column(\'name\', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
 
metadata.create_all(engine)
# metadata.clear()
# metadata.remove()
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

user = Table(\'user\', metadata,
    Column(\'id\', Integer, primary_key=True),
    Column(\'name\', String(20)),
)

color = Table(\'color\', metadata,
    Column(\'id\', Integer, primary_key=True),
    Column(\'name\', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

conn = engine.connect()

# 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{\'id\':7,\'name\':\'seven\'})
conn.close()

# sql = user.insert().values(id=123, name=\'wu\')
# conn.execute(sql)
# conn.close()

# sql = user.delete().where(user.c.id > 1)

# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == \'jack\').values(name=\'ed\')

# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)

# result = conn.execute(sql)
# print result.fetchall()
# conn.close()
增删改查

更多内容详见:

http://www.jianshu.com/p/e6bba189fcbd

http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。

步骤三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
 
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
 
Base = declarative_base()
 
 
class User(Base):
__tablename__ = \'users\'
id = Column(Integer, primary_key=True)
name = Column(String(50))
 
# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)
 
Session = sessionmaker(bind=engine)
session = Session()
 
 
# ########## 增 ##########
# u = User(id=2, name=\'sb\')
# session.add(u)
# session.add_all([
# User(id=3, name=\'sb\'),
# User(id=4, name=\'sb\')
# ])
# session.commit()
 
# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()
 
# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({\'cluster_id\' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name=\'sb\').first()
 
# ret = session.query(User).filter_by(name=\'sb\').all()
# print ret
 
# ret = session.query(User).filter(User.name.in_([\'sb\',\'bb\'])).all()
# print ret
 
# ret = session.query(User.name.label(\'name_label\')).all()
# print ret,type(ret)
 
# ret = session.query(User).order_by(User.id).all()
# print ret
 
# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()

更多功能参见文档,猛击这里下载PDF

 

分类:

技术点:

相关文章: