【问题标题】:How to avoid duplicate processing in python API server? [duplicate]如何避免python API服务器中的重复处理? [复制]
【发布时间】:2022-03-24 00:31:33
【问题描述】:

假设一个函数detect_primes 的调用成本很高,我想避免使用重复参数重复调用它。我该怎么办?

使用缓存没有帮助,因为该函数可以在不同的请求中同时调用。当两个请求都将缓存视为空值时,两个请求都将继续执行代价高昂的函数。

def detect_primes(nums: List[int]) -> Dict[int, bool]:
    """ detect whether a list of numbers are prime """
@app.route('/detect', methods=['GET'])
def search():
    args = request.args
    nums = list(map(int, args.get('nums', '').split(',')))
    return detect_primes(nums)

例如,如果一个用户用 13,14,15 请求,另一个用户用 15,16 请求。 答案是{"13": true, "14": false, "15": false}{"15": false, "16": false}

我想避免使用[13, 14, 15][15, 16] 调用detect_primes。理想情况下,两个请求都应该等待与[13, 14, 15, 16] 的调用(或两个调用[13, 14, 15][16]),并返回各自的结果。

web框架的选择对我来说并不重要,你可以假设它是flask或fastapi。

编辑:不确定问题是如何与Are global variables thread-safe in Flask? How do I share data between requests? 重复或在Are global variables thread-safe in Flask? How do I share data between requests? 中回答的。如上所述,不能使用缓存(无论是内存中的 python 缓存还是外部缓存或数据库)。我很高兴被答案证明是错误的。

【问题讨论】:

    标签: python flask queue python-asyncio fastapi


    【解决方案1】:

    根据FastAPI's documentation

    当你用普通的def声明一个路径操作函数而不是 async def,它在外部线程池中运行,然后等待, 而不是直接调用(因为它会阻塞服务器)。

    因此,当您使用def 而不是async def 时,服务器会同时处理请求。

    在您的情况下 - 并且由于您将其描述为 “理想情况下,两个请求都应该等待......” - 您可以使用 async def 声明 search 路由。异步路由在主线程上运行,服务器按顺序处理请求 - 只要在此类路由中没有对 I/O-bound 操作的await 调用(请查看this answer)。通过这种方式,您可以使用字典来缓存先前(已计算)的数字,并使用它在后续请求中快速查找数字。您还可以使用与this 类似的方法来限制字典的大小。下面给出示例。您可以通过 http://127.0.0.1:8000/docs 的 OpenAPI 或使用诸如 http://127.0.0.1:8000/?nums=13&nums=14&nums=15 之类的 URL 来测试以下内容。

    from fastapi import FastAPI, Query
    from typing import List, Dict
    
    app = FastAPI()
    d = {}
    
    def is_prime(n) -> bool:
        # check whether 'n' is prime or not
    
    def detect_primes(nums: List[int]) -> Dict[int, bool]:
        res = {}
        for n in nums:
            if n in d:
                res[n] = d.get(n)
                print(f'{n} found in dict')
            else:
                is_n_Prime = is_prime(n)
                res[n] = is_n_Prime
                d[n] = is_n_Prime
        return res
    
    @app.get("/detect")
    async def search(nums: List[int] = Query(...)):
        return detect_primes(nums)
    

    但是,如果您需要在 async def 路由中使用 await(这会导致请求同时处理),您可以使用例如 Semaphore 对象来控制对字典,如here 所述。但是,如果您计划同时让multiple workers 处于活动状态(每个工作人员都有自己的内存 - 因此,他们don't share the same memory),您应该考虑使用数据库存储或键值存储(缓存) ,例如Redis(看看答案herehere)。此外,您可能想尝试使用aioredlock,它允许“在工作人员(进程)之间创建分布式锁”,如here 所述。

    【讨论】:

    • 我想到了这个,如果你的 else 块中有 await 语句,它不起作用(未来的开发者可能会添加一个)。在await 处会发生上下文切换,因此两个请求都将进入 else 块,因为缓存对它们来说都是空的。
    【解决方案2】:

    但是,我建议至少对在调用 detect_primes 之前使用的值字典使用缓存,以获取每个输入数字的已计算值。到目前为止,对 dict 元素的访问速度不是很大。 尝试异步访问计算值的字典,可能使用 Redis。

    类似的东西

    shared_dict = {}
    async def search():
        args = request.args
        nums = list(map(int, args.get('nums', '').split(',')))
        computed_values = []
        to_compute_values = []
        async for num in nums: 
            if await is_in_dict(num):
                computed_values.update({num:True})
            else:
                to_compute_values.append(num)
        #join to dicts
        return detect_primes(to_compute_values) | computed_values
    

    【讨论】:

    • 我不明白这是如何工作的,computed_values (dict) 总是有值 True,然后返回值不正确,bool 标志应该是数字是否是素数。跨度>
    猜你喜欢
    • 2015-02-12
    • 2016-12-28
    • 1970-01-01
    • 2019-03-30
    • 1970-01-01
    • 2015-10-19
    • 2018-02-16
    • 1970-01-01
    • 2012-09-15
    相关资源
    最近更新 更多