zyyhxbs

FTP上传下载文件(函数简易版)

# 服务端
import socket
import json
import hashlib
import struct
import os

user_dic = {
    \'太上老君\': \'123456\',
    \'元始天尊\': \'123456\',
    \'通天教主\': \'123456\'
}

server = socket.socket()
server.bind((\'192.168.13.19\', 2021))
server.listen(5)
conn, addr = server.accept()
def login():  # 登录
    time = 3
    while time > 0:
        username, psw = conn.recv(1024).decode(\'utf-8\').split(\'|\')
        if user_dic.get(username) == psw:
            conn.send(\'True\'.encode(\'utf-8\'))
            return True
        else:
            time -= 1
            conn.send(str(time).encode(\'utf-8\'))
    return False

def up():   # 上传
    while 1:
        len_head_dic = struct.unpack(\'i\', conn.recv(4))[0]
        head_dic = json.loads(conn.recv(len_head_dic).decode(\'utf-8\'))
        file_name = head_dic[\'file_name\']
        file_md5 = head_dic[\'file_md5\']
        file_size = head_dic[\'file_size\']
        file_path = os.path.join(os.path.dirname(__file__), \'上传/\', file_name)
        with open(file_path, mode=\'wb\') as f:
            md5 = hashlib.md5()
            len_data = 0
            while len_data < file_size:
                data = conn.recv(1024)
                len_data += len(data)
                md5.update(data)
                f.write(data)
            data_md5 = md5.hexdigest()
        if data_md5 == file_md5:
            conn.send(\'True\'.encode(\'utf-8\'))
            return True
        else:
            conn.send(\'False\'.encode(\'utf-8\'))


def down():  # 下载
    file_path = os.path.join(os.path.dirname(__file__), \'上传/\')
    file_lst = os.listdir(file_path)
    file_show = \'\'
    for n, file in enumerate(file_lst, 1):
        file_show += f\'序号:{n}\t文件名:{file}\n\'
    to_client = f\'可下载文件:\n{file_show}\'.encode(\'utf-8\')
    conn.send(to_client)
    while 1:
        from_client = conn.recv(1024).decode(\'utf-8\')
        try:
            with open(os.path.join(file_path, file_lst[int(from_client) - 1]), mode=\'rb\') as f:
                file_size = 0
                md5 = hashlib.md5()
                while 1:
                    data = f.read(1024)
                    if data:
                        md5.update(data)
                        file_size += len(data)
                    else:
                        break
                file_md5 = md5.hexdigest()
                conn.send(\'True\'.encode(\'utf-8\'))
        except Exception:
            conn.send(\'False.\'.encode(\'utf-8\'))
            continue
        else:
            head_dic = {
                        \'file_md5\': file_md5,
                        \'file_name\': file_lst[int(from_client) - 1],
                        \'file_size\': file_size
                    }
            head_dic_json_bytes = json.dumps(head_dic).encode(\'utf-8\')
            len_head_dic_bytes = struct.pack(\'i\', len(head_dic_json_bytes))
            conn.send(len_head_dic_bytes)
            conn.send(head_dic_json_bytes)
            with open(os.path.join(file_path, file_lst[int(from_client) - 1]), mode=\'rb\') as f:
                while 1:
                    data = f.read(1024)
                    if data:
                        conn.send(data)
                    else:
                        return True

def run(): # 主循环
    if login():
        while 1:
            from_client = conn.recv(1024).decode(\'utf-8\')
            if from_client == \'1\':
                up()
            elif from_client == \'2\':
                down()
            elif from_client == \'3\':
                break
if __name__ == \'__main__\':
    run()
# 客户端
import socket
import hashlib
import json
import struct
import os

client = socket.socket()
client.connect((\'192.168.13.19\', 2021))

def login():  # 登录
    while 1:
        username = input(\'请输入账号:\').strip()
        psw = input(\'请输入密码:\').strip()
        to_server = f\'{username}|{psw}\'.encode(\'utf-8\')
        client.send(to_server)
        from_server = client.recv(1024).decode(\'utf-8\')
        if from_server == \'True\':
            return True
        elif from_server == \'0\':
            return False
        else:
            print(f\'账号或密码错误,还有{from_server}次机会\')

def up():   # 上传
    print(\'欢迎进入上传页面~~~~~~~~~~~~~~~~~~\')
    while 1:
        file_path = input(\'请输入上传的文件路径:\').strip()
        try:
            with open(file_path, mode=\'rb\') as f:
                file_size = 0
                md5 = hashlib.md5()
                while 1:
                    data = f.read(1024)
                    if data:
                        md5.update(data)
                        file_size += len(data)
                    else:
                        break
                file_md5 = md5.hexdigest()
        except Exception:
            print(\'路径错误,请重新输入.\')
            continue
        else:
            head_dic = {
                \'file_md5\': file_md5,
                \'file_name\': os.path.basename(file_path),
                \'file_size\': file_size
            }
            head_dic_json_bytes = json.dumps(head_dic).encode(\'utf-8\')
            len_head_dic_bytes = struct.pack(\'i\', len(head_dic_json_bytes))
            client.send(len_head_dic_bytes)
            client.send(head_dic_json_bytes)
            with open(file_path, mode=\'rb\') as f:
                while 1:
                    data = f.read(1024)
                    if data:
                        client.send(data)
                    else:
                        break
            from_server = client.recv(1024).decode(\'utf-8\')
            if from_server == \'True\':
                print(\'上传成功.\')
                return True
            else:
                print(\'上传失败.\')


def down():    # 下载
    print(\'欢迎进入下载页面~~~~~~~~~~~~~~~~~~\')
    from_server = client.recv(1024).decode(\'utf-8\')
    print(from_server)
    while 1:
        choice = input(\'请输入选择的序号:\').strip().encode(\'utf-8\')
        client.send(choice)
        return_from_server = client.recv(1024).decode(\'utf-8\')
        if return_from_server == \'True\':
            len_head_dic = struct.unpack(\'i\', client.recv(4))[0]
            head_dic = json.loads(client.recv(len_head_dic).decode(\'utf-8\'))
            file_name = head_dic[\'file_name\']
            file_md5 = head_dic[\'file_md5\']
            file_size = head_dic[\'file_size\']
            file_path = os.path.join(os.path.dirname(__file__), \'下载/\', file_name)
            with open(file_path, mode=\'wb\') as f:
                md5 = hashlib.md5()
                len_data = 0
                while len_data < file_size:
                    data = client.recv(1024)
                    len_data += len(data)
                    md5.update(data)
                    f.write(data)
                data_md5 = md5.hexdigest()
            if data_md5 == file_md5:
                print(\'下载成功.\')
                return True
            else:
                print(\'下载失败.\')
        else:
            print(\'输入错误,请重新输入.\')

def run(): # 主循环
    if login():
        print(\'登陆成功~~~~~~~\')
        print(\'欢迎来到主页面~~~~~~\')
        while 1:
            print(\'[1]上传\t[2]下载\t[3]退出\')
            choice = input(\'请输入选项:\').strip()
            client.send(choice.encode(\'utf-8\'))
            if choice == \'1\':
                up()
            elif choice == \'2\':
                down()
            elif choice == \'3\':
                print(\'正在退出...\')
                break
            else:
                print(\'输入错误,请重新输入\')
    else:
        print(\'登录失败,正在退出....\')
if __name__ == \'__main__\':
    run()

分类:

技术点:

相关文章: