前言:
Python主流WEB框架对比
Django:1个重武器,包含了web开发中常用的功能、组件的框架;(ORM、Session、Form、Admin、分页、中间件、信号、缓存、ContenType....);
Tornado:2大特性就是异步非阻塞、原生支持WebSocket协议;
Flask:封装功能不及Django完善,性能不及Tornado,但是Flask可扩展性强,因为flask的第三方开源组件丰富;
Bottle:比较简单;
结论
都不是我写的!!!不论优劣,不同的工具而已;
小型web应用设计的功能点不多使用Flask;
大型web应用设计的功能点比较多使用的组件也会比较多,使用Django(自带功能多不用去找插件);
如果追求性能可以考虑Tornado;
WSGI介绍
Django使用wsgiref模块Flask使用Werkzeug模块实现了WSGI协议。
HTTP协议是建立在TCP协议之上的,实现UWSGI协议本质是实现Socket服务端,也就是TCP层实现HTTP请求的接收、 对请求内容进行预处理如分割HTPP请求内容的Header和body,然后再触发后面的Django/Flask框架。
Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架
pip3 install flask #安装flask
Flask框架是基于werkzeug模块实现的。
from werkzeug.wrappers import Request,Response from werkzeug.serving import run_simple @Request.application def hello(request): return Response("Hello World") if __name__ == \'__main__\': #请求一旦到来,执行第3个参数,hello(上下文) run_simple(\'localhost\', 4000, hello)
Flask简单使用
from flask import Flask app=Flask(__name__) #创建1个Flask实例 @app.route(\'/\') #路由系统生成 视图对应url,1. decorator=app.route() 2. decorator(first_flask) def first_flask(): #视图函数 return \'Hello World\' #response if __name__ == \'__main__\': app.run() #启动socket
一、配置文件
app=Flask(__name__,template_folder=\'templates\',static_url_path=\'/static/\',static_path=\'/zhanggen\')
模板路径: template_folder=\'templates\'
静态文件路径:static_url_path=\'/static/\'
静态文件引入别名:static_path=\'/zhanggen\'
设置为调试环境:app.debug=True (代码修改自动更新)
设置json编码格式 如果为False 就不使用ascii编码:app.config[\'JSON_AS_ASCII\']=False
设置响应头信息Content-Type app.config[\'JSONIFY_MIMETYPE\'] ="application/json;charset=utf-8" (注意 ;charset=utf-8)
二、路由系统
1.动态路由(url传参)
@app.route(\'/user/<name>\')
from flask import Flask app=Flask(__name__) @app.route(\'/<name>\') #设置url传参数 http://127.0.0.1:5000/zhanggen def first_flask(name): #视图必须有对应接收参数 print(name) return \'Hello World\' #response if __name__ == \'__main__\': app.run()
@app.route(\'/post/<int:age>\')
#接收整型数字参数 app=Flask(__name__) @app.route(\'/<int:age>/\') #设置url传参数 http://127.0.0.1:5000/18/ def first_flask(age): #视图必须有对应接收参数 print(age) return \'Hello World\' #response if __name__ == \'__main__\': app.run()
@app.route(\'/post/<float:salary>\')
#接收浮点型型数字参数 app=Flask(__name__) @app.route(\'/<float:salary>/\') #设置url传参数http://127.0.0.1:5000/2345555.8889/ def first_flask(salary): #视图必须有对应接收参数 print(salary) return \'Hello World\' #response if __name__ == \'__main__\': app.run()
@app.route(\'/post/<path:path>\')
# 接收URL链接类型参数 app=Flask(__name__) @app.route(\'/<path:url>/\') #设置url传参数:http://127.0.0.1:5000/http://www.baiu.com/ def first_flask(url): #视图必须有对应接收参数 print(url) return \'Hello World\' #response if __name__ == \'__main__\': app.run()
2、指定允许的请求方法
@app.route(\'/login\', methods=[\'GET\', \'POST\'])
# 指定允许的请求方法 app=Flask(__name__) @app.route(\'/<path:url>/\',methods=[\'get\']) #只允许get请求 def first_flask(url): print(url) return \'Hello World\' #response if __name__ == \'__main__\': app.run()
3、通过别名反向生成url
#反向生成url from flask import Flask,url_for app=Flask(__name__) @app.route(\'/<path:url>\',endpoint=\'name1\') def first_flask(url): print(url_for(\'name1\',url=url)) #如果设置了url参数,url_for(别名,加参数) return \'Hello World\' if __name__ == \'__main__\': app.run()
4、通过app.add_url_rule()调用路由
#方式2通过app.add_url_rule()方法的方式调用路由 app=Flask(__name__) def first_flask(): return \'Hello World\' app.add_url_rule(rule=\'/index/\',endpoint=\'name1\',view_func=first_flask,methods=[\'GET\']) #app.add_url_rule(rule=访问的url,endpoint=路由别名,view_func=视图名称,methods=[允许访问的方法]) if __name__ == \'__main__\': app.run()
5、扩展路由功能:正则匹配url
如果需要一些复杂的匹配规则可以自定义正则匹配url
from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到flask中 app.url_map.converters[\'regex\'] = RegexConverter @app.route(\'/index/<regex("\d+"):nid>\') def index(nid): print(url_for(\'index\', nid=\'888\')) return \'Index\' if __name__ == \'__main__\': app.run()
四、视图
1、给Flask视图函数加装饰器
注意如果要给视图函数加装饰器增加新功能,一点要加在路由装饰器下面,才会被路由装饰器装饰,才能生生成url关系;
#给Flask视图加装饰器 #1、定义1个装饰器 def auth(func): print(\'我在上面\') def inner(*args,**kwargs): return func(*args,**kwargs) return inner app=Flask(__name__) @app.route(\'/\',methods=[\'GET\']) @auth #注意如果要给视图函数加装饰器,一点要加在路由装饰器下面,才会被路由装饰器装饰 def first_flask(): print(\'ffff\') return \'Hello World\' if __name__ == \'__main__\': app.run()
2、request和response
a.请求相关信息
request.method: 获取请求方法
request.json
request.json.get("json_key"):获取json数据 **较常用
request.argsget(\'name\') :获取get请求参数
request.form.get(\'name\') :获取POST请求参数
request.form.getlist(\'name_list\'):获取POST请求参数列表(多个)
request.values.get(\'age\') :获取GET和POST请求携带的所有参数(GET/POST通用)
request.cookies.get(\'name\'):获取cookies信息
request.headers.get(\'Host\'):获取请求头相关信息
request.path:获取用户访问的url地址,例如(/,/login/,/ index/);
request.full_path:获取用户访问的完整url地址+参数 例如(/login/?age=18)
request.script_root: 抱歉,暂未理解其含义;
request.url:获取访问url地址,例如http://127.0.0.1:5000/?age=18;
request.base_url:获取访问url地址,例如 http://127.0.0.1:5000/;
request.url_root
request.host_url
request.host:获取主机地址
request.files:获取用户上传的文件
obj = request.files[\'the_file_name\']
obj.save(\'/var/www/uploads/\' + secure_filename(f.filename)) 直接保存
b、响应相关信息
return "字符串" :响应字符串
return render_template(\'html模板路径\',**{}):响应模板
return redirect(\'/index.html\'):跳转页面
响应json数据
方式1: return jsonify(user_list)
app.config[\'JSON_AS_ASCII\']=False #指定json编码格式 如果为False 就不使用ascii编码, app.config[\'JSONIFY_MIMETYPE\'] ="application/json;charset=utf-8" #指定浏览器渲染的文件类型,和解码格式;
方式2:
return Response(data,mimetype="application/json;charset=utf-8",)
如果需要设置响应头就需要借助make_response()方法
from flask import Flask,request,make_response
response = make_response(render_template(\'index.html\'))
response是flask.wrappers.Response类型
response.delete_cookie(\'key\')
response.set_cookie(\'key\', \'value\')
response.headers[\'X-Something\'] = \'A value\'
return respons
3 、Flask之CBV视图
#CBV视图 from flask import Flask,url_for,views #----------------------------------------------------- app=Flask(__name__) #装饰器 def auth(func): print(\'我在上面\') def inner(*args,**kwargs): return func(*args,**kwargs) return inner #-------------------------------------------------------- class IndexView(views.MethodView): #CBV视图 methods=[\'GET\'] #允许的http请求方法(改CBV只允许GET方法) decorators = [auth,] #每次请求过来都加auth装饰器 def get(self): return \'Index.GET\' def post(self): return \'Index.POST\' app.add_url_rule(\'/index/\',view_func=IndexView.as_view(name=\'name1\')) #(name=\'name1\'反向生成url别名 if __name__ == \'__main__\': app.run()
五、模板语言
Flask使用的是Jinja2模板,所以其语法和Django无差别(Django的模板语言参考Jinja2)
1.引用静态文件
方式1:别名引入
<link rel="stylesheet" href="/zhanggen/commons.css">
方式2:url_for()方法引入
<link rel="stylesheet" href="{{ url_for(\'static\',filename=\'commons.css\') }}">
2.模板语言引用上下文对象
变量
<h1>{{user_list}}</h1> <!--变量 -->
循环、索引取值
<ul> {% for user in user_list %} <!--循环 --> <li>{{user}}</li> {% endfor %} {{user_list.0}} <!-- 索引取值--> </ul>
Flask的Jinjia2可以通过Context 把视图中的函数传递把模板语言中执行,这就是Django中的simple_tag和simple_fifter;
simple_tag(只能传2个参数,支持for、if)
@app.template_global() #simple_tag def foo(arg): return \'<input type="text">\'
<h1>{{foo(1)|safe}}</h1> <!--Flask的模板语言支持simple_tag-->
simple_fifter(对参数个数无限制,不支持for、if)
@app.template_filter() #simple_fifter def foo1(arg1,arg2,arg3): return arg1+arg2+arg3
<h1> {{ \'alex\'|foo1(\'s \',\'b\',) }} </h1> <!-- simple_fifter -->
3.wtform(flask表单验证插件)
3.0.简介
wtforms WTForms是一个支持多个web框架的form组件,主要对用户请求数据 进行表单验证。
3.1. 安装
pip install wtforms #安装wtfroms插件
3.2.简单使用
wtforms和Django自带的form验证插件功能相同,使用起来大同小异;
用户登录页面验证
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app=Flask(__name__,template_folder=\'templates\') #知道模板文件 app.debug=True #登录验证实例 class LoginForm(Form): #不同的字段 内部包含正则表达式 html5.EmailField | html5.DateTimeField... name=simple.StringField( label=\'用户名\', validators=[ #验证规则和错误提示信息 validators.DataRequired(message=\'用户名不能为空.\'), validators.Length(min=6, max=18, message=\'用户名长度必须大于%(min)d且小于%(max)d\') ], widget=widgets.TextInput(), #前端页面显示的插件.TextArea render_kw={\'class\': \'form-control\'} #设置form标签的class信息 ) # 不同的字段 内部包含正则表达式 html5.EmailField | html5.DateTimeField... pwd = simple.PasswordField( label=\'密码\', validators=[ validators.DataRequired(message=\'密码不能为空.\'), validators.Length(min=8, message=\'用户名长度必须大于%(min)d\'), #自定义验证规则 validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message=\'密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符\') ], widget=widgets.PasswordInput(), render_kw={\'class\': \'form-control\'} ) @app.route(\'/login/\', methods=[\'GET\', \'POST\']) def login(): if request.method == \'GET\': form = LoginForm() #实例化 form验证类 return render_template(\'login.html\', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): #判断是否验证成功? print(\'用户提交数据通过格式验证,提交的值为:\', form.data) else: print(form.errors) return render_template(\'login.html\', form=form) if __name__ == \'__main__\': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post" novalidate> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html> login.html
用户注册页面验证
#用户注册 from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder=\'templates\') app.debug = True class RegisterForm(Form): name = simple.StringField( label=\'用户名\', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={\'class\': \'form-control\'}, default=\'张根\' #设置input标签中默认值 ) pwd = simple.PasswordField( label=\'密码\', validators=[ validators.DataRequired(message=\'密码不能为空.\') ], widget=widgets.PasswordInput(), render_kw={\'class\': \'form-control\'} ) pwd_confirm = simple.PasswordField( #第二次输入密码 label=\'重复密码\', validators=[ validators.DataRequired(message=\'重复密码不能为空.\'), validators.EqualTo(\'pwd\', message="两次密码输入不一致") #验证2次输入的密码是否一致? ], widget=widgets.PasswordInput(), render_kw={\'class\': \'form-control\'} ) email = html5.EmailField( label=\'邮箱\', validators=[ validators.DataRequired(message=\'邮箱不能为空.\'), validators.Email(message=\'邮箱格式错误\') ], widget=widgets.TextInput(input_type=\'email\'), #生成email input标签 render_kw={\'class\': \'form-control\'} ) gender = core.RadioField( label=\'性别\', choices=( #choice radio选项 (1, \'男\'), (2, \'女\'), ), coerce=int #讲用户提交过来的 \'4\' 强制转成 int 4 ) city = core.SelectField( label=\'城市\', choices=( (\'bj\', \'北京\'), (\'sh\', \'上海\'), ) ) hobby = core.SelectMultipleField( #select 下拉框多选框 label=\'爱好\', choices=( (1, \'篮球\'), (2, \'足球\'), ), coerce=int ) favor = core.SelectMultipleField( label=\'喜好\', choices=( (1, \'篮球\'), (2, \'足球\'), ), widget=widgets.ListWidget(prefix_label=False), #生成Checkbox 多选框 option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): #重写form验证类的__init__方法可以实时同步数据中数据 super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, \'篮球\'), (2, \'足球\'), (3, \'羽毛球\')) def validate_pwd_confirm(self, field): #wtforms验证 钩子函数 """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data[\'pwd\']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 @app.route(\'/register/\', methods=[\'GET\', \'POST\']) def register(): if request.method == \'GET\': form = RegisterForm(data={\'gender\': 1}) #默认值 return render_template(\'register.html\', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print(\'用户提交数据通过格式验证,提交的值为:\', form.data) else: print(form.errors) return render_template(\'register.html\', form=form) if __name__ == \'__main__\': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
3.2.wtforms源码 猜想....
A.自动生成html标签
先来分析一下form验证类的结构
LoginForm类中包含了2个字段: name 和 pwd,而name / pwd字段 = 对象,所以LoginForm 类包含了2个对象;
如果实例化了obj=LoginForm() 就等于 在 这1个对象中嵌套了 2个对象;
前端使用Form验证插件:
那如果在前端for循环LoginForm对象,就等于调用LoginForm对象的__iter__方法,把每个字段(对象)封装的数据 返回
如果前端{{ obj }}= 直接调用了字段对象的__str__方法;
class InputText(object): #插件 def __str__(self): return \'<input type="text" />\' class InputPassword(object): def __str__(self): return \'<input type="password" />\' #----------------------------------------------------------- class StringField(object): #字段 def __init__(self,wg): self.widget=wg def __str__(self): #调用插件的__str__ return str(self.widget) class DateField(object): def __init__(self, wg): self.widget = wg def __str__(self): return str(self.widget) #-------------------------------------------------------------- class LoginForm(object): #统一 灵活接口 (对象嵌套对象,多层封装) name=StringField(wg=InputText()) #wg=InputText() 对象 StringField(wg=InputText())对象 pwd=DateField(wg=InputPassword()) l_obj=LoginForm() print(l_obj.name) print(l_obj.pwd)
B.数据校验
后台定义好正则
用户发来数据
对数据进行校验
3.3.源码流程
生成HTML标签并显示
1.验证类(LogibForm)生成
1.1.由于 metaclass=FormMeta,所以LoginForm是由FormMeta创建的
\'\'\' class BaseForm(): pass class NewBase(BaseForm,metaclass=FormMeta,): pass class Form(NewBase): pass class LoginForm(Form): pass \'\'\' class Form(with_metaclass(FormMeta,BaseForm)):
1.2.执行FormMeta 的__init__方法,在LoginForm中添加2个静态字段
class FormMeta(type): """ The metaclass for `Form` and any subclasses of `Form`. `FormMeta`\'s responsibility is to create the `_unbound_fields` list, which is a list of `UnboundField` instances sorted by their order of instantiation. The list is created at the first instantiation of the form. If any fields are added/removed from the form, the list is cleared to be re-generated on the next instantiation. Any properties which begin with an underscore or are not `UnboundField` instances are ignored by the metaclass. """ def __init__(cls, name, bases, attrs): type.__init__(cls, name, bases, attrs) #继承type的功能 cls._unbound_fields = None #在LoginForm中添加1个静态字段 cls._wtforms_meta = None #在LoginForm中添加1个静态字段
1.3.开始解释LoginForm中的 实例化字段对象name=simple.StringField()simple.PasswordField()
StringField/PasswordField开始实例化(提到实例化就应该想到:指定元类的__call__、自己/父类的__new__、__init__):
StringField/PasswordField是默认元类,自己没有__new__和__init__方法;
但父类Field类中有__new__方法,所以执行父类的__new__(Field.__new__)返回UnboundField对象
def __new__(cls, *args, **kwargs):#执行__new__方法 if \'_form\' in kwargs and \'_name\' in kwargs: return super(Field, cls).__new__(cls) else: #我x 没想到 ! __new__既然返回了1个 UnboundField()而不是StringField/PasswordField对象;狸猫换了太子 ? return UnboundField(cls, *args, **kwargs)
由于Field.__new__方法返回了 1个 UnboundField对象,来看 UnboundField的__init__方法
class UnboundField(object): _formfield = True creation_counter = 0 #静态字段 设置计数器 def __init__(self, field_class, *args, **kwargs): #field_class=.StringField / PasswordField #获取到field_class 的 参数封装到 UnboundField对象中,并且设置 排序 \'creation_counter\': 2 UnboundField.creation_counter += 1 #每实例化1个 UnboundField对象 计数器+1 self.field_class = field_class self.args = args self.kwargs = kwargs #{\'label\': \'用户名\', \'validators\': [<wtforms.validators.DataRequired object at 0x00000000038EF080>, <wtforms.validators.Length object at 0x00000000038EF0F0>], \'widget\': <wtforms.widgets.core.TextInput object at 0x00000000038EF0B8>, \'render_kw\': {\'class\': \'form-control\'}} self.creation_counter = UnboundField.creation_counter# \'\'\' print(self.__dict__) { \'field_class\': <class \'wtforms.fields.simple.PasswordField\'>, \'args\': (), \'kwargs\': {\'label\': \'密码\', \'validators\': [<wtforms.validators.DataRequired object at 0x00000000038EF198>, <wtforms.validators.Length object at 0x00000000038EF1D0>, <wtforms.validators.Regexp object at 0x00000000038EF208>], \'widget\': <wtforms.widgets.core.PasswordInput object at 0x00000000038EF2B0>, \'render_kw\': {\'class\': \'form-control\'}}, \'creation_counter\': 2 } \'\'\'
UnboundField的__init__方法在 UnboundField对象中封装了Field类的参数和计数器,所以现在LoginForml类中封装数据如下
""" print(LoginForm.__dict__) LoginForm ={ \'__module__\': \'__main__\', \'name\': <1 UnboundField(StringField, (),{\'creation_counter\': 1, \'label\': \'用户名\', \'validators\': [<wtforms.validators.DataRequired object at 0x00000000037DAEB8>, <wtforms.validators.Length object at 0x000000000382B048>], \'widget\': <wtforms.widgets.core.TextInput object at 0x000000000382B080>, \'render_kw\': {\'class\': \'form-control\'} })>, \'pwd\': <2 UnboundField(PasswordField, (),{\'creation_counter\': 2,\'label\': \'密码\', \'validators\': [<wtforms.validators.DataRequired object at 0x000000000382B0F0>, <wtforms.validators.Length object at 0x000000000382B128>, <wtforms.validators.Regexp object at 0x000000000382B160>], \'widget\': <wtforms.widgets.core.PasswordInput object at 0x000000000382B208>, \'render_kw\': {\'class\': \'form-control\'}})>, \'__doc__\': None, \'_unbound_fields\': None, \'_wtforms_meta\': None, } """
启发:
不一定要把代码都写在当前类中,如过多个类和类之间有同性方法、属性可以抽出来集中到父类之中;子类继承父类所以子类实例化对象之后,继承享有2者的属性和方法;所以看源码遇到继承一点要注意 观察父类;
每个对象实例化(在排除MetaClass的情况下)都会执行 父类的__new__方法,再去执行__init__方法;而__new__实质性决定了实例化出来的对象是神马?
class Queen(object): def __new__(cls, *args, **kwargs): #类中__new__方法决定了类(),实例化出什么对象; return Cat(\'狸猫\',\'男\',\'太子\') def __init__(self,name): #由于__nwe__方法返回了其他对象,所以不会执行Queen的__init__方法 print(\'ok\') self.name=name Prince=Queen(\'王子\') print(Prince.name) print(Prince.gender) print(Prince.identity)
2.LoginForm实例化
谈到类实例化应该先检查该类是否指定了 Meta类,如果指定了Meta类, 就需要先执行 (指定元类的__call__、自己/父类的__new__、__init__)
21.执行FormMeta的__call__方法,赋值LoginForm的_unbound_fields 和 _wtforms_meta属性;
根据unbound对象的creation_counter属性对 LoginForm中的字段进行排序,并填充到 LoginForm的_unbound_fields属性中
根据 LoginForm的__mro__继承顺序:获取当前类(FormLogin)所有父类,并在每个父类中 提取Meta属性添加到列表,转成元组,最后创建Meta类让其继承,赋值LoginForm._wtforms_meta属性
def __call__(cls, *args, **kwargs): if cls._unbound_fields is None: #在创建类时 已经设置LoginForm的_unbound_fields为空 fields = [] # 获取LoginForm类中,中所有属性的key:[ \'_get_translations\', \'_unbound_fields\', \'_wtforms_met,\'name\', \'populate_obj\', \'process\', \'pwd\', \'validate\'..... ] for name in dir(cls): if not name.startswith(\'_\'): #排除__下划线的私有属性 name. pwd unbound_field = getattr(cls, name) #cls =LoginForm类 #根据key 获取unbound_field 对象 if hasattr(unbound_field, \'_formfield\'): #检查unbound_field 对象是否包含_formfield = True fields.append((name, unbound_field)) # \'\'\' # fields = [ # (name,name的unbound对象), # (pwd,pwd的unbound对象), # ] # \'\'\' #对fields 按照定义顺序 进行排序 fields.sort(key=lambda x: (x[1].creation_counter, x[0])) #根据unbound对象的creation_counter进行字段排序 cls._unbound_fields = fields if cls._wtforms_meta is None: bases = [] #bases = [DefaultMeta], # 按照继承顺序:获取当前类(FormLogin)所有父类 for mro_class in cls.__mro__: if \'Meta\' in mro_class.__dict__: #去每个父类(mro_class)获取 Meta = DefaultMeta bases.append(mro_class.Meta) #bases = [DefaultMeta], \'\'\' class Meta(DefaultMeta): pass \'\'\' cls._wtforms_meta = type(\'Meta\', tuple(bases), {}) #cls._wtforms_meta=Meta(DefaultMeta)类: return type.__call__(cls, *args, **kwargs)
执行完了指定元类 FormMeta.__call__()方法之后的LoginForm类中封装的数据
print(LoginForm.__dict__) LoginForm ={ \'__module__\': \'__main__\', \'name\': <1 UnboundField(StringField, (),{\'creation_counter\': 1, \'label\': \'用户名\', \'validators\': [<wtforms.validators.DataRequired object at 0x00000000037DAEB8>, <wtforms.validators.Length object at 0x000000000382B048>], \'widget\': <wtforms.widgets.core.TextInput object at 0x000000000382B080>, \'render_kw\': {\'class\': \'form-control\'} })>, \'pwd\': <2 UnboundField(PasswordField, (),{\'creation_counter\': 2,\'label\': \'密码\', \'validators\': [<wtforms.validators.DataRequired object at 0x000000000382B0F0>, <wtforms.validators.Length object at 0x000000000382B128>, <wtforms.validators.Regexp object at 0x000000000382B160>], \'widget\': <wtforms.widgets.core.PasswordInput object at 0x000000000382B208>, \'render_kw\': {\'class\': \'form-control\'}})>, \'__doc__\': None, \'_unbound_fields\': [ (name, UnboundField对象(1,simple.StringField,参数)), (pwd, UnboundField对象(2,simple.PasswordField,参数)), ],, \'_wtforms_meta\': Meta(DefaultMeta)类, } """
启发:
#sort排序 v1=[ (11,\'Martin11\',18), (121,\'Martin121\',19), (311,\'Martin311\',25), (311, \'Martin311\', 26) #按元素1排序,如果元素1相同按照 元素3排序 ] v1.sort(key=lambda x:(x[0],x[2])) #列表的sort方法,根据 列表中的元组元素 进行排序 print(v1) \'\'\' [(11, \'Martin11\', 18), (121, \'Martin121\', 19), (311, \'Martin311\', 25), (311, \'Martin311\', 26)] \'\'\'
class F1(object): pass class F2(object): pass class F3(F1): pass class F4(F2,F3): pass print(F4.__mro__) #打印F4 的继承关系 \'\'\' ( <class \'__main__.F4\'>, <class \'__main__.F2\'>, <class \'__main__.F3\'>, <class \'__main__.F1\'>, <class \'object\'>) \'\'\'
2.2.执行LoginForm的__new__方法
没有__new__方法 pass
2.3.执行LoginForm的__init__方法实例化form对象
def __init__(self, formdata=None, obj=None, prefix=\'\', data=None, meta=None, **kwargs): # 实例化LoginForm中封装的 Meta类进行实例化,以后用于生成CSRF Tocken 标签 meta_obj = self._wtforms_meta() #meta是 form = LoginForm(meta={\'csrf\':\'true\'})传过来的参数,封装到meta_obj中 if meta is not None and isinstance(meta, dict): meta_obj.update_values(meta) #执行父类的构造方法,参数 # self._unbound_fields \'\'\' \'_unbound_fields\'=[ (name, UnboundField对象(1,simple.StringField,参数)), (pwd, UnboundField对象(2,simple.PasswordField,参数)), ], \'\'\' # meta_ob=Meta(DefaultMeta)对象 super(Form, self).__init__(self._unbound_fields, meta=meta_obj, prefix=prefix) #给 form对象 中的_fields字段赋值如下; \'\'\' _fields: { name: StringField对象(), pwd: PasswordField对象(), } name: StringField对象(widget=widgets.TextInput()), pwd: PasswordField对象(widget=widgets.PasswordInput()) \'\'\' #循环form对象 中的_fields字段(字典),给form对象赋值 form.name/form.pwd for name, field in iteritems(self._fields): setattr(self, name, field) \'\'\' _fields: { name: StringField对象(), pwd: PasswordField对象(), } name: StringField对象(widget=widgets.TextInput()), pwd: PasswordField对象(widget=widgets.PasswordInput()) \'\'\' self.process(formdata, obj, data=data, **kwargs)
执行Form父类BaseForm.__init__方法,把UnboundField对象转换成StringField对象,并赋值到form对象的_fields:{}字典中;
class BaseForm(object): def __init__(self, fields, prefix=\'\', meta=DefaultMeta()): \'\'\' 参数 fields=[ (name, UnboundField对象(1,simple.StringField,参数)), (pwd, UnboundField对象(2,simple.PasswordField,参数)), ], meta=Meta(DefaultMeta)对象 \'\'\' if prefix and prefix[-1] not in \'-_;:/.\': prefix += \'-\' self.meta = meta self._prefix = prefix self._errors = None self._fields = OrderedDict() if hasattr(fields, \'items\'): fields = fields.items() translations = self._get_translations() extra_fields = [] #------------------------------------------------------ if meta.csrf: #生成 CSRF tocken隐藏标签 self._csrf = meta.build_csrf(self) extra_fields.extend(self._csrf.setup_form(self)) for name, unbound_field in itertools.chain(fields, extra_fields):# #(name, UnboundField对象(1,simple.StringField,参数)) #(pwd, UnboundField对象(2,simple.PasswordField,参数)) options = dict(name=name, prefix=prefix, translations=translations) #(name, UnboundField对象(1,simple.StringField,参数)) #真正实例化 simple.StringField(参数) field = meta.bind_field(self, unbound_field, options) #UnboundField对象转换成StringField对象 self._fields[name] = field
form = {
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
}
循环form对象 中的_fields字段(字典),分别赋值到form对象,这样就可以通过form.name/form.pwd直接获取到Field对象了
,无需form._fields[\'name\'] / form._fields[\'name\']
代码:
for name, field in iteritems(self._fields): setattr(self, name, field)
form对象封装数据就变成以下内容喽
form = { _fields: { name: StringField对象(), pwd: PasswordField对象(), } name: StringField对象(widget=widgets.TextInput()), pwd: PasswordField对象(widget=widgets.PasswordInput()) }
3. 当form对象生成之后 print(form.name) = 执行StringField对象的__str__方法;
StringField类中没有__str__方法,所以去执行基类Field的,Field.__str__方法返回了: self() = StringFieldObj.__call__()
def __str__(self): return self() #执行LoginForm的__call__方法
StringField没有__call__所以执行其基类Field.__call__方法,调用了self.meta.render_field(self, kwargs)
def __call__(self, **kwargs): # self=StringField对象 return self.meta.render_field(self, kwargs) #把StringField对象传传入meta.render_field方法
下面来看self.meta.render_field(self, kwargs)做了什么?
def render_field(self, field, render_kw): other_kw = getattr(field, \'render_kw\', None) if other_kw is not None: render_kw = dict(other_kw, **render_kw) # StringField对象.widget(field, **render_kw) #插件.__call__() \'\'\' #field =StringField对象 StringField对象.widget对象()=调用widget对象的.__call__方法 \'\'\' return field.widget(field, **render_kw)
来看widget对象=TextInput()的__call__方法,最终打印了obj.name的结果
def __call__(self, field, **kwargs): kwargs.setdefault(\'id\', field.id) kwargs.setdefault(\'type\', self.input_type) if \'value\' not in kwargs: kwargs[\'value\'] = field._value() if \'required\' not in kwargs and \'required\' in getattr(field, \'flags\', []): kwargs[\'required\'] = True return HTMLString(\'<input %s>\' % self.html_params(name=field.name, **kwargs))
""" 0. Form.__iter__: 返回所有字段对象 1. StringField对象.__str__ 2. StringField对象.__call__ 3. meta.render_field(StringField对象,) 4. StringField对象.widget(field, **render_kw) 5. 插件.__call__() """
4.执行for iteam in form对象的执行流程
执行form对象基类BaseForm的__inter__方法,变量self._fields字典中的内容
def __iter__(self): """Iterate form fields in creation order.""" return iter(itervalues(self._fields))
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
}
用户输入数据的校验验证流程form = LoginForm(formdata=request.form)
# 请求发过来的值 form = LoginForm(formdata=request.form) # 值.getlist(\'name\') # 实例:编辑 # # 从数据库对象 # form = LoginForm(obj=\'值\') # 值.name/值.pwd # # # 字典 {} # form = LoginForm(data=request.form) # 值[\'name\'] # 1. 循环所有的字段 # 2. 获取每个字段的钩子函数 # 3. 为每个字段执行他的验证流程 字段.validate(钩子函数+内置验证规则)
六、session功能
1. Flask自带的session功能
from flask import session import json app=Flask(__name__,template_folder=\'templates\',static_path=\'/static/\',static_url_path=\'/static/\') app.debug=True app.secret_key=\'sjehfjeefrjewth43u\' #设置session加密 app.config[\'JSON_AS_ASCII\']=False #指定json编码格式 如果为False 就不使用ascii编码, app.config[\'JSONIFY_MIMETYPE\'] ="application/json;charset=utf-8" #指定浏览器渲染的文件类型,和解码格式; @app.route(\'/login/\',methods=[\'GET\',\'POST\']) def login(): msg = \'\' if request.method==\'POST\': name=request.values.get(\'user\') pwd=request.values.get(\'pwd\') if name ==\'zhanggen\' and pwd==\'123.com\': session[\'user\']=name #设置session的key value return redirect(\'/index/\') else: msg=\'用户名或者密码错误\' return render_template(\'login.html\',msg=msg) @app.route(\'/index/\',methods=[\'GET\',\'POST\']) def index(): user_list = [\'张根\', \'egon\', \'eric\'] user=session.get(\'user\') #获取session if user: user=[\'alex\',\'egon\',\'eric\'] return jsonify(user_list) else: return redirect(\'/login/\') if __name__ == \'__main__\': app.run()
2.第三方session组件(Session)
安装 pip install flask-session
from flask import session, Flask,request,make_response,render_template,redirect,jsonify,Response from flask.ext.session import Session #引入第三方session import json app=Flask(__name__,template_folder=\'templates\',static_path=\'/static/\',static_url_path=\'/static/\') app.debug=True app.secret_key=\'sjehfjeefrjewth43u\' #设置session加密 app.config[\'JSON_AS_ASCII\']=False #指定json编码格式 如果为False 就不使用ascii编码, app.config[\'JSONIFY_MIMETYPE\'] ="application/json;charset=utf-8" #指定浏览器渲染的文件类型,和解码格式; app.config[\'SESSION_TYPE\']=\'redis\' from redis import Redis #引入连接 redis模块 app.config[\'SESSION_REDIS\']=Redis(host=\'192.168.0.94\',port=6379) #连接redis Session(app) @app.route(\'/login/\',methods=[\'GET\',\'POST\']) def login(): msg = \'\' if request.method==\'POST\': name=request.values.get(\'user\') pwd=request.values.get(\'pwd\') if name ==\'zhanggen\' and pwd==\'123.com\': session[\'user\']=name #设置session的key value return redirect(\'/index/\') else: msg=\'用户名或者密码错误\' return render_template(\'login.html\',msg=msg) @app.route(\'/index/\',methods=[\'GET\',\'POST\']) def index(): user_list = [\'张根\', \'egon\', \'eric\'] user=session.get(\'user\') #获取session if user: user=[\'alex\',\'egon\',\'eric\'] return jsonify(user_list) else: return redirect(\'/login/\') if __name__ == \'__main__\': app.run()
不仅可以把session存放到redis还可放到文件、内存、memcache...
def _get_interface(self, app): config = app.config.copy() config.setdefault(\'SESSION_TYPE\', \'null\') config.setdefault(\'SESSION_PERMANENT\', True) config.setdefault(\'SESSION_USE_SIGNER\', False) config.setdefault(\'SESSION_KEY_PREFIX\', \'session:\') config.setdefault(\'SESSION_REDIS\', None) config.setdefault(\'SESSION_MEMCACHED\', None) config.setdefault(\'SESSION_FILE_DIR\', os.path.join(os.getcwd(), \'flask_session\')) config.setdefault(\'SESSION_FILE_THRESHOLD\', 500) config.setdefault(\'SESSION_FILE_MODE\', 384) config.setdefault(\'SESSION_MONGODB\', None) config.setdefault(\'SESSION_MONGODB_DB\', \'flask_session\') config.setdefault(\'SESSION_MONGODB_COLLECT\', \'sessions\') config.setdefault(\'SESSION_SQLALCHEMY\', None) config.setdefault(\'SESSION_SQLALCHEMY_TABLE\', \'sessions\') if config[\'SESSION_TYPE\'] == \'redis\': session_interface = RedisSessionInterface( config[\'SESSION_REDIS\'], config[\'SESSION_KEY_PREFIX\'], config[\'SESSION_USE_SIGNER\'], config[\'SESSION_PERMANENT\']) elif config[\'SESSION_TYPE\'] == \'memcached\': session_interface = MemcachedSessionInterface( config[\'SESSION_MEMCACHED\'], config[\'SESSION_KEY_PREFIX\'], config[\'SESSION_USE_SIGNER\'], config[\'SESSION_PERMANENT\']) elif config[\'SESSION_TYPE\'] == \'filesystem\': session_interface = FileSystemSessionInterface( config[\'SESSION_FILE_DIR\'], config[\'SESSION_FILE_THRESHOLD\'], config[\'SESSION_FILE_MODE\'], config[\'SESSION_KEY_PREFIX\'], config[\'SESSION_USE_SIGNER\'], config[\'SESSION_PERMANENT\']) elif config[\'SESSION_TYPE\'] == \'mongodb\': session_interface = MongoDBSessionInterface( config[\'SESSION_MONGODB\'], config[\'SESSION_MONGODB_DB\'], config[\'SESSION_MONGODB_COLLECT\'], config[\'SESSION_KEY_PREFIX\'], config[\'SESSION_USE_SIGNER\'], config[\'SESSION_PERMANENT\']) elif config[\'SESSION_TYPE\'] == \'sqlalchemy\': session_interface = SqlAlchemySessionInterface( app, config[\'SESSION_SQLALCHEMY\'], config[\'SESSION_SQLALCHEMY_TABLE\'], config[\'SESSION_KEY_PREFIX\'], config[\'SESSION_USE_SIGNER\'], config[\'SESSION_PERMANENT\']) else: session_interface = NullSessionInterface() return session_interface
3.自定义session组件
#!/usr/bin/env python # -*- coding:utf-8 -*- import uuid import json from flask.sessions import SessionInterface from flask.sessions import SessionMixin from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin): def __init__(self, initial=None, sid=None): self.sid = sid self.initial = initial super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value): super(MySession, self).__setitem__(key, value) def __getitem__(self, item): return super(MySession, self).__getitem__(item) def __delitem__(self, key): super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface): session_class = MySession container = {} def __init__(self): import redis self.redis = redis.Redis() def _generate_sid(self): return str(uuid.uuid4()) def _get_signer(self, app): if not app.secret_key: return None return Signer(app.secret_key, salt=\'flask-session\', key_derivation=\'hmac\') def open_session(self, app, request): """ 程序刚启动时执行,需要返回一个session对象 """ sid = request.cookies.get(app.session_cookie_name) if not sid: sid = self._generate_sid() return self.session_class(sid=sid) signer = self._get_signer(app) try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid) # session保存在redis中 # val = self.redis.get(sid) # session保存在内存中 val = self.container.get(sid) if val is not None: try: data = json.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid) return self.session_class(sid=sid) def save_session(self, app, session, response): """ 程序结束前执行,可以保存session中所有的值 如: 保存到resit 写入到用户cookie """ domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中 # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime) # session保存在内存中 self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure)
from flask import Flask from flask import session from my_session import MySessionInterface app = Flask(__name__) app.secret_key = \'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT\' app.session_interface = MySessionInterface() @app.route(\'/login/\', methods=[\'GET\', "POST"]) def login(): print(session) session[\'user1\'] = \'alex\' session[\'user2\'] = \'alex\' del session[\'user2\'] return "内容" if __name__ == \'__main__\': app.run()
七、蓝图
from flask import Blueprint
Flask的Blueprint功能称为蓝图,它的功能就像Django中的include或者Gin的路由分组,旨在为web框架的路由进行合理拆分。
from flask import Blueprint #把名称为test_zhanggen的蓝图register到app中 test_zhanggen= Blueprint(\'oss_api_zhanggen\',__name__) @test_zhanggen.route(\'/index/\', methods=[\'GET\', \'POST\']) def index(): """功能项辅助审核""" return "测试/张根/Index" @test_zhanggen.route(\'/home/\', methods=[\'GET\', \'POST\']) def home(): """功能项辅助审核""" return "测试/张根/Home"
在app注册已经生成的蓝图
#把api_zhanggen文件中test_zhanggen蓝图注册到flask appp app.register_blueprint(blueprint=api_zhanggen.test_zhanggen, url_prefix=\'/test/zhanggen\')
八、message (闪现)
message是一个基于Session实现的用于保存数据的集合,其特点是:一次性。
特点:和labada匿名函数一样不长期占用内存
from flask import Flask,request,flash,get_flashed_messages app = Flask(__name__) app.secret_key = \'some_secret\' @app.route(\'/set/\') def index2(): flash(\'Disposable\') #在message中设置1个个值 return \'ok\' #--------------------------------------------------------------------------------- @app.route(\'/\') def index1(): messages = get_flashed_messages() #获取message中设置的值,只能获取1次。(1次性) print(messages) return "Index1" if __name__ == "__main__": app.run()
九、中间件
flask也有中间件功能和Django类似,不同的是使用的是使用3个装饰器来实现的;
1.@app.before_first_request :请求第1次到来执行1次,之后都不执行;
2.@app.before_request:请求到达视图之前执行;(改函数不能有返回值,否则直接在当前返回)
3.@app.after_request:请求 经过视图之后执行;(最下面的先执行)
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, Request, render_template app = Flask(__name__, template_folder=\'templates\') app.debug = True @app.before_first_request #第1个请求到来执行 def before_first_request1(): print(\'before_first_request1\') @app.before_request #中间件2 def before_request1(): Request.nnn = 123 print(\'before_request1\') #不能有返回值,一旦有返回值在当前返回 @app.before_request def before_request2(): print(\'before_request2\') @app.errorhandler(404) def page_not_found(error): return \'This page does not exist\', 404 @app.route(\'/\') def hello_world(): return "Hello World" @app.after_request #中间件 执行视图之后 def after_request1(response): print(\'after_request1\', response) return response @app.after_request #中间件 执行视图之后 先执行 after_request2 def after_request2(response): print(\'after_request2\', response) return response if __name__ == \'__main__\': app.run()
十、Flask相关组件
2、flask-script组件
flask-script组件:用于通过脚本的形式,启动 flask;(实现类似Django的python manager.py runserver 0.0.0.0:8001)
pip install flask-script #安装
#!/usr/bin/env python # -*- coding:utf-8 -*- from sansa import create_app from flask_script import Manager #导入 app = create_app() manager=Manager(app) #实例化Manager对象 if __name__ == \'__main__\': manager.run()
python run.py runserver -h 0.0.0.0 -p 8001
* Running on http://0.0.0.0:8001/ (Press CTRL+C to quit)
3.flask-migrate组件
在线修改、迁移数据库(Django的 migrate 。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sansa import create_app,db from flask_script import Manager #导入 from flask_migrate import Migrate,MigrateCommand app = create_app() manager=Manager(app) #实例化Manager对象 migrate=Migrate(app,db) manager.add_command(\'db\',MigrateCommand) #注册命令 if __name__ == \'__main__\': manager.run()
pip install flask-migrate #安装
3.1.初始化数据库:python run.py db init
3.2.迁移数据: python run.py db migrate
3.3.生成表: python run.py db upgrade
ps:修改表结构 first 直接注释静态字段代码,second 执行 python run.py db upgrade.
D:\Flask练习\sansa>python run.py db init Creating directory D:\Flask练习\sansa\migrations ... done Creating directory D:\Flask练习\sansa\migrations\versions ... done Generating D:\Flask练习\sansa\migrations\alembic.ini ... done Generating D:\Flask练习\sansa\migrations\env.py ... done Generating D:\Flask练习\sansa\migrations\README ... done Generating D:\Flask练习\sansa\migrations\script.py.mako ... done Please edit configuration/connection/logging settings in \'D:\\Flask练习\\sansa\\migrations\\alembic.ini\' before proceeding. D:\Flask练习\sansa>python run.py db migrate INFO [alembic.runtime.migration] Context impl MySQLImpl. INFO [alembic.runtime.migration] Will assume non-transactional DDL. INFO [alembic.autogenerate.compare] Detected added table \'users666\' Generating D:\Flask练习\sansa\migrations\versions\a7f412a8146f_.py ... done D:\Flask练习\sansa>python run.py db upgrade INFO [alembic.runtime.migration] Context impl MySQLImpl. INFO [alembic.runtime.migration] Will assume non-transactional DDL. INFO [alembic.runtime.migration] Running upgrade -> a7f412a8146f, empty message D:\Flask练习\sansa>
文档: http://docs.jinkan.org/docs/flask/quickstart.html
银角大王:http://www.cnblogs.com/wupeiqi/articles/7552008.html
银角大王:http://www.cnblogs.com/wupeiqi/articles/5713330.html(执行原生SQL模块 pymsql ,ORM框架 SQLAchemy)