AIOHTTP

用于asyncio和Python的异步HTTP客户端/服务器

 

主要特点:

  • 同时支持HTTP客户端和HTTP服务器。
  • 开箱即用地支持服务器端WebSockets和客户端WebSockets,没有回调地狱。
  • Web服务器具有中间件、信号和可插拔路由。

 

 入门

客户端:

import aiohttp
import asyncio

async def fetch(session, url):
    """Request ``url`` through ``session`` and return the response body as text."""
    async with session.get(url) as resp:
        body = await resp.text()
    return body

async def main():
    """Open a client session, fetch the python.org page and print its text."""
    async with aiohttp.ClientSession() as client:
        page = await fetch(client, 'http://python.org')
        print(page)

# Run the client only when executed as a script. asyncio.run() creates the
# event loop, runs main() to completion and closes the loop afterwards,
# replacing the deprecated get_event_loop()/run_until_complete() pairing.
if __name__ == '__main__':
    asyncio.run(main())

服务端:

from aiohttp import web

async def handle(request):
    """Reply with "Hello, <name>", using "Anonymous" when the route has no ``name``."""
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)

# Build the application and register the greeting handler for both the bare
# root URL and a URL carrying a ``name`` path segment.
app = web.Application()
app.add_routes([
    web.get('/', handle),
    web.get('/{name}', handle),
])

# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    web.run_app(app)

  

 Web服务

 1、接收POST、GET请求的参数数据

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web


class Application:
    """
    aiohttp interface: a small web service with a GET and a POST handler on ``/``.
    """

    def __init__(self):
        # Route table. Filled in by prepare_init(); initialised here so the
        # attribute exists on every instance even before the app is built.
        self.hearders_setting = []

    async def prepare_init(self):
        """
        Preload: build the route table used by app_factory().

        :return: None
        """
        self.hearders_setting = [
            web.post('/', self.post),
            web.get('/', self.get),
        ]

    @staticmethod
    def _peername(request):
        """
        Return the ``peername`` extra info of the request's transport.

        :param request: object exposing a ``transport`` attribute
        :return: the value of ``transport.get_extra_info('peername')``, or
                 ``None`` when the request has no transport
        """
        transport = request.transport
        # aiohttp types ``request.transport`` as optional, so avoid calling a
        # method on None.
        if transport is None:
            return None
        return transport.get_extra_info('peername')

    async def get(self, request):
        """
        Handle ``GET /``: print the app's ``db`` entry and the query arguments.

        :param request: incoming request
        :return: text response with body ``get``
        """
        print(request.app['db'])
        arguments = request.query
        print(arguments)
        # e.g. <MultiDictProxy('model': 'aiohttp')>
        return web.Response(text='get')

    async def post(self, request):
        """
        Handle ``POST /``: print the form arguments, peer address and path.

        :param request: incoming request
        :return: text response with body ``post``
        """
        arguments = await request.post()
        print(arguments)
        # e.g. <MultiDictProxy('model': 'aiohttp')>
        # Get the client's IP address
        print(self._peername(request))
        # e.g. ('127.0.0.1', 53245)
        print(request.path)
        # e.g. /
        print(request.raw_path)
        return web.Response(text='post')

    async def app_factory(self):
        """
        Configure the app: build the routes, register them and attach ``db``.

        :return: the configured ``web.Application``
        """
        await self.prepare_init()
        app = web.Application()
        app.add_routes(self.hearders_setting)
        # Value read back by the GET handler through request.app['db'].
        app['db'] = 'db'
        return app

    def run_forever(self):
        """
        Start serving: pass the app_factory() coroutine to ``web.run_app``.

        No host or port is given, so run_app's own defaults apply.

        :return: None
        """
        web.run_app(self.app_factory())


# Start the service only when executed as a script, not when imported.
if __name__ == '__main__':
    Application().run_forever()
# Request script: run separately, against the server started above.
import requests


requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
请求脚本

相关文章: