FTP上传下载文件(函数简易版)
# 服务端
import socket
import json
import hashlib
import struct
import os
user_dic = {
\'太上老君\': \'123456\',
\'元始天尊\': \'123456\',
\'通天教主\': \'123456\'
}
server = socket.socket()
server.bind((\'192.168.13.19\', 2021))
server.listen(5)
conn, addr = server.accept()
def login(): # 登录
time = 3
while time > 0:
username, psw = conn.recv(1024).decode(\'utf-8\').split(\'|\')
if user_dic.get(username) == psw:
conn.send(\'True\'.encode(\'utf-8\'))
return True
else:
time -= 1
conn.send(str(time).encode(\'utf-8\'))
return False
def up(): # 上传
while 1:
len_head_dic = struct.unpack(\'i\', conn.recv(4))[0]
head_dic = json.loads(conn.recv(len_head_dic).decode(\'utf-8\'))
file_name = head_dic[\'file_name\']
file_md5 = head_dic[\'file_md5\']
file_size = head_dic[\'file_size\']
file_path = os.path.join(os.path.dirname(__file__), \'上传/\', file_name)
with open(file_path, mode=\'wb\') as f:
md5 = hashlib.md5()
len_data = 0
while len_data < file_size:
data = conn.recv(1024)
len_data += len(data)
md5.update(data)
f.write(data)
data_md5 = md5.hexdigest()
if data_md5 == file_md5:
conn.send(\'True\'.encode(\'utf-8\'))
return True
else:
conn.send(\'False\'.encode(\'utf-8\'))
def down(): # 下载
file_path = os.path.join(os.path.dirname(__file__), \'上传/\')
file_lst = os.listdir(file_path)
file_show = \'\'
for n, file in enumerate(file_lst, 1):
file_show += f\'序号:{n}\t文件名:{file}\n\'
to_client = f\'可下载文件:\n{file_show}\'.encode(\'utf-8\')
conn.send(to_client)
while 1:
from_client = conn.recv(1024).decode(\'utf-8\')
try:
with open(os.path.join(file_path, file_lst[int(from_client) - 1]), mode=\'rb\') as f:
file_size = 0
md5 = hashlib.md5()
while 1:
data = f.read(1024)
if data:
md5.update(data)
file_size += len(data)
else:
break
file_md5 = md5.hexdigest()
conn.send(\'True\'.encode(\'utf-8\'))
except Exception:
conn.send(\'False.\'.encode(\'utf-8\'))
continue
else:
head_dic = {
\'file_md5\': file_md5,
\'file_name\': file_lst[int(from_client) - 1],
\'file_size\': file_size
}
head_dic_json_bytes = json.dumps(head_dic).encode(\'utf-8\')
len_head_dic_bytes = struct.pack(\'i\', len(head_dic_json_bytes))
conn.send(len_head_dic_bytes)
conn.send(head_dic_json_bytes)
with open(os.path.join(file_path, file_lst[int(from_client) - 1]), mode=\'rb\') as f:
while 1:
data = f.read(1024)
if data:
conn.send(data)
else:
return True
def run(): # 主循环
if login():
while 1:
from_client = conn.recv(1024).decode(\'utf-8\')
if from_client == \'1\':
up()
elif from_client == \'2\':
down()
elif from_client == \'3\':
break
if __name__ == \'__main__\':
run()
# 客户端
import socket
import hashlib
import json
import struct
import os
client = socket.socket()
client.connect((\'192.168.13.19\', 2021))
def login(): # 登录
while 1:
username = input(\'请输入账号:\').strip()
psw = input(\'请输入密码:\').strip()
to_server = f\'{username}|{psw}\'.encode(\'utf-8\')
client.send(to_server)
from_server = client.recv(1024).decode(\'utf-8\')
if from_server == \'True\':
return True
elif from_server == \'0\':
return False
else:
print(f\'账号或密码错误,还有{from_server}次机会\')
def up(): # 上传
print(\'欢迎进入上传页面~~~~~~~~~~~~~~~~~~\')
while 1:
file_path = input(\'请输入上传的文件路径:\').strip()
try:
with open(file_path, mode=\'rb\') as f:
file_size = 0
md5 = hashlib.md5()
while 1:
data = f.read(1024)
if data:
md5.update(data)
file_size += len(data)
else:
break
file_md5 = md5.hexdigest()
except Exception:
print(\'路径错误,请重新输入.\')
continue
else:
head_dic = {
\'file_md5\': file_md5,
\'file_name\': os.path.basename(file_path),
\'file_size\': file_size
}
head_dic_json_bytes = json.dumps(head_dic).encode(\'utf-8\')
len_head_dic_bytes = struct.pack(\'i\', len(head_dic_json_bytes))
client.send(len_head_dic_bytes)
client.send(head_dic_json_bytes)
with open(file_path, mode=\'rb\') as f:
while 1:
data = f.read(1024)
if data:
client.send(data)
else:
break
from_server = client.recv(1024).decode(\'utf-8\')
if from_server == \'True\':
print(\'上传成功.\')
return True
else:
print(\'上传失败.\')
def down(): # 下载
print(\'欢迎进入下载页面~~~~~~~~~~~~~~~~~~\')
from_server = client.recv(1024).decode(\'utf-8\')
print(from_server)
while 1:
choice = input(\'请输入选择的序号:\').strip().encode(\'utf-8\')
client.send(choice)
return_from_server = client.recv(1024).decode(\'utf-8\')
if return_from_server == \'True\':
len_head_dic = struct.unpack(\'i\', client.recv(4))[0]
head_dic = json.loads(client.recv(len_head_dic).decode(\'utf-8\'))
file_name = head_dic[\'file_name\']
file_md5 = head_dic[\'file_md5\']
file_size = head_dic[\'file_size\']
file_path = os.path.join(os.path.dirname(__file__), \'下载/\', file_name)
with open(file_path, mode=\'wb\') as f:
md5 = hashlib.md5()
len_data = 0
while len_data < file_size:
data = client.recv(1024)
len_data += len(data)
md5.update(data)
f.write(data)
data_md5 = md5.hexdigest()
if data_md5 == file_md5:
print(\'下载成功.\')
return True
else:
print(\'下载失败.\')
else:
print(\'输入错误,请重新输入.\')
def run(): # 主循环
if login():
print(\'登陆成功~~~~~~~\')
print(\'欢迎来到主页面~~~~~~\')
while 1:
print(\'[1]上传\t[2]下载\t[3]退出\')
choice = input(\'请输入选项:\').strip()
client.send(choice.encode(\'utf-8\'))
if choice == \'1\':
up()
elif choice == \'2\':
down()
elif choice == \'3\':
print(\'正在退出...\')
break
else:
print(\'输入错误,请重新输入\')
else:
print(\'登录失败,正在退出....\')
if __name__ == \'__main__\':
run()