之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。
主要功能:
1.查询单个对象:
所需参数:表名,过滤条件
2.查询多个对象:
所需参数:表名,过滤条件
3.按主键查询:
所需参数:表名,值
4.分页查询:
所需参数:表名,页码,每页记录数,过滤条件
调用方法锁获取的对象都是以字典形式存储,例如:查询user表(字段有id,name,age)里的id=1的数据返回的对象为user = {"id":1,"name","zhangsan","age":18},我们可以通过user.get("id")来获取id值,非常方便,不用定义什么类对象来表示。如果查询的是多个,那么多个字典对象将会存放在一个列表里返回。
具体代码如下:
1 import json, os, sys, time, pymysql, pprint 2 3 from DBUtils import PooledDB 4 5 def print(*args): 6 pprint.pprint(args) 7 8 def get_time(): 9 '获取时间' 10 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) 11 12 def stitch_sequence(seq=None, suf=None): 13 '如果参数("suf")不为空,则根据特殊的suf拼接列表元素,返回一个字符串' 14 if seq is None: raise Exception("Parameter seq is None"); 15 if suf is None: suf = "," 16 r = str() 17 for s in seq: 18 r += s + suf 19 return r[:-len(suf)] 20 21 class BaseDao(object): 22 """ 23 简便的数据库操作基类 24 """ 25 def __init__(self, creator=pymysql, host="localhost",port=3306, user=None, password="", 26 database=None, charset="utf8"): 27 if host is None: raise Exception("Parameter [host] is None.") 28 if port is None: raise Exception("Parameter [port] is None.") 29 if user is None: raise Exception("Parameter [user] is None.") 30 if password is None: raise Exception("Parameter [password] is None.") 31 if database is None: raise Exception("Parameter [database] is None.") 32 # 数据库连接配置 33 self.__config = dict({ 34 "creator" : creator, "charset":charset, "host":host, "port":port, 35 "user":user, "password":password, "database":database 36 }) 37 self.__database = self.__config["database"] # 用于存储查询数据库 38 self.__tableName = None # 用于临时存储当前查询表名 39 # 初始化 40 self.__init_connect() # 初始化连接 41 self.__init_params() # 初始化参数 42 print(get_time(), self.__database, "数据库初始化成功。") 43 44 def __del__(self): 45 '重写类被清除时调用的方法' 46 if self.__cursor: self.__cursor.close() 47 if self.__conn: self.__conn.close() 48 print(get_time(), self.__database, "连接关闭") 49 50 def __init_connect(self): 51 self.__conn = PooledDB.connect(**self.__config) 52 self.__cursor = self.__conn.cursor() 53 54 def __init_params(self): 55 '初始化参数' 56 self.__init_table_dict() 57 self.__init__table_column_dict_list() 58 59 def __init__information_schema_columns(self): 60 "查询 information_schema.`COLUMNS` 中的列" 61 sql = """ SELECT COLUMN_NAME FROM information_schema.`COLUMNS` 62 WHERE TABLE_SCHEMA='information_schema' AND TABLE_NAME='COLUMNS' 63 """ 64 result_tuple = self.__exec_query(sql) 65 column_list = [r[0] for r in result_tuple] 66 return column_list 67 68 def __init_table_dict(self): 69 "查询配置数据库中改的所有表" 70 schema_column_list = self.__init__information_schema_columns() 71 stitch_str = stitch_sequence(schema_column_list) 72 sql1 = """ SELECT TABLE_NAME FROM information_schema.`TABLES` 73 WHERE TABLE_SCHEMA='%s' 74 """ %(self.__database) 75 table_tuple = self.__exec_query(sql1) 76 self.__table_dict = {t[0]:{} for t in table_tuple} 77 for table in self.__table_dict.keys(): 78 sql = """ SELECT %s FROM information_schema.`COLUMNS` 79 WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' 80 """ %(stitch_str, self.__database, table) 81 column_tuple = self.__exec_query(sql) 82 column_dict = {} 83 for vs in column_tuple: 84 d = {k:v for k,v in zip(schema_column_list, vs)} 85 column_dict[d["COLUMN_NAME"]] = d 86 self.__table_dict[table] = column_dict 87 88 def __init__table_column_dict_list(self): 89 self.__table_column_dict_list = {} 90 for table, column_dict in self.__table_dict.items(): 91 column_list = [column for column in column_dict.keys()] 92 self.__table_column_dict_list[table] = column_list 93 94 def __exec_query(self, sql, single=False): 95 ''' 96 执行查询方法 97 - @sql 查询 sql 98 - @single 是否查询单个结果集,默认False 99 ''' 100 try: 101 self.__cursor.execute(sql) 102 print(get_time(), "SQL[%s]"%sql) 103 if single: 104 result_tuple = self.__cursor.fetchone() 105 else: 106 result_tuple = self.__cursor.fetchall() 107 return result_tuple 108 except Exception as e: 109 print(e) 110 111 def __exec_update(self, sql): 112 try: 113 # 获取数据库游标 114 result = self.__cursor.execute(sql) 115 print(get_time(), "SQL[%s]"%sql) 116 self.__conn.commit() 117 return result 118 except Exception as e: 119 print(e) 120 self.__conn.rollback() 121 122 def __parse_result(self, result): 123 '用于解析单个查询结果,返回字典对象' 124 if result is None: return None 125 obj = {k:v for k,v in zip(self.__column_list, result)} 126 return obj 127 128 def __parse_results(self, results): 129 '用于解析多个查询结果,返回字典列表对象' 130 if results is None: return None 131 objs = [self.__parse_result(result) for result in results] 132 return objs 133 134 def __getpk(self, tableName): 135 if self.__table_dict.get(tableName) is None: raise Exception(tableName, "is not exist.") 136 for column, column_dict in self.__table_dict[tableName].items(): 137 if column_dict["COLUMN_KEY"] == "PRI": return column 138 139 def __get_table_column_list(self, tableName=None): 140 '查询表的字段列表, 将查询出来的字段列表存入 __fields 中' 141 return self.__table_column_dict_list[tableName] 142 143 def __query_util(self, filters=None): 144 """ 145 SQL 语句拼接方法 146 @filters 过滤条件 147 """ 148 sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}' 149 # 拼接查询表 150 sql = sql.replace("#{TABLE_NAME}", self.__tableName) 151 # 拼接查询字段 152 FIELDS = stitch_sequence(self.__get_table_column_list(self.__tableName)) 153 sql = sql.replace("#{FIELDS}", FIELDS) 154 # 拼接查询条件(待优化) 155 if filters is None: 156 sql = sql.replace("#{FILTERS}", "") 157 else: 158 FILTERS = "" 159 if not isinstance(filters, dict): 160 raise Exception("Parameter [filters] must be dict type. ") 161 isPage = False 162 if filters.get("_limit_"): isPage = True 163 if isPage: beginindex, limit = filters.pop("_limit_") 164 for k, v in filters.items(): 165 if k.startswith("_in_"): # 拼接 in 166 FILTERS += "AND %s IN (" %(k[4:]) 167 values = v.split(",") 168 for value in values: 169 FILTERS += "%s,"%value 170 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 171 elif k.startswith("_nein_"): # 拼接 not in 172 FILTERS += "AND %s NOT IN (" %(k[4:]) 173 values = v.split(",") 174 for value in values: 175 FILTERS += "%s,"%value 176 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 177 elif k.startswith("_like_"): # 拼接 like 178 FILTERS += "AND %s like '%%%s%%' " %(k[6:], v) 179 elif k.startswith("_ne_"): # 拼接不等于 180 FILTERS += "AND %s != '%s' " %(k[4:], v) 181 elif k.startswith("_lt_"): # 拼接小于 182 FILTERS += "AND %s < '%s' " %(k[4:], v) 183 elif k.startswith("_le_"): # 拼接小于等于 184 FILTERS += "AND %s <= '%s' " %(k[4:], v) 185 elif k.startswith("_gt_"): # 拼接大于 186 FILTERS += "AND %s > '%s' " %(k[4:], v) 187 elif k.startswith("_ge_"): # 拼接大于等于 188 FILTERS += "AND %s >= '%s' " %(k[4:], v) 189 else: # 拼接等于 190 FILTERS += "AND %s='%s' "%(k, v) 191 sql = sql.replace("#{FILTERS}", FILTERS) 192 if isPage: sql += "LIMIT %d,%d"%(beginindex, limit) 193 return sql 194 195 def __check_params(self, tableName): 196 ''' 197 检查参数 198 ''' 199 if tableName is None and self.__tableName is None: 200 raise Exception("Parameter [tableName] is None.") 201 elif self.__tableName is None or self.__tableName != tableName: 202 self.__tableName = tableName 203 self.__column_list = self.__table_column_dict_list[self.__tableName] 204 205 def select_one(self, tableName=None, filters={}): 206 ''' 207 查询单个对象 208 @tableName 表名 209 @filters 过滤条件 210 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 211 ''' 212 self.__check_params(tableName) 213 sql = self.__query_util(filters) 214 result = self.__exec_query(sql, single=True) 215 return self.__parse_result(result) 216 217 def select_pk(self, tableName=None, primaryKey=None): 218 ''' 219 按主键查询 220 @tableName 表名 221 @primaryKey 主键值 222 ''' 223 self.__check_params(tableName) 224 filters = {} 225 filters.setdefault(self.__getpk(tableName), primaryKey) 226 sql = self.__query_util(filters) 227 result = self.__exec_query(sql, single=True) 228 return self.__parse_result(result) 229 230 def select_all(self, tableName=None, filters={}): 231 ''' 232 查询所有 233 @tableName 表名 234 @filters 过滤条件 235 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 236 ''' 237 self.__check_params(tableName) 238 sql = self.__query_util(filters) 239 results = self.__exec_query(sql) 240 return self.__parse_results(results) 241 242 def count(self, tableName=None): 243 ''' 244 统计记录数 245 ''' 246 self.__check_params(tableName) 247 sql = "SELECT count(*) FROM %s"%(self.__tableName) 248 result = self.__exec_query(sql, single=True) 249 return result[0] 250 251 def select_page(self, tableName=None, pageNum=1, limit=10, filters={}): 252 ''' 253 分页查询 254 @tableName 表名 255 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 256 ''' 257 self.__check_params(tableName) 258 totalCount = self.count(tableName) 259 if totalCount / limit == 0 : 260 totalPage = totalCount / limit 261 else: 262 totalPage = totalCount // limit + 1 263 if pageNum > totalPage: 264 print("最大页数为%d"%totalPage) 265 pageNum = totalPage 266 elif pageNum < 1: 267 print("页数不能小于1") 268 pageNum = 1 269 beginindex = (pageNum-1) * limit 270 filters.setdefault("_limit_", (beginindex, limit)) 271 sql = self.__query_util(filters) 272 result_tuple = self.__exec_query(sql) 273 return self.__parse_results(result_tuple) 274 275 if __name__ == "__main__": 276 config = { 277 # "creator": pymysql, 278 # "host" : "127.0.0.1", 279 "user" : "root", 280 "password" : "root", 281 "database" : "test", 282 # "port" : 3306, 283 # "charset" : 'utf8' 284 } 285 base = BaseDao(**config) 286 ######################################################################## 287 user = base.select_one("user") 288 print(user) 289 ######################################################################## 290 # users = base.select_all("user") 291 # print(users) 292 ######################################################################## 293 filter1 = { 294 "status":1, 295 "_in_id":"1,2,3,4,5", 296 "_like_name":"zhang", 297 "_ne_name":"wangwu" 298 } 299 user_filters = base.select_all("user", filter1) 300 print(user_filters) 301 ######################################################################## 302 role = base.select_one("role") 303 print(role) 304 ######################################################################## 305 user_pk = base.select_pk("user", 2) 306 print(user_pk) 307 ######################################################################## 308 user_limit = base.select_page("user", 1, 10) 309 print(user_limit) 310 ########################################################################