1. Gevent实例

# Patch first: gevent's monkey patching has to run before any other module
# (here: requests, which pulls in socket/ssl) imports the blocking originals.
from gevent import monkey
# A plain socket blocks while waiting for the reply after sending a request;
# gevent changes that mechanism (conceptually socket.setblocking(False), i.e.
# do not sit waiting for the server's response after sending).
monkey.patch_all()  # locate the built-in socket and swap in gevent's own implementation

import gevent
import requests
 
def fetch_async(method, url, req_kwargs):
    """Send a single HTTP request and print the response's URL and body.

    Args:
        method: HTTP verb handed to ``requests.request``.
        url: address to request.
        req_kwargs: dict of additional keyword arguments that is expanded
            into the ``requests.request`` call.
    """
    # Trace the call before it is sent.
    print(method, url, req_kwargs)
    reply = requests.request(method=method, url=url, **req_kwargs)
    answered_url, body = reply.url, reply.content
    print(answered_url, body)
 
# ##### Send the requests #####
target_urls = (
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'https://github.com/',
)
# One spawned task (really a coroutine/greenlet) per URL; each one runs fetch_async.
greenlets = [
    gevent.spawn(fetch_async, method='get', url=target, req_kwargs={})
    for target in target_urls
]
# Block until every spawned task has finished.
gevent.joinall(greenlets)

2. grequests实例

grequests实际上就是封装了gevent里面的方法,然后配合requests实现异步的IO
grequests = gevent + requests
grequests.map() 内部实现
import grequests  # 实际上就是requests + gevent
# (url, extra keyword arguments) for every GET request that should be prepared.
get_specs = (
    ('https://www.baidu.com/', {'timeout': 10.001}),
    ('https://www.taobao.com/', {}),
    ('https://hao.360.cn/', {}),
)
# Build the GET requests from the table above.
request_list = [grequests.get(address, **options) for address, options in get_specs]
# ##### Execute and collect the list of responses #####
# (per the original author's note, map() drives gevent's joinall() internally)
response_list = grequests.map(request_list)
print(response_list)
 
# ##### Execute and collect the list of responses (with exception handling) #####
# def exception_handler(request, exception):
#     print(request, exception)
#     print("Request failed")

# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)

3. 项目中的应用

    def user_batch_import(self, token, file, data):
        """Bulk-create users from an uploaded Excel workbook.

        Every non-empty sheet must start with the exact header row
        ``['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']``. Each following
        row is turned into a user-creation payload; the payloads of one sheet
        are queued and handed to ``self._user_batch_import``.

        Args:
            token: passed to ``self._user_role`` to look up the caller.
            file: uploaded workbook, forwarded to ``Graphics.upload_image``.
            data: mapping providing ``context_type`` and ``content_type`` for
                the upload.

        Returns:
            dict: the ``_user_role`` result unchanged when it contains
            ``errcode``; ``{"errcode": 2, ...}`` when it lacks
            ``user[college_id]``; ``{"errcode": 400, ...}`` for a bad header
            or an unknown role; otherwise ``{"errcode": 0, "errmsg": str(res)}``
            where ``res`` is the ``_user_batch_import`` result of the last
            non-empty sheet (``None`` when every sheet is empty).
        """
        v = self._user_role(token, {})
        if "errcode" in v:
            return v

        if 'user[college_id]' not in v:
            return {"errcode": 2, "errmsg": str(v)}

        import xlrd

        expected_header = ['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']
        # Spreadsheet role label -> enrollment type name sent to the API.
        role_names = {
            "老师": "TeacherEnrollment",
            "学生": "StudentEnrollment",
            "助教": "TaEnrollment",
            "设计者": "DesignerEnrollment",
            "观察者": "ObserverEnrollment",
        }

        show_url = Graphics.upload_image(file, data["context_type"], data["content_type"])
        data_path = Graphics.format_nginx_data_url(show_url)
        book = xlrd.open_workbook(data_path)

        # NOTE(review): ``res`` is overwritten for every sheet, so only the last
        # non-empty sheet's result is reported -- confirm this is intended.
        res = None
        for name in book.sheet_names():
            sheet = book.sheet_by_name(name)
            nrows = sheet.nrows
            if nrows == 0:
                print(nrows)
                continue

            # Reject any header that is not exactly the expected one. The
            # previous ``!= expected and len(...) != 7`` test let a wrong
            # header through whenever it happened to have seven columns.
            first_line = sheet.row_values(0)
            if first_line != expected_header:
                return {"errcode": 400, "errmsg": u"不支持此格式顺序的输入;正确格式:['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']"}

            q = queue.Queue(maxsize=nrows)
            for row_index in range(1, nrows):
                rows = sheet.row_values(row_index, start_colx=0, end_colx=None)

                role_name = role_names.get(rows[6])
                if role_name is None:
                    return {"errcode": 400, "errmsg": u"所写角色不存在,可选:【老师, 学生, 助教, 观察者, 设计者】"}

                q.put({
                    "user[name]": rows[1],
                    "pseudonym[unique_id]": rows[2],
                    "pseudonym[password]": rows[3],
                    "user[tel]": rows[4],
                    "user[company]": rows[5],
                    "user[time_zone]": "Beijing",
                    "user[locale]": "zh-Hans",
                    "user[terms_of_use]": True,
                    "user[role_name]": role_name,
                    "user[college_id]": v["user[college_id]"],
                })

            res = self._user_batch_import(q)
        return {"errcode": 0, "errmsg": str(res)}

    def _user_batch_import(self, q, account_id=1):
        """Send one user-creation POST per valid queued payload.

        Args:
            q: queue of payload dicts; each must contain
                ``pseudonym[unique_id]``, ``user[tel]`` and
                ``pseudonym[password]``.
            account_id: account whose ``/users`` endpoint receives the
                requests. Defaults to ``1``, the value that used to be
                hard-coded.

        Returns:
            The list produced by ``grequests.map`` for the requests that were
            built, in queue order. Payloads that fail validation are skipped.
        """
        from app.api.apis import very_email, very_password, very_phone

        header = {"Authorization": "Bearer %s" % AccessToken}
        url = API_CANVAS_HOST + '/api/v1/accounts/{account_id}/users'.format(account_id=account_id)

        request_res = []
        while not q.empty():
            query = q.get()
            is_valid = (
                very_email(query["pseudonym[unique_id]"])
                and very_phone(query["user[tel]"], "tel")
                and very_password(query["pseudonym[password]"], "password")
            )
            if not is_valid:
                # Drop the invalid payload. The previous code did ``del query``
                # and then still used ``query``, which raised UnboundLocalError.
                continue

            request_res.append(grequests.post(url, headers=header, data=query))

        return grequests.map(request_res)

二.asyncio

1.应用

import asyncio
import requests
# Create a fresh event loop, keep a handle to it and install it as the current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Shared list that collects the responses fetched by the worker function.
x = list()
def ccc():
    """Fetch https://baidu.com, record the response in ``x`` and return it."""
    c = requests.get('https://baidu.com')
    x.append(c)
    # Return the response as well; without this the coroutine awaiting the
    # executor call only ever receives (and prints) None.
    return c
async def t():
    """Run the blocking ``ccc`` in the default executor and print its result."""
    # Use the loop that is actually running this coroutine rather than the
    # module-level ``loop`` global, so the coroutine works on any event loop.
    running_loop = asyncio.get_running_loop()
    temp = await running_loop.run_in_executor(None, ccc)
    print(temp)


async def _run_all():
    """Run three ``t()`` coroutines concurrently and wait for all of them."""
    # gather() is awaited from inside the running loop; calling it on bare
    # coroutines while no loop is running is deprecated since Python 3.10.
    await asyncio.gather(*(t() for _ in range(3)))


try:
    loop.run_until_complete(_run_all())
finally:
    # Always release the loop, even when one of the tasks failed.
    loop.close()
print(x)

2.实例

    def user_batch_import(self, file, data):
        """Bulk-create student users from an uploaded Excel workbook.

        Every non-empty sheet must start with the exact 11-column header
        (``*姓名``, ``*本人手机``, ``性别``, ...). Each following row becomes a
        user-creation payload; the payloads of one sheet are queued and handed
        to ``self._user_batch_import`` together with the college's account id.

        Args:
            file: uploaded workbook, forwarded to ``Graphics.upload_image``.
            data: mapping providing ``context_type`` and ``content_type`` for
                the upload.

        Returns:
            ``{"errcode": 400, ...}`` when a sheet header does not match;
            otherwise the ``_user_batch_import`` result of the last non-empty
            sheet (``None`` when every sheet is empty).
        """
        college_id = g.user_info["college_id"]
        res_account_id = self.get_table_data_first(College, dict(id=college_id))
        account_id = res_account_id["account_id"]

        import xlrd

        expected_header = ["*姓名", "*本人手机", "性别", "出生年月", "民族", "籍贯:省/市", "学校所在地区", "学校全称",
                           "监护人姓名", "与学生的关系", "监护人联系电话"]

        show_url = Graphics.upload_image(file, data["context_type"], data["content_type"])
        data_path = Graphics.format_nginx_data_url(show_url)
        book = xlrd.open_workbook(data_path)

        # NOTE(review): ``res`` is overwritten for every sheet, so only the last
        # non-empty sheet's result is returned -- confirm this is intended.
        res = None
        for name in book.sheet_names():
            sheet = book.sheet_by_name(name)
            nrows = sheet.nrows
            if nrows == 0:
                print(nrows)
                continue

            # Reject any header that is not exactly the expected one. The
            # previous ``!= expected and len(...) != 11`` test let a wrong
            # header through whenever it happened to have eleven columns.
            first_line = sheet.row_values(0)
            if first_line != expected_header:
                return {"errcode": 400, "errmsg": u"不支持此格式顺序的输入;正确格式:['*姓名', '*本人手机', '性别', "
                                                  u"'出生年月', '民族', '籍贯:省/市', '学校所在地区', '学校全称', "
                                                  u"'监护人姓名', '与学生的关系', '监护人联系电话']"}

            q = queue.Queue(maxsize=nrows)
            for row_index in range(1, nrows):
                rows = sheet.row_values(row_index, start_colx=0, end_colx=None)

                # NOTE(review): an empty gender cell maps to 1 and any other
                # value to 0 -- kept as in the original, confirm the encoding.
                gender = 1 if rows[2] == "" else 0

                # Keep only the part before the first "." (presumably numeric
                # cells arrive as floats such as 13800000000.0 -- verify).
                phone = str(rows[1]).split(".")[0]

                guardian_dict = [{
                    "guardian_name": rows[8],
                    "relationship_with_student": rows[9],
                    "guardian_tel": str(rows[10]).split(".")[0],
                }]

                # The phone number doubles as login id and initial password.
                q.put({
                    "user[name]": rows[0],
                    "pseudonym[unique_id]": phone,
                    "user[tel]": phone,
                    "user[gender]": gender,
                    "user[birthdate]": rows[3],
                    "user[nation]": rows[4],
                    "user[province]": rows[5],
                    "user[school_position]": rows[6],
                    "user[school_name]": rows[7],
                    "pseudonym[password]": phone,
                    "parent": json.dumps(guardian_dict),
                    "user[time_zone]": "Beijing",
                    "user[locale]": "zh-Hans",
                    "user[terms_of_use]": True,
                    "user[college_id]": college_id,
                })

            res = self._user_batch_import(q, account_id)
        return res
View Code

相关文章: