直接上源代码:
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
Created on : 2020/7/20 12:38
@Author : Miracle
@blog : https://blog.csdn.net/weixin_39633383
@github: https://github.com/Mr-Miracle
"""
import os
import datetime
from mailbox import Message, Mailbox
import pandas as pd
import pypyodbc as mdb
import wx
import time
from exchangelib import *
from exchangelib.protocol import BaseProtocol, NoVerifyHTTPAdapter
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
BaseProtocol.HTTP_ADAPTER_CLS = NoVerifyHTTPAdapter
# 读取MDB文件内容
def read_mdb(file):
# 连接mdb文件
conn_str = (r'Driver={Microsoft Access Driver (*.mdb)};DBQ='+file+';'
)
conn = mdb.win_connect_mdb(conn_str)
# 创建游标
port_info = []
cur = conn.cursor()
sql_str = "SELECT (SELECT FCODE FROM BaseInFo) AS PORTCODE,FDATE,(IIF ((SELECT COUNT(1) FROM FCWVCH WHERE FKMH IS NULL)=0, 0, 1)) AS RECORDS FROM GZB GROUP BY FDATE;"
cur.execute(sql_str)
gzb = cur.fetchall()
for i in gzb:
str1 = str(i[1])[0:10].replace('-', '') # 对日期的格式进行调整
port_info.append([i[0], str1, i[2]])
cur.close()
conn.close()
return port_info
# noinspection PyBroadException
def send_email(to, subject, body):
"""
电子邮件发送
:param to: 收件人邮箱
:param subject: 邮件主题
:param body: 邮件正文
:return:"""
cred = Credentials(
username="###", # 创建、申明邮箱账户
password="###",
)
config = Configuration(
server="###", # 发件服务器地址
credentials=cred,
auth_type=NTLM,
)
account = Account(
primary_smtp_address="###",
config=config,
autodiscover=False,
access_type=DELEGATE,
)
message = Message(
account=account, # 发件人、申明的账户
subject=subject, # 邮件主题
body=HTMLBody(body), # 邮件的内容
to_recipients=[Mailbox(email_address=to), ], # 收件人
)
try:
message.send_and_save()
print('邮件发送成功!')
except:
print('邮件发送失败!')
# 定义用户窗体的类
class AppFrame(wx.Frame):
def __init__(self, parent, title):
super(AppFrame, self).__init__(parent, title=title, size=(500, 480))
panel = wx.Panel(self)
# 定义用户的文本输入框等组件
self.text0 = wx.StaticText(panel, -1, label="MDB根目录:") # id = -1,表示窗口id自动生成
self.text1 = wx.StaticText(panel, -1, label="最新文件个数:")
self.text2 = wx.StaticText(panel, -1, label="托管行名称:")
self.text3 = wx.StaticText(panel, -1, label="年金计划:")
self.input0 = wx.TextCtrl(panel, -1, 'F:\TGH')
self.input1 = wx.TextCtrl(panel, -1, '7')
self.banks = ['所有托管', '交通银行', '浦发银行', '招商银行', '建设银行', '农业银行', '工商银行']
self.input2 = wx.ComboBox(panel, -1, choices=self.banks, value=self.banks[0])
self.input3 = wx.TextCtrl(panel, -1, '江西 山东 中央 湖南 安徽 新疆维吾尔 黑龙江')
self.execute_button = wx.Button(panel, -1, label="执行")
self.exit_button = wx.Button(panel, -1, label="退出")
# 创建文本域
self.multiText = wx.TextCtrl(panel, -1, size=(200, 100), style=wx.TE_MULTILINE | wx.TE_READONLY) # 创建一个文本控件,并设置只读
self.multiText.SetInsertionPoint(0) # 设置插入点
# and a status bar
self.CreateStatusBar()
self.SetStatusText("Created by Miracle on 2020.10")
# 定义各个组件的页面布局
v_box = wx.BoxSizer(wx.VERTICAL) # 实例化一个垂直盒子
h_box = wx.BoxSizer(wx.HORIZONTAL) # 实例化一个水平盒子
sizer = wx.FlexGridSizer(5, 2, 15, 30) # 设置5行2列,垂直间距15像素,水平间距20像素
sizer.AddMany([[self.text0, wx.ALIGN_LEFT], (self.input0, wx.EXPAND),
(self.text1, wx.ALIGN_LEFT), (self.input1, wx.EXPAND),
(self.text2, wx.ALIGN_LEFT), (self.input2, wx.EXPAND),
(self.text3, wx.ALIGN_LEFT), (self.input3, wx.EXPAND),
(self.execute_button, wx.Center), (self.exit_button, wx.Center),
])
h_box.Add(sizer, proportion=1, flag=wx.ALL | wx.EXPAND, border=5) # 将这组件插入水平盒子
v_box.Add(h_box, 0, wx.ALL | wx.CENTER, 5) # 在垂直盒子里添加水平盒子
v_box.Add(self.multiText, 1, wx.ALL | wx.EXPAND, 5) # 在垂直盒子里添加文本域
self.SetSizer(v_box) # 启用所有布局
self.Show() # 显示用户页面的窗体
self.Bind(wx.EVT_BUTTON, self.execute_event, self.execute_button) # 定义按钮的点击事件
self.Bind(wx.EVT_BUTTON, self.exit_event, self.exit_button) # 定义按钮的点击事件
def execute_event(self, event):
# 年金计划组合信息
ports = [###] ## 此部分为计划组合信息,数据安全原因,未列示
# 获取用户的输入值
root = self.input0.GetValue()
num = self.input1.GetValue()
annuity_bank = self.input2.GetValue()
annuity_plan = self.input3.GetValue()
# 判断托管行根目录路径是否存在,否则不执行
if os.path.exists(root):
check_plan0 = []
check_plan1 = []
for port in ports:
for s in port:
if annuity_plan:
for i in annuity_plan.split(" "):
if i in s:
check_plan0.append(port)
else: # 否则查询所有计划
check_plan0 = ports
if annuity_bank == '所有托管':
check_plan1 = ports
else: # 否则查询所有计划
for j in annuity_bank.split(" "):
if j in s:
check_plan1.append(port)
# 将2个查询条件的结果取交集,生成需要进行查询的计划组合list
check_plan = [val for val in check_plan0 if val in check_plan1]
result_data = []
for plan in check_plan:
self.write_log_to_text("开始检查:"+','.join(plan))
data = self.mdb_check(root, plan, int(num)) # 取到MDB的检查结果列表
result_data.append(data)
# 对list进行表格化的处理
msg_list = pd.DataFrame(result_data)
msg_list.columns = ['托管行', '年金计划', '组合名称', '组合代码', '日期', '到达情况', '检查结果']
msg = msg_list.to_html(bold_rows=True)
# 发送异常的提醒邮件
send_email('###', '周度数据MDB文件检查_异常数据' +self.get_now_time(), msg)
self.write_log_to_text("异常提醒邮件发送完成!")
else:
wx.MessageBox("托管行MDB文件的目录不存在!\n"+ root, "输入有误", wx.OK | wx.YES_DEFAULT)
event.Skip()
# MDB文件检查的主要逻辑
def mdb_check(self, mdb_root, port, num):
"""
:param mdb_root: MDB文件的根目录-/TGH
:param port: 需要检查的组合的基本信息的list
:param num: 检查的最新文件的个数
:return: 检查结果list
"""
check_list = [] # 程序的返回值,异常数据的list
file_load = os.path.join(mdb_root, port[1], port[5])
mdb_list = []
for root, sub, files in os.walk(file_load):
for file in files:
if file.endswith(".MDB"):
mdb_file = os.path.join(root, file)
mdb_list.append(mdb_file)
mdb_list.sort(reverse=True)
# 如果文件夹下有mdb文件
day_result = []
if len(mdb_list) > 0:
for i in range(num): # 取前num个文件来进行数据读取
print(datetime.datetime.now(), '读取MDB文件:', mdb_list[i])
self.write_log_to_text('读取MDB文件:' + mdb_list[i])
result = read_mdb(mdb_list[i])
for result in result:
day_result.append((result[1], result[2]))
print(datetime.datetime.now(), '取到的数据:\n', day_result)
else:
print(datetime.datetime.now(), '取到的数据: 找不到此组合的MDB文件!!!')
self.write_log_to_text('取到的数据: 找不到此组合的MDB文件!!!')
for i in self.get_last_week():
if (i, 0) in day_result:
# 当且仅当估值表中取到的日期和上周日期一致,估值表的科目字段不为空时显示正常
check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '已到达 ', '正常']
print(check_list)
self.write_log_to_text(' '.join(check_list))
elif (i, 1) in day_result:
check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '已到达 ', '异常']
print(check_list)
self.write_log_to_text(' '.join(check_list))
else:
check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '未到达 ', '异常']
print(check_list)
self.write_log_to_text(' '.join(check_list))
return check_list
def exit_event(self, event):
"""Close the frame, terminating the application."""
self.Close(True)
event.Skip()
# 获取当前时间
@staticmethod
def get_now_time():
current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
return current_time
# 日志动态打印
def write_log_to_text(self, log_msg):
current_time = self.get_now_time()
log_msg_in = "INFO:" + str(current_time) + " " + log_msg + "\n" # 换行
self.multiText.AppendText(log_msg_in)
# 取上周7天的数
@staticmethod
def get_last_week():
now = datetime.datetime.now()
day_list = []
for i in range(7):
time_stamp = now - datetime.timedelta(days=now.weekday() + (7 - i))
str_day = time_stamp.strftime("%Y%m%d")
day_list.append(str_day)
return day_list
if __name__ == '__main__':
app = wx.App() # 创建应用程序对象
frame = AppFrame(None, title="周度MDB文件数据检查_v1.0")
frame.Center() # 设置主窗口居中对齐
app.MainLoop()
项目效果图: