由于公司要求使用 peewee 模块、不能直接使用 pymysql,而我又不太喜欢 ORM 的写法,更习惯写原生 SQL,所以决定对 peewee 做一层封装,以便通过它直接执行原生 SQL 语句。

 

代码如下:

from peewee import MySQLDatabase


class base_peewee(object):
    """Store MySQL connection settings and open a ``MySQLDatabase`` handle.

    ``connect()`` builds the database object from the settings given to
    ``__init__``. Any setting left as ``None`` falls back to the matching
    ``DEFAULT_*`` class attribute, so constructing the class without
    arguments keeps connecting to the same local database as before.

    Attributes set by ``__init__``:
        db_host, db_port, user, password, db: the connection settings.
        conn: the ``MySQLDatabase`` object, ``None`` until ``connect()``.
        cursor: the cursor obtained from ``conn``, ``None`` until ``connect()``.
    """

    # Fallback values, identical to the literals connect() used to hard-code.
    # NOTE(review): keeping a password literal in source is a security smell;
    # it is retained only so existing argument-less callers keep working.
    DEFAULT_HOST = "127.0.0.1"
    DEFAULT_USER = "root"
    DEFAULT_PASSWORD = "123123qwe"
    DEFAULT_DB_NAME = "test"
    DEFAULT_CHARSET = "utf8"

    def __init__(self, host=None, port=3306, user=None, password=None, db_name=None):
        """Record the connection settings; no connection is opened here.

        :param host: server host name, or None to use ``DEFAULT_HOST``.
        :param port: server port; converted with ``int()``.
        :param user: login name, or None to use ``DEFAULT_USER``.
        :param password: login password, or None to use ``DEFAULT_PASSWORD``.
            Non-None values are converted with ``str()``.
        :param db_name: schema name, or None to use ``DEFAULT_DB_NAME``.
        """
        self.db_host = host
        self.db_port = int(port)
        self.user = user
        # str(None) would silently produce the password "None", so keep None.
        self.password = None if password is None else str(password)
        self.db = db_name
        self.conn = None
        self.cursor = None

    def _connect_kwargs(self):
        """Return the keyword arguments ``connect()`` passes to MySQLDatabase."""

        def pick(value, fallback):
            # Only None means "not supplied"; "" or 0 are respected as given.
            return fallback if value is None else value

        return {
            "host": pick(self.db_host, self.DEFAULT_HOST),
            "port": self.db_port,
            "user": pick(self.user, self.DEFAULT_USER),
            "passwd": pick(self.password, self.DEFAULT_PASSWORD),
            "database": pick(self.db, self.DEFAULT_DB_NAME),
            "charset": self.DEFAULT_CHARSET,
        }

    def connect(self):
        """Create the ``MySQLDatabase`` object and fetch a cursor from it.

        Stores the results in ``self.conn`` and ``self.cursor``.
        """
        self.conn = MySQLDatabase(**self._connect_kwargs())
        self.cursor = self.conn.cursor()


class ReDefinedPeeWee(base_peewee):
    """Raw-SQL convenience wrapper around the connection from ``base_peewee``.

    The instance connects as soon as it is created. All statements run on
    the single cursor stored in ``self.cursor``. The object can be used as
    a context manager, in which case ``close()`` is called on exit.
    """

    def __init__(self, host=None, port=3306, user=None, password=None, db_name=None):
        """Forward the settings to ``base_peewee.__init__`` and connect.

        All parameters are optional, so ``ReDefinedPeeWee()`` keeps working.
        """
        super(ReDefinedPeeWee, self).__init__(
            host=host, port=port, user=user, password=password, db_name=db_name
        )
        self.connect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def commit(self):
        """Commit the current transaction on the connection."""
        self.conn.commit()

    def rollback(self):
        """Roll back the current transaction on the connection."""
        self.conn.rollback()

    def _execute_write(self, sql, value=None, commit=None):
        """Run one data-modifying statement; commit when ``commit`` is truthy."""
        self.cursor.execute(sql, value)
        if commit:
            self.commit()

    def insert_sql(self, sql, value=None, commit=None):
        """Execute an INSERT statement.

        :param sql: statement text, with driver placeholders if ``value`` is given.
        :param value: parameters bound to the placeholders, or None.
        :param commit: when truthy, commit right after executing.
        """
        self._execute_write(sql, value, commit)

    def update_sql(self, sql, value=None, commit=None):
        """Execute an UPDATE statement; parameters as in ``insert_sql``."""
        self._execute_write(sql, value, commit)

    def delete_sql(self, sql, value=None, commit=None):
        """Execute a DELETE statement; parameters as in ``insert_sql``."""
        self._execute_write(sql, value, commit)

    def _run_query(self, sql, value=None):
        """Execute a SELECT and commit, leaving the result on the cursor."""
        self.cursor.execute(sql, value)
        # NOTE(review): presumably the commit ends the implicit read
        # transaction so later SELECTs see fresh data — confirm.
        self.conn.commit()

    @staticmethod
    def _label_rows(rows, columns):
        """Turn rows into dicts keyed by ``columns`` when the widths match.

        ``rows`` is returned unchanged when ``columns`` is empty, when there
        are no rows, or when the first row's length differs from
        ``len(columns)``.
        """
        if not columns:
            return rows
        if len(rows) > 0 and len(rows[0]) == len(columns):
            return [dict(zip(columns, row)) for row in rows]
        return rows

    def selectone_sql(self, sql, columns=None, value=None):
        """Run a query and return its first row.

        :param sql: query text, with driver placeholders if ``value`` is given.
        :param columns: optional list of names, e.g. ``['id', 'name']``; it
            must have as many entries as the row has fields for the row to
            be returned as a dict.
        :param value: optional parameters bound to the placeholders; prefer
            this over building the SQL string from untrusted input.
        :return: whatever ``fetchone()`` yields, or a dict mapping
            ``columns`` to the row's fields when the lengths match.
        """
        self._run_query(sql, value)
        row = self.cursor.fetchone()
        if columns and row and len(row) == len(columns):
            return dict(zip(columns, row))
        return row

    def selectall_sql(self, sql, columns=None, value=None):
        """Run a query and return all rows.

        :param sql: query text, with driver placeholders if ``value`` is given.
        :param columns: optional list of names used as dict keys for each row.
        :param value: optional parameters bound to the placeholders.
        :return: whatever ``fetchall()`` yields, or a list of dicts when
            ``columns`` matches the width of the first row.
        """
        self._run_query(sql, value)
        return self._label_rows(self.cursor.fetchall(), columns)

    def select_sql(self, sql, value=None, columns=None):
        """Run a parameterized query and return all rows.

        Same result as ``selectall_sql``; only the argument order differs.
        ``columns`` is now honored — it used to be accepted but ignored.
        """
        self._run_query(sql, value)
        return self._label_rows(self.cursor.fetchall(), columns)

    def close(self):
        """Close the cursor and the connection; safe to call more than once."""
        cursor, conn = self.cursor, self.conn
        # Clear the attributes first so a repeated call finds nothing to close.
        self.cursor = None
        self.conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Still release the connection if closing the cursor failed.
            if conn is not None:
                conn.close()


def main():
    """Demo: fetch the first row of ``test`` as returned, then with column names."""
    ret = ReDefinedPeeWee()
    try:
        res = ret.selectone_sql("select * from test")
        print(res)
        res1 = ret.selectone_sql("select * from test", ["id", 'name', "num"])
        print(res1)
    finally:
        # Release the cursor and connection even if a query raises.
        ret.close()


# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
View Code

相关文章:

  • 2022-01-29
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-10-10
猜你喜欢
  • 2021-07-19
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案