1、基于长轮询、queue实现
app01.py
#基于queue实现在线实时投票 import uuid import queue from flask import Flask,render_template,request,session,redirect,jsonify app=Flask(__name__) app.secret_key="drgs" app.debug=True @app.before_request def check_login(): if request.path=="/login": return None user = session.get("user_info") if not user: return redirect("/login") USER_QUEUE={ } """ USER_QUEUE={ uid:[data] } """ @app.route("/login",methods=["GET","POST"]) def login(): if request.method=="GET": return render_template("login.html") else: user=request.form.get("user") pwd=request.form.get("pwd") if user=="cao" and pwd=="123": uid=str(uuid.uuid4()) #创建队列 USER_QUEUE[uid]=queue.Queue() session["user_info"]={"uid":uid,"name":user} return redirect("/index") else: return render_template("login.html",msg="用户名或密码错误") VOTE_USER_LIST={ "1":{"name":"曹超","count":0}, "2":{"name":"大仙","count":0}, } @app.route("/index") def index(): return render_template("index.html",vote_user_list=VOTE_USER_LIST) @app.route("/get_vote_count") def get_vote_count(): """ 根据用户uid,获取用户队列 :return: """ ret = {"status":True,"data":None} #用户唯一标识 uid=session["user_info"]["uid"] q=USER_QUEUE[uid] try: data=q.get(timeout=10) ret["data"]=data except queue.Empty as e: ret["status"]=False return jsonify(ret) @app.route("/vote",methods=["POST"]) def vote(): # 投票对象的id gid=request.form.get('gid') old=VOTE_USER_LIST[gid]["count"] new=old+1 VOTE_USER_LIST[gid]["count"]=new data={"gid":gid ,"count":new} for q in USER_QUEUE.values(): q.put(data) return "ok" if __name__=="__main__": app.run(host="0.0.0.0",threaded=True)