手撸web框架
简单的请求响应实现
要实现最简单的web框架,首先要对网络熟悉,首先HTTP协议是应用层的协议,只要我们给数据加上HTTP格式的响应报头,我们的数据就能基于socket进行实现了
import socket
sever = socket.socket()
sever.bind((\'127.0.0.1\',10000))
sever.listen(5)
while True:
conn,addr = sever.accept()
data = conn.recv(1024)
print(data)
#响应行
conn.send(b\'HTTP/1.1 200 OK\n\r\')
#响应头
conn.send(b\'name:zx\r\n\')
conn.send(b\'age:23\r\n\')
conn.send(b\'sex:man\r\n\')
#空行!区分头和响应体
conn.send(b\'\r\n\r\n\')
#响应体
conn.send(b\'<h1>Hello world!</h1>\')
#关闭连接
conn.close()
web框架的特点
我们熟悉的web框架其实都很类似,基本上也就三大块
路由选择-业务处理-ORM
路由选择
根据客户端的请求,跳转到响应的业务处理
业务处理
业务处理
数据库操作
网页模板渲染
ORM
数据库关系映射
代码实现
路由选择-urls.py
from views import *
urls = [
(\'/index\',index),
(\'/login\',login),
(\'/xxx\',xxx),
(\'/get_time\',get_time),
(\'/get_db\',get_db)
]
业务处理-views.py
from orm import Teacher
def index(env):
return \'index\'
def login(env):
return \'login\'
def error(env):
return \'404 error\'
def xxx(env):
return \'xxx\'
from datetime import datetime
def get_time(env):
current_time = datetime.now().strftime(\'%Y-%m-%d %X\')
with open(r\'C:\Users\Administrator\Desktop\01python\web\zx_web\time.html\',\'r\',encoding=\'utf-8\') as f:
data = f.read()
#模板HTML渲染
data = data.replace(\'$$time$$\',current_time)
return data
def get_db(env):
#ORM数据库操作
ret = Teacher.select(tid=1)[0]
print(ret)
return str(ret)
ORM
orm.py
from MySQL import MySQL
# 定义字段类
class Field(object):
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
class StringField(Field):
def __init__(self,name,
column_type=\'varchar=(255)\',
primary_key=False,
default=None):
super().__init__(name,column_type,primary_key,default)
class IntegerField(Field):
def __init__(self,
name,
column_type=\'int\',
primary_key=False,
default=None):
super().__init__(name, column_type, primary_key, default)
class ModelMetaClass(type):
print("ModelMetaClass")
def __new__(cls,class_name,class_base,class_attrs):
print("ModelMetaClass_new")
#实例化对象的时候也会执行,我们要把这一次拦截掉
if class_name == \'Models\':
#为了能让实例化顺利完成,返回一个空对象就行
return type.__new__(cls,class_name,class_base,class_attrs)
#获取表名
table_name = class_attrs.get(\'table_name\',class_name)
#定义一个存主键的的变量
primary_key = None
#定义一个字典存储字段信息
mapping = {}
#name=\'tid\',primary_key=True
#for来找到主键字段
for k,v in class_attrs.items():
#判断信息是否是字段
if isinstance(v,Field):
mapping[k] = v
#寻找主键
if v.primary_key:
if primary_key:
raise TypeError("主键只有一个")
primary_key=v.name
#将重复的键值对删除,因为已经放入了mapping
for k in mapping.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError("表必须要有一个主键")
class_attrs[\'table_name\']=table_name
class_attrs[\'primary_key\']=primary_key
class_attrs[\'mapping\']=mapping
return type.__new__(cls,class_name,class_base,class_attrs)
class Models(dict,metaclass=ModelMetaClass):
print("Models")
def __init__(self,**kwargs):
print(f\'Models_init\')
super().__init__(self,**kwargs)
def __getattr__(self, item):
return self.get(item,"没有该值")
def __setattr__(self, key, value):
self[key]=value
#查找
@classmethod
def select(cls,**kwargs):
ms=MySQL()
#如果没有参数默认是查询全部的
if not kwargs:
sql=\'select * from %s\'%cls.table_name
res=ms.select(sql)
else:
k = list(kwargs.keys())[0]
v = kwargs.get(k)
sql=\'select * from %s where %s=?\'%(cls.table_name,k)
#防sql注入
sql=sql.replace(\'?\',\'%s\')
res=ms.select(sql,v)
if res:
return [cls(**i) for i in res]
#新增
def save(self):
ms=MySQL()
#存字段名
fields=[]
#存值
values=[]
args=[]
for k,v in self.mapping.items():
#主键自增,不用给他赋值
if not v.primary_key:
fields.append(v.name)
args.append("?")
values.append(getattr(self,v.name))
sql = "insert into %s(%s) values(%s)"%(self.table_name,",".join(fields),",".join((args)))
sql = sql.replace(\'?\',\'%s\')
ms.execute(sql,values)
def update(self):
ms = MySQL()
fields = []
valuse = []
pr = None
for k,v in self.mapping.items():
#获取主键值
if v.primary_key:
pr = getattr(self,v.name,v.default)
else:
fields.append(v.name+\'=?\')
valuse.append(getattr(self,v.name,v.default))
print(fields,valuse)
sql = \'update %s set %s where %s = %s\'%(self.table_name,\',\'.join(fields),self.primary_key,pr)
sql = sql.replace(\'?\',"%s")
ms.execute(sql,valuse)
class Teacher(Models):
print("teacher")
table_name=\'teacher\'
tid = IntegerField(name=\'tid\',primary_key=True)
tname = StringField(name=\'tname\')
if __name__ == \'__main__\':
# tea=Teacher(tname="haha")
tea2=Teacher(tname="haha",tid=5)
# print(Teacher.select(tid=1))
# Teacher.save(tea)
Teacher.update(tea2)
MYSQL.py
import pymysql
class MySQL:
#单例模式
__instance = None
def __new__(cls, *args, **kwargs):
if not cls.__instance:
cls.__instance = object.__new__(cls)
return cls.__instance
def __init__(self):
self.mysql = pymysql.connect(
host=\'127.0.0.1\',
port=3306,
user=\'root\',
database=\'orm_demo\',
password=\'root\',
charset=\'utf8\',
autocommit=True
)
#获取游标
self.cursor = self.mysql.cursor(
pymysql.cursors.DictCursor
)
#查看
def select(self,sql,args=None):
print(sql,args)
#提交sql语句
self.cursor.execute(sql,args)
#获取查询的结果
res = self.cursor.fetchall()
return res
#提交
def execute(self,sql,args):
#提交语句可能会发生异常
print(sql,args)
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
def close(self):
self.cursor.close()
self.mysql.close()
socket层
import socket
from urls import urls
from views import *
sever = socket.socket()
sever.bind((\'127.0.0.1\',10000))
sever.listen(5)
while True:
conn,addr = sever.accept()
#获取HTTP请求信息
data = conn.recv(1024)
data = data.decode(\'utf8\')
print(data)
#用户请求的路由
choice = data.split(\' \')[1]
#找到路由
func = None
for url in urls:
if choice == url[0]:
func = url[1]
break
if func:
res = func(data)
else:
res = \'<h1>404 error</h1>\'
#响应行
conn.send(b\'HTTP/1.1 200 OK\n\r\')
#响应头
conn.send(b\'name:zx\r\n\')
conn.send(b\'age:23\r\n\')
conn.send(b\'sex:man\')
#空行!区分头和响应体
conn.send(b\'\r\n\r\n\')
#响应体
conn.send(res.encode(\'utf8\'))
#关闭连接
conn.close()
总结
其实并不是所有内容都要自己写,Python有很多的模块可以帮我们实现许多的功能
wsgiref模块:封装的一个socket服务,只需要关注数据发送和接收,不需要太多的关注HTTP协议的部分
from wsgiref.simple_server import make_server
from urls import urls
from views import *
def run(env,response):
"""
:param env: 请求相关的所有数据
:param response: 响应相关的所有数据
:return:
"""
response(\'200 OK\',[])
# print(env)
current_path = env.get(\'PATH_INFO\')
# 先定义一个变量名 用来存储后续匹配到的函数名
func = None
# for循环 匹配后缀
for url in urls:
if current_path == url[0]:
func = url[1] # 一旦匹配成功 就将匹配到的函数名赋值给func变量
break # 主动结束匹配
# 判断func是否有值
if func:
res = func(env)
else:
res = error(env)
return [res.encode(\'utf-8\')]
if __name__ == \'__main__\':
server = make_server(\'127.0.0.1\',8080,run)
# 实时监听该地址 只要有客户端来连接 统一交给run函数去处理
server.serve_forever() # 启动服务端
jinja2模块:模板渲染功能
模板语法(极其贴近python后端语法)
<p>{{ user }}</p>
<p>{{ user.name }}</p>
<p>{{ user[\'pwd\'] }}</p>
<p>{{ user.get(\'hobby\') }}</p>
{% for user_dict in user_list %}
<tr>
<td>{{ user_dict.id }}</td>
<td>{{ user_dict.name }}</td>
<td>{{ user_dict.pwd }}</td>
</tr>
{% endfor %}