h520522

使用datacompy比较两个列表

需求: 判断DB的数据与EXCEL的数据是否完全一致

该需求用到的知识有点多

  • pandas读取SQL
  • pandas读取EXCEL
  • datacompy比较列表
  • pandas写EXCEL

开发前准备

pip install pymysql
pip install pandas
pip install sqlalchemy
pip install datacompy
pip install openpyxl

开发代码

  • 连接DB并获取数据
    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as \'批号\' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = \'0\'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)
  • 读取EXCEL
    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={\'批号\': str})
        self.df2 = self.df2.drop_duplicates() 
        print(self.df2)

PS:这里需要注意的是,使用datacompy比较的两个列表中不能又重复的数据,所以要使用self.df2.drop_duplicates()去重

  • 比较列表,并将差异存入EXCEL
    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=[\'批号\'])

        # print(compare.matches())  # 最后判断是否相等,返回 bool
        # print(compare.report())  # 打印报告详情,返回 string
        # print(compare.report(sample_count=5000))  # 打印报告详情,返回 string

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine=\'openpyxl\')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name=\'EXCEL缺少的数据\')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
        writer.save()
        writer.close()

查看datacompy文档

完整代码

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
\'\'\'
@File        :检查.py
@Time        :2020/10/26 10:39:06
@Author      :He
@Software    :vsCode
\'\'\'


import pymysql
import time
import datetime
import uuid
import os
from sqlalchemy import create_engine
import pandas as pd
import datacompy
from openpyxl import load_workbook


class mysql_class:
    def __init__(self):

        self.host = \'IP\'
        self.port = \'端口\'
        self.passwd = \'密码\'
        self.user = \'root\'
        self.db = \'\'
        self.file_name = \'EXCEL.xlsx\'

    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as \'批号\' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = \'0\'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)

    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={\'批号\': str})
        self.df2 = self.df2.drop_duplicates()
        print(self.df2)

    def getCurrentTime(self):
        return time.strftime(\'%Y-%m-%d %H:%M:%S\', time.localtime(time.time()))

    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=[\'批号\'])

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine=\'openpyxl\')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name=\'EXCEL缺少的数据\')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
        writer.save()
        writer.close()


if __name__ == "__main__":
    os.chdir(os.path.abspath(os.path.dirname(__file__)))

    starttime = datetime.datetime.now()
    print(starttime)

    mysql_class = mysql_class()
    mysql_class.dict_compare()

    endtime = datetime.datetime.now()
    print(endtime)

    print(\'\n数据处理成功!所用时间为:\' + str((endtime - starttime).seconds))


分类:

技术点:

相关文章:

  • 2021-12-25
  • 2022-02-09
  • 2022-01-02
猜你喜欢
  • 2022-12-23
  • 2021-09-12
  • 2021-07-15
  • 2021-10-27
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案