出于某些原因,不能使用eveything这个软件,但是这个软件的功能是真挺好用的,免去了在文件海洋中寻找;
搜索模块
import os
import re
import subprocess
import time
'''
获取盘符列表,返回元组,元组第一个元素为bool值,表示函数成功与否,第二个值表示原因(失败的情况,字符串形式)或结果(成功的情况,列表形式)
'''
def get_DriveLetter():
result_str = subprocess.run('''powershell $my_path = $env:temp + '\Get-Volume.txt';(Get-Volume).DriveLetter |Out-File -Encoding utf8 $my_path''')
if result_str.returncode == 0:
env_temp_path_str = os.environ['TEMP'] + r'\Get-Volume.txt'
with open(env_temp_path_str,'r',encoding='utf-8_sig')as fd:
result = fd.read().splitlines()
return (True,result)
else:
return (False,"cmd_Fail")
'''
初始目录initial_path已被记录,本次不记录
因为重复的文件名或目录名,并且要考虑到以后有拓展信息字段的需求,所以数据形状设计为这样:{'name':[['path1','other'],['path2','other'],]}
规定,初始目录字符串结尾必须带\
2022年1月25日更新
代码没有更新,但是有个思路,这里不使用listdir进行访问,使用scandir进行获取目录下所有文件的信息,可以分辩文件和目录,便可以减少许多次IO调用,性能能提升3倍以上。
'''
def get_fileList(all_file_dict,initial_path):
if os.path.isdir(initial_path):
#部分目录不允许访问,不过这部分目录也不在我的需求范围内。
try:
child_file_list = os.listdir(initial_path)
except:
#print(initial_path)
return
for i in child_file_list:
if i in all_file_dict.keys():
all_file_dict[i].append([initial_path,])
else:
all_file_dict[i] = [[initial_path,],]
new_path_str = initial_path + i + '\\'
if os.path.isdir(new_path_str):
get_fileList(all_file_dict,new_path_str)
'''
将结果字典格式化输出到文件中存储。
输出到用户家目录下存放,名称叫eveything_data.data
'''
def Formatted_output(all_file_dict):
out_file_path_str = os.environ['USERPROFILE'] + 'eveything_data.data'
with open(out_file_path_str,'w',encoding='utf-8')as fd:
for key,value in all_file_dict.items():
for value_i in value:
line_str = key
for value_i_i in value_i:
line_str += "\t{}".format(value_i_i)
fd.write(line_str + '\n')
'''
直接输出
输出到用户家目录下存放,名称叫eveything_data.data
'''
def Direct_output(all_file_dict):
out_file_path_str = os.environ['USERPROFILE'] + 'eveything_data.data'
with open(out_file_path_str,'w',encoding='utf-8')as fd:
print(all_file_dict,file=fd)
'''
主控函数
'''
def main():
start_time = time.time()
result = get_DriveLetter()
if result[0]:
for i in result[1]:
get_fileList(all_file_dict = all_file_dict,initial_path = '{}:\\'.format(i))
#get_fileList(all_file_dict = all_file_dict,initial_path ='F:\\temp\\')
Formatted_output(all_file_dict)
#Direct_output(all_file_dict)
end_time = time.time()
print("扫描花费时间:{}".format(end_time-start_time))
return all_file_dict
'''
全局变量,主要是为了共享变量,与交互式命令行中的代码
'''
all_file_dict = {}
if __name__ == '__main__':
main()
人机交互模块
import 扫描模块 as saomiao
import re
import time
import threading
import os
from copy import _reconstruct
import colorama
'''
读取csv格式的数据文件(以制表符为分隔符的文件)
'''
def get_data_csv(filepath_str):
if os.path.exists(filepath_str):
with open(filepath_str,'r',encoding= 'utf-8-sig') as fd:
lines_list= fd.read().splitlines()
lines_list_ = [i.split('\t') for i in lines_list]
return lines_list_
else:
return []
'''
从扫描模块中直接获取其运行结果数据,这里会比较慢,大概在5-10分钟才能获取到结果
'''
def get_data_dict():
saomiao.main()
#检验是否含有中文字符
def is_contains_chinese(strs):
for _char in strs:
if '\u4e00' <= _char <= '\u9fa5':
return True
return False
#计算字符串中中文字符个数
def count_chinese_num(strs):
ret_num = 0
for _char in strs:
if '\u4e00' <= _char <= '\u9fa5':
ret_num +=1
return ret_num
'''
计算字符串中的非英文字符个数;
不咋个完美,字符问题真他妈头大,有些非英文字符只占用一个字节,有些又占用两个,喵了个咪的!
'''
def count_notASCII_num(strs):
return int((len(strs.encode()) - len(strs))/2)
'''
专门美化输出的函数
参数是列表类型
原本还计划着说根据命令行窗口大小来进行省略超长部分,但是我本就是查询路径信息,省略了我如何看呢,所以便不进行此操作
'''
def beautify_output(data_result,regex_str):
#查询出最长的文件名长度
name_Maxlen =0
for i in data_result:
if len(i[0]) > name_Maxlen:
name_Maxlen = len(i[0])
#获取命令行的高
terminal_high = os.get_terminal_size()[1]
#输出
num_line = 0 #记录输出的行数
for i in data_result:
#目前只有两列信息,名称和路径,后续需要的话再加即可
key_words_str = re.findall(regex_str,i[0],re.I)[0]
key_words_index = i[0].find(key_words_str)
print("{}\033[1;32;40m{}\033[0m{}{} | {}".format(i[0][0 : key_words_index],i[0][key_words_index : key_words_index + len(key_words_str)],i[0][key_words_index + len(key_words_str) :] , ' ' * (name_Maxlen - len(i[0]) - count_notASCII_num(i[0])),i[1]))
num_line += 1
if num_line % (terminal_high - 2) == 0: #这里并不是很准确的显示下一页,因为某些文件的路径过长会导致换行输出。
print("按下字母q结束输出,按其他键继续输出下一页:",end='')
in_str = input()
if in_str == 'Q' or in_str == 'q':
break
print("总计{}个结果!".format(len(data_result)))
print("输出完毕!\n\n")
'''
专门美化输出的函数v2
参数是列表类型
原本还计划着说根据命令行窗口大小来进行省略超长部分,但是我本就是查询路径信息,省略了我如何看呢,所以便不进行此操作
'''
def beautify_output_v2(data_result,regex_str):
#累积输出了多少行
exported_lines_num_int= 0
while True:
name_Maxlen =0
temp_list = []
#计算当前应该输出多少行
out_lines_num_int = int(os.get_terminal_size()[1]) - 2
if(out_lines_num_int + exported_lines_num_int) > len(data_result):
temp_list = data_result[exported_lines_num_int - 1:]
else:
if exported_lines_num_int == 0:
temp_list = data_result[exported_lines_num_int:out_lines_num_int]
else:
temp_list = data_result[exported_lines_num_int - 1 : out_lines_num_int + exported_lines_num_int]
for i in temp_list:
if len(i[0]) > name_Maxlen:
name_Maxlen = len(i[0])
for i in temp_list:
#目前只有两列信息,名称和路径,后续需要的话再加即可
key_words_str = re.findall(regex_str,i[0],re.I)[0]
key_words_index = i[0].find(key_words_str)
print("{}\033[1;32;40m{}\033[0m{}{} | {}".format(i[0][0 : key_words_index],i[0][key_words_index : key_words_index + len(key_words_str)],i[0][key_words_index + len(key_words_str) :] , ' ' * (name_Maxlen - len(i[0]) - count_notASCII_num(i[0])),i[1]))
exported_lines_num_int += out_lines_num_int
if exported_lines_num_int >= len(data_result):
break
print("按下字母q结束输出,按其他键继续输出下一页:",end='')
in_str = input()
if in_str == 'Q' or in_str == 'q':
break
print("总计{}个结果!".format(len(data_result)))
print("输出完毕!\n\n")
'''
处理用户输入和系统的提示性输出的,返回元祖,一个是原始输入的,另一个是处理之后的
'''
def Human_computer_interaction():
while True:
print('请输入:')
regex_str = input()
if regex_str == '':
continue
else:
replace_result_str = r'(?<=\t)[^\t]*?'
for index in range(len(regex_str)):
if regex_str[index] == '.':
if index != 0 and regex_str[index - 1] == '\\':
replace_result_str += regex_str[index]
else:
replace_result_str += r'[^\t]'
else:
replace_result_str += regex_str[index]
replace_result_str += r'[^\t]*?(?=\t)'
return (regex_str,replace_result_str)
'''
查询函数,从列表中读取
'''
def find_match(data,regex_str):
start_time = time.time()
result_list = []
for i in data:
result_regex = re.findall(regex_str,i[0],re.I)
if len(result_regex) != 0:
result_list.append(i)
print('查询花费:{}秒。'.format(time.time() - start_time))
return result_list
'''
查询函数,从字典中读取,并生成结果列表类型变量
'''
def find_match_dict(data,regex_str):
start_time = time.time()
result_list = []
if type({}) == type(data):
for key in data.keys():
result_regex = re.findall(regex_str,key,re.I)
if len(result_regex) != 0:
temp_ = {}
temp_[key] = data[key]
result_list += dict_Conversion_list(temp_)
else:
print("未传入字典类型变量!")
exit()
print('查询花费:{}秒。'.format(time.time() - start_time))
return result_list
'''
查询函数v2,从字典中读取,并生成结果列表类型变量
'''
def find_match_dict_v2(data,regex_str):
start_time = time.time()
result_list = []
if type({}) == type(data):
keys_list = data.keys()
keys_str = "\t"
keys_str += '\t'.join(keys_list)
keys_str += "\t"
result_key = re.findall(regex_str,keys_str,re.I)
for key in result_key:
temp_ = {}
temp_[key] = data[key]
result_list += dict_Conversion_list(temp_)
else:
print("未传入字典类型变量!")
exit()
print('查询花费:{}秒。'.format(time.time() - start_time))
return result_list
'''
将字典键和值转变为列表
'''
def dict_Conversion_list(dict_data):
if type({}) != type(dict_data):
return []
return_result = []
for key,value in dict_data.items():
for value_i in value:
temp_list = []
for value_i_i in value_i:
temp_list.append(key)
temp_list.append(value_i_i)
return_result.append(temp_list)
return return_result
'''
全局变量
'''
saomiao_result_data = ''
'''
深拷贝,copy库的deepcopy性能不太行,所以在deepcopy的基础上改了一下
这个深拷贝函数仅用于本脚本中,copy库中的deepcopy函数慢是有原因的,了解之后再使用。
'''
def my_deepcopy(x):
reductor = getattr(x, "__reduce_ex__", None)
if reductor is not None:
rv = reductor(4)
y = _reconstruct(x, None, *rv)
return y
def main():
colorama.init(autoreset=True) #启用颜色模块
#程序刚运行时使用之前扫描的格式化输出结果文件
data_file_path_str = r'C:\Users\Administratoreveything_data.data'
data = get_data_csv(data_file_path_str)
threading_Operating_condition = '' #子线程运行情况
saomiao_result_data = '' #扫描模块的结果
while True:
if threading_Operating_condition != '' and threading_Operating_condition.is_alive() == False:
#线程结束,可以获取数据了
_time_ = time.time()
#释放空间
if saomiao_result_data != '':
del saomiao_result_data
saomiao_result_data = my_deepcopy(saomiao.all_file_dict)
print("深拷贝所用时间:{}".format(time.time() - _time_))
#深拷贝完成之后清除全局变量的数据,避免出现重复值的bug,同时节约空间
saomiao.all_file_dict.clear()
#销毁从文件中读取的数据
if 'data' in locals().keys():
del data
print("数据量:".format(len(saomiao_result_data)))
#只有在线程结束或者未开始时才运行
if threading_Operating_condition == '' or threading_Operating_condition.is_alive() == False:
#get_data_dict()
threading_Operating_condition = threading.Thread(target = get_data_dict)
threading_Operating_condition.start()
user_in_tuple = Human_computer_interaction()
result_list = []
if saomiao_result_data != '':
result_list = find_match_dict_v2(data = saomiao_result_data,regex_str = user_in_tuple[1])
else:
result_list = find_match(data = data,regex_str = user_in_tuple[0])
beautify_output_v2(result_list,user_in_tuple[0])
if __name__ == '__main__':
main()
我很尽力的去做对齐了,但是这个超出了我能力范围,字符真是个好垃圾!
历经几次迭代和优化,现在查询96万个结果只需要3秒左右,优化显示效果,优化正则表达式匹配的速度
运行结果: