之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。

  主要功能:

    1.查询单个对象:

      所需参数:表名,过滤条件

    2.查询多个对象:
      所需参数:表名,过滤条件

    3.按主键查询:
      所需参数:表名,值

    4.分页查询:
      所需参数:表名,页码,每页记录数,过滤条件

  调用方法锁获取的对象都是以字典形式存储,例如:查询user表(字段有id,name,age)里的id=1的数据返回的对象为user = {"id":1,"name","zhangsan","age":18},我们可以通过user.get("id")来获取id值,非常方便,不用定义什么类对象来表示。如果查询的是多个,那么多个字典对象将会存放在一个列表里返回。

  具体代码如下:  

  1 import json, os, sys, time, pymysql, pprint
  2 
  3 from DBUtils import PooledDB
  4 
  5 def print(*args):
  6     pprint.pprint(args)
  7 
  8 def get_time():
  9     '获取时间'
 10     return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 11 
 12 def stitch_sequence(seq=None, suf=None):
 13     '如果参数("suf")不为空,则根据特殊的suf拼接列表元素,返回一个字符串'
 14     if seq is None: raise Exception("Parameter seq is None");
 15     if suf is None: suf = ","
 16     r = str()
 17     for s in seq:
 18         r += s + suf
 19     return r[:-len(suf)]
 20 
 21 class BaseDao(object):
 22     """
 23     简便的数据库操作基类
 24     """
 25     def __init__(self, creator=pymysql, host="localhost",port=3306, user=None, password="",
 26                     database=None, charset="utf8"):
 27         if host is None: raise Exception("Parameter [host] is None.")
 28         if port is None: raise Exception("Parameter [port] is None.")
 29         if user is None: raise Exception("Parameter [user] is None.")
 30         if password is None: raise Exception("Parameter [password] is None.")
 31         if database is None: raise Exception("Parameter [database] is None.")
 32         # 数据库连接配置
 33         self.__config = dict({
 34             "creator" : creator, "charset":charset, "host":host, "port":port, 
 35             "user":user, "password":password, "database":database
 36         })
 37         self.__database = self.__config["database"]     # 用于存储查询数据库
 38         self.__tableName = None                         # 用于临时存储当前查询表名
 39         # 初始化
 40         self.__init_connect()                           # 初始化连接
 41         self.__init_params()                            # 初始化参数
 42         print(get_time(), self.__database, "数据库初始化成功。")
 43         
 44     def __del__(self):
 45         '重写类被清除时调用的方法'
 46         if self.__cursor: self.__cursor.close()
 47         if self.__conn: self.__conn.close()
 48         print(get_time(), self.__database, "连接关闭")
 49 
 50     def __init_connect(self):
 51         self.__conn = PooledDB.connect(**self.__config)
 52         self.__cursor = self.__conn.cursor()
 53 
 54     def __init_params(self):
 55         '初始化参数'
 56         self.__init_table_dict()
 57         self.__init__table_column_dict_list()
 58 
 59     def __init__information_schema_columns(self):
 60         "查询 information_schema.`COLUMNS` 中的列"
 61         sql =   """ SELECT COLUMN_NAME FROM information_schema.`COLUMNS`
 62                     WHERE TABLE_SCHEMA='information_schema' AND TABLE_NAME='COLUMNS'
 63                 """
 64         result_tuple = self.__exec_query(sql)
 65         column_list = [r[0] for r in result_tuple]
 66         return column_list
 67 
 68     def __init_table_dict(self):
 69         "查询配置数据库中改的所有表"
 70         schema_column_list = self.__init__information_schema_columns()
 71         stitch_str = stitch_sequence(schema_column_list)
 72         sql1 =  """ SELECT TABLE_NAME FROM information_schema.`TABLES`
 73                     WHERE TABLE_SCHEMA='%s'
 74                 """ %(self.__database)
 75         table_tuple = self.__exec_query(sql1)
 76         self.__table_dict = {t[0]:{} for t in table_tuple}
 77         for table in self.__table_dict.keys():
 78             sql =   """ SELECT %s FROM information_schema.`COLUMNS`
 79                         WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'
 80                     """ %(stitch_str, self.__database, table)
 81             column_tuple = self.__exec_query(sql)
 82             column_dict = {}
 83             for vs in column_tuple:
 84                 d = {k:v for k,v in zip(schema_column_list, vs)}
 85                 column_dict[d["COLUMN_NAME"]] = d
 86             self.__table_dict[table] = column_dict
 87 
 88     def __init__table_column_dict_list(self):
 89         self.__table_column_dict_list = {}
 90         for table, column_dict in self.__table_dict.items():
 91             column_list = [column for column in column_dict.keys()]
 92             self.__table_column_dict_list[table] = column_list
 93         
 94     def __exec_query(self, sql, single=False):
 95         '''
 96         执行查询方法
 97         - @sql    查询 sql
 98         - @single 是否查询单个结果集,默认False
 99         '''
100         try:
101             self.__cursor.execute(sql)
102             print(get_time(), "SQL[%s]"%sql)
103             if single:
104                 result_tuple = self.__cursor.fetchone()
105             else:
106                 result_tuple = self.__cursor.fetchall()
107             return result_tuple
108         except Exception as e:
109             print(e)
110 
111     def __exec_update(self, sql):
112         try:
113             # 获取数据库游标
114             result = self.__cursor.execute(sql)
115             print(get_time(), "SQL[%s]"%sql)
116             self.__conn.commit()
117             return result
118         except Exception as e:
119             print(e)
120             self.__conn.rollback()
121 
122     def __parse_result(self, result):
123         '用于解析单个查询结果,返回字典对象'
124         if result is None: return None
125         obj = {k:v for k,v in zip(self.__column_list, result)}
126         return obj
127 
128     def __parse_results(self, results):
129         '用于解析多个查询结果,返回字典列表对象'
130         if results is None: return None
131         objs = [self.__parse_result(result) for result in results]
132         return objs
133 
134     def __getpk(self, tableName):
135         if self.__table_dict.get(tableName) is None: raise Exception(tableName, "is not exist.")
136         for column, column_dict in self.__table_dict[tableName].items():
137             if column_dict["COLUMN_KEY"] == "PRI": return column
138 
139     def __get_table_column_list(self, tableName=None):
140         '查询表的字段列表, 将查询出来的字段列表存入 __fields 中'
141         return self.__table_column_dict_list[tableName]
142 
143     def __query_util(self, filters=None):
144         """
145         SQL 语句拼接方法
146         @filters 过滤条件
147         """
148         sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}'
149         # 拼接查询表
150         sql = sql.replace("#{TABLE_NAME}", self.__tableName)
151         # 拼接查询字段
152         FIELDS = stitch_sequence(self.__get_table_column_list(self.__tableName))
153         sql = sql.replace("#{FIELDS}", FIELDS)
154         # 拼接查询条件(待优化)
155         if filters is None:
156             sql = sql.replace("#{FILTERS}", "")
157         else:
158             FILTERS =  ""
159             if not isinstance(filters, dict):
160                 raise Exception("Parameter [filters] must be dict type. ")
161             isPage = False
162             if filters.get("_limit_"): isPage = True
163             if isPage: beginindex, limit = filters.pop("_limit_")
164             for k, v in filters.items():
165                 if k.startswith("_in_"):                # 拼接 in
166                     FILTERS += "AND %s IN (" %(k[4:])
167                     values = v.split(",")
168                     for value in values:
169                         FILTERS += "%s,"%value
170                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "
171                 elif k.startswith("_nein_"):            # 拼接 not in
172                     FILTERS += "AND %s NOT IN (" %(k[4:])
173                     values = v.split(",")
174                     for value in values:
175                         FILTERS += "%s,"%value
176                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "
177                 elif k.startswith("_like_"):            # 拼接 like
178                     FILTERS += "AND %s like '%%%s%%' " %(k[6:], v)
179                 elif k.startswith("_ne_"):              # 拼接不等于
180                     FILTERS += "AND %s != '%s' " %(k[4:], v)
181                 elif k.startswith("_lt_"):              # 拼接小于
182                     FILTERS += "AND %s < '%s' " %(k[4:], v)
183                 elif k.startswith("_le_"):              # 拼接小于等于
184                     FILTERS += "AND %s <= '%s' " %(k[4:], v)
185                 elif k.startswith("_gt_"):              # 拼接大于
186                     FILTERS += "AND %s > '%s' " %(k[4:], v)
187                 elif k.startswith("_ge_"):              # 拼接大于等于
188                     FILTERS += "AND %s >= '%s' " %(k[4:], v)
189                 else:                # 拼接等于
190                     FILTERS += "AND %s='%s' "%(k, v)
191             sql = sql.replace("#{FILTERS}", FILTERS)
192             if isPage: sql += "LIMIT %d,%d"%(beginindex, limit)
193         return sql
194 
195     def __check_params(self, tableName):
196         '''
197         检查参数
198         '''
199         if tableName is None and self.__tableName is None:
200             raise Exception("Parameter [tableName] is None.")
201         elif self.__tableName is None or self.__tableName != tableName:
202             self.__tableName = tableName
203             self.__column_list = self.__table_column_dict_list[self.__tableName]
204 
205     def select_one(self, tableName=None, filters={}):
206         '''
207         查询单个对象
208         @tableName 表名
209         @filters 过滤条件
210         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value
211         '''
212         self.__check_params(tableName)
213         sql = self.__query_util(filters)
214         result = self.__exec_query(sql, single=True)
215         return self.__parse_result(result) 
216 
217     def select_pk(self, tableName=None, primaryKey=None):
218         '''
219         按主键查询
220         @tableName 表名
221         @primaryKey 主键值
222         '''
223         self.__check_params(tableName)
224         filters = {}
225         filters.setdefault(self.__getpk(tableName), primaryKey)
226         sql = self.__query_util(filters)
227         result = self.__exec_query(sql, single=True)
228         return self.__parse_result(result)
229         
230     def select_all(self, tableName=None, filters={}):
231         '''
232         查询所有
233         @tableName 表名
234         @filters 过滤条件
235         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value
236         '''
237         self.__check_params(tableName)
238         sql = self.__query_util(filters)
239         results = self.__exec_query(sql)
240         return self.__parse_results(results)
241 
242     def count(self, tableName=None):
243         '''
244         统计记录数
245         '''
246         self.__check_params(tableName)
247         sql = "SELECT count(*) FROM %s"%(self.__tableName)
248         result = self.__exec_query(sql, single=True)
249         return result[0]
250 
251     def select_page(self, tableName=None, pageNum=1, limit=10, filters={}):
252         '''
253         分页查询
254         @tableName 表名
255         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value
256         '''
257         self.__check_params(tableName)
258         totalCount = self.count(tableName)
259         if totalCount / limit == 0 :
260             totalPage = totalCount / limit
261         else:
262             totalPage = totalCount // limit + 1
263         if pageNum > totalPage:
264             print("最大页数为%d"%totalPage)
265             pageNum = totalPage
266         elif pageNum < 1:
267             print("页数不能小于1")
268             pageNum = 1
269         beginindex = (pageNum-1) * limit
270         filters.setdefault("_limit_", (beginindex, limit))
271         sql = self.__query_util(filters)
272         result_tuple = self.__exec_query(sql)
273         return self.__parse_results(result_tuple)
274 
275 if __name__ == "__main__":
276     config = {
277         # "creator": pymysql,
278         # "host" : "127.0.0.1", 
279         "user" : "root", 
280         "password" : "root",
281         "database" : "test", 
282         # "port" : 3306,
283         # "charset" : 'utf8'
284     }
285     base = BaseDao(**config)
286     ########################################################################
287     user = base.select_one("user")
288     print(user)
289     ########################################################################
290     # users = base.select_all("user")
291     # print(users)
292     ########################################################################
293     filter1 = {
294         "status":1,
295         "_in_id":"1,2,3,4,5",
296         "_like_name":"zhang",
297         "_ne_name":"wangwu"
298     }
299     user_filters = base.select_all("user", filter1)
300     print(user_filters)
301     ########################################################################
302     role = base.select_one("role")
303     print(role)
304     ########################################################################
305     user_pk = base.select_pk("user", 2)
306     print(user_pk)
307     ########################################################################
308     user_limit = base.select_page("user", 1, 10)
309     print(user_limit)
310     ########################################################################
View Code

相关文章:

  • 2022-12-23
  • 2021-10-08
  • 2022-02-26
  • 2022-12-23
  • 2022-12-23
  • 2021-10-14
  • 2022-02-08
猜你喜欢
  • 2021-05-26
  • 2022-12-23
  • 2022-02-27
  • 2022-12-23
  • 2021-08-25
  • 2021-09-15
  • 2021-05-27
相关资源
相似解决方案