【Question title】: Token Bucket Algorithm based async semaphore
【Posted】: 2018-08-12 18:14:03
【Question】:

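I want to implement an asynchronous token bucket algorithm that manages when to run tasks (coroutines). In short, it would control the number of coroutines that run within any given time span.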

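I have tried a leaky bucket and semaphores, but neither of them is really a token bucket algorithm. The goal is to drain the bucket (i.e. run coroutines) if there is capacity to do so, and otherwise wait just long enough before running the next routine.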
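For reference, the classic token bucket described above can be sketched synchronously, independent of the TokenSemaphore code that follows: tokens refill continuously up to `capacity`, a task may start only when a whole token is available, and otherwise the caller is told how long to wait. The class name `TokenBucket`, the injectable `now` clock, and reading `rate` as "tokens added per second" are assumptions for illustration (the question's code uses `rate` both as a time window and as a frequency).

```python
import time
from typing import Callable


class TokenBucket:
    """Minimal synchronous token bucket (illustrative sketch).

    Tokens refill continuously at `rate` tokens per second, capped at `capacity`.
    """

    def __init__(self, capacity: float, rate: float,
                 now: Callable[[], float] = time.monotonic):
        if capacity <= 0 or rate <= 0:
            raise ValueError("capacity and rate must be > 0")
        self.capacity = capacity
        self.rate = rate
        self._now = now
        self._tokens = float(capacity)   # start with a full bucket
        self._last = now()

    def _refill(self) -> None:
        t = self._now()
        # Credit the tokens earned since the last check, capped at capacity.
        self._tokens = min(self.capacity, self._tokens + (t - self._last) * self.rate)
        self._last = t

    def try_acquire(self) -> bool:
        """Take one token if one is available; never blocks."""
        self._refill()
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until a whole token is available (0.0 if one is available now)."""
        self._refill()
        return max(0.0, (1 - self._tokens) / self.rate)


if __name__ == "__main__":
    clock = [0.0]                         # fake clock so the demo is deterministic
    bucket = TokenBucket(capacity=3, rate=2, now=lambda: clock[0])
    print([bucket.try_acquire() for _ in range(4)])  # [True, True, True, False]
    print(bucket.wait_time())                        # 0.5
    clock[0] += 0.5
    print(bucket.try_acquire())                      # True
```

The burst of three is "draining the bucket"; after that, starts are spaced by `1/rate` seconds, which is the "wait just long enough" behaviour.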

So what I decided to do was override the Semaphore class to control how many coroutines I run in any time period, like this:

TokenSemaphore.py

import datetime
from asyncio.locks import Semaphore
import asyncio
import collections
from math import floor


class TokenSemaphore(Semaphore):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """    
    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        super().__init__(value=capacity, loop=loop) 

    @property
    def capacity(self):
        return self._capacity

    def _wake_up_next(self):
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def has_capacity(self):
        if len(self._time_table) < self._capacity:
            self._time_table.append(datetime.datetime.now())
            return True
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[0]).total_seconds()
        if delta < self._rate:
            return False
        else:
            self._time_table.append(tf)
            return True

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._capacity == 0    

    async def acquire(self):
        """Acquire a semaphore.
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        while not self.has_capacity():
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except:
                # See the similar code in Queue.get.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._capacity -= 1
        return True    

    async def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        result = self._rate * floor(delta)
        sleep_time = 1.0/float(self._rate) - result if result < 1.0/float(self._rate) else 0
        await asyncio.sleep(sleep_time)
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        self._capacity += result
        self._wake_up_next()

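Note that my release() is an async def, because I think I need to sleep there if there aren't enough tokens in my bucket. Semaphore's own release() is not an async def. I have a feeling this is where I messed up, but I'm not sure.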

To test my implementation, I wrote this:

run.py

import asyncio
import aiohttp
import re
import datetime
from TokenSemaphore import TokenSemaphore
SITE = "https://example.com"

async def myWorker(semaphore):
    await semaphore.acquire()
    print("Successfully acquired the semaphore")
    async with aiohttp.ClientSession() as session:
        async with session.get(SITE, verify_ssl=False) as resp:
            print(resp.status, datetime.datetime.now() - ref, semaphore.capacity)
    print("Releasing Semaphore")
    await semaphore.release()   


async def main(loop):
    mySemaphore = TokenSemaphore(capacity=40, rate=2)
    # mySemaphore = asyncio.Semaphore(value=40)
    tasks = [myWorker(mySemaphore) for _ in range(44)]
    await asyncio.wait(tasks)
    print("Main Coroutine") 

ref = datetime.datetime.now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("All Workers Completed")
loop.close()

Problem

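So it looks like TokenSemaphore works, but it doesn't drain the bucket even when there is enough capacity. My print statement shows the bucket's available capacity, and it shows that there is enough capacity (i.e. more tasks could run). I can't understand why my token semaphore isn't running more coroutines when it has enough capacity to do so.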

$ python run.py 
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:00.177742 20
Releasing Semaphore
200 0:00:00.178944 20
Releasing Semaphore
200 0:00:00.184608 20
Releasing Semaphore
200 0:00:01.103417 20
Releasing Semaphore
200 0:00:01.105539 22
Releasing Semaphore
200 0:00:01.106280 22
Releasing Semaphore
200 0:00:01.106929 22
Releasing Semaphore
200 0:00:01.107701 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:01.110719 29
Releasing Semaphore
200 0:00:01.111228 29
Releasing Semaphore
200 0:00:01.111801 29
Releasing Semaphore
200 0:00:01.112366 29
Releasing Semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:01.116581 25
Releasing Semaphore
200 0:00:01.153321 25
Releasing Semaphore
200 0:00:01.155235 25
Releasing Semaphore
200 0:00:01.155791 25
Releasing Semaphore
200 0:00:01.156530 25
Releasing Semaphore
200 0:00:01.157258 25
Releasing Semaphore
200 0:00:01.221712 25
Releasing Semaphore
200 0:00:01.223267 25
Releasing Semaphore
200 0:00:01.223724 25
Releasing Semaphore
200 0:00:01.224246 25
Releasing Semaphore
200 0:00:01.224745 25
Releasing Semaphore
200 0:00:01.228829 25
Releasing Semaphore
200 0:00:04.326125 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.361430 30
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.910990 29
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.440614 28
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.974999 27
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:06.516174 26
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.051482 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.601656 24
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.147306 23
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.682823 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.216370 21
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.752510 20
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.302981 19
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.843989 18
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.384492 17
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.939925 16
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:12.485116 15
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.016098 14
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.554884 13
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:14.096828 12
Releasing Semaphore
Main Coroutine
All Workers Completed

【Question discussion】:

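  • OK, I see a bug with self._capacity. I decrement the capacity in acquire(), and that affects has_capacity(). I can fix that, and then it drains my capacity to zero! But that's not the end of it. The first 40 tasks run fine. Every task that hasn't finished fails with the error Task was destroyed but it is pending!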
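The interaction noted in this comment can be reproduced without an event loop. The sketch below copies only the counting logic of the question's `has_capacity()` and `acquire()`, with the clock frozen so the `delta < self._rate` branch always returns False; `count_immediate_acquires` is a hypothetical helper name. Because `len(_time_table)` grows by one while `_capacity` shrinks by one on every acquisition, the two meet halfway: with `capacity=40` exactly 20 acquisitions succeed, matching the 20 initial "Successfully acquired the semaphore" lines in the question's output.

```python
import collections


def count_immediate_acquires(capacity: int) -> int:
    """Replay the question's has_capacity()/acquire() bookkeeping with time frozen.

    Returns how many acquire() calls succeed before has_capacity() first
    returns False.
    """
    time_table = collections.deque(maxlen=capacity)
    remaining = capacity                  # plays the role of self._capacity
    acquired = 0
    # With the clock frozen, delta is 0 < rate, so only the first branch of
    # has_capacity() can ever return True.
    while len(time_table) < remaining:
        time_table.append(0.0)            # has_capacity() records a timestamp...
        remaining -= 1                    # ...and acquire() also decrements the counter
        acquired += 1
    return acquired


if __name__ == "__main__":
    print(count_immediate_acquires(40))   # 20
```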

Tags: python python-3.x async-await semaphore


【Answer 1】:

There are 3 problems:

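  1. We want _time_table to grow to the size of _capacity, but _capacity is decremented in acquire. It is better to move the update of _time_table out of has_capacity.
  2. In release, result evaluates to 0 (delta is usually under one second, so floor(delta) is 0), so the capacity is never increased after a coroutine wakes up. Just increase the capacity by 1.
  3. In general, you probably want to sleep in acquire rather than in release, so that you don't wait for no reason at the end of execution.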
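To make point 2 concrete: the question's `release()` computes `result = self._rate * floor(delta)`, where `delta` is the number of seconds since the most recent entry in `_time_table`. The function below is a hypothetical extraction of just that arithmetic, with `delta` passed in directly instead of read from the clock. Note also that after the `asyncio.sleep()` the question's code recomputes `delta` but not `result`, so the sleep does not change the increment.

```python
from math import floor


def question_release_increment(rate: float, delta: float) -> float:
    """Return the amount the question's release() adds to _capacity.

    `delta` is the number of seconds since the last entry in _time_table.
    """
    return rate * floor(delta)


if __name__ == "__main__":
    # A release shortly after the last acquisition adds nothing back:
    print(question_release_increment(rate=2, delta=0.18))  # 0
    # Only once a full second has passed does the counter grow:
    print(question_release_increment(rate=2, delta=1.1))   # 2
```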

Take a look at this and see whether it helps:

class TokenSemaphore(Semaphore):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """
    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        # Tracks the number of coroutines waiting in acquire.
        self._waiting = 0
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        # Time of last token that was issued.
        self._last_token = None
        super().__init__(value=capacity, loop=loop)

    @property
    def capacity(self):
        return max(self._capacity - self._waiting, 0)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self.capacity == 0

    def _get_sleep_time(self):
        now = datetime.datetime.now()
        token_freq = datetime.timedelta(seconds=(1.0/float(self._rate)))
        if self._last_token is None:
            delta = now - self._time_table[-1]
            sleep_time = token_freq - delta
            self._last_token = now + sleep_time
            return sleep_time.total_seconds()
        elif self._last_token < now:
            self._last_token += token_freq
            return 0
        else:
            self._last_token += token_freq
            return (self._last_token - now).total_seconds()

    async def acquire(self):
        """Acquire a semaphore.
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        print(self._capacity)
        if self.locked():
            self._waiting += 1
            fut = self._loop.create_future()
            self._waiters.append(fut)
            sleep_time = self._get_sleep_time()
            # Schedule the execution.
            await asyncio.sleep(sleep_time)
            try:
                # Wait for the corresponding task that's already executing to
                # finish.
                await fut
            except:
                # See the similar code in Queue.get.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
            finally:
                self._waiting -= 1
        else:
            self._last_token = None
        self._capacity -= 1
        self._time_table.append(datetime.datetime.now())
        return True

    def _wake_up_next(self):
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    async def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._capacity += 1
        self._wake_up_next()
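The class above depends on private `asyncio.Semaphore` internals (`_waiters`, `_loop`, `_wake_up_next`) whose details have changed between Python versions, and on the `loop` argument, which was deprecated in Python 3.8 and removed in 3.10. A sketch of the same idea (sleep in `acquire()`, allow an initial burst, then pace starts `1/rate` seconds apart) can be built on public APIs only. The name `AsyncTokenBucket`, reading `rate` as tokens per second, and the `asyncio.Lock` used to pace waiters one at a time are assumptions; unlike the answer's class, this is a pure rate limiter and does not cap how many tasks run concurrently.

```python
import asyncio
import time


class AsyncTokenBucket:
    """Async token bucket: a burst of up to `capacity`, then one start per 1/rate s.

    Illustrative sketch using only public asyncio APIs. It does not limit
    concurrency; combine it with an asyncio.Semaphore if that is also needed.
    """

    def __init__(self, capacity: int, rate: float):
        if capacity < 1 or rate <= 0:
            raise ValueError("capacity must be >= 1 and rate must be > 0")
        self._capacity = capacity
        self._rate = rate                   # assumed: tokens added per second
        self._tokens = float(capacity)      # start with a full bucket
        self._last = time.monotonic()
        self._lock = asyncio.Lock()         # lets one waiter at a time wait for a token

    def _refill(self) -> None:
        now = time.monotonic()
        # Credit the tokens earned since the last refill, capped at capacity.
        self._tokens = min(self._capacity,
                           self._tokens + (now - self._last) * self._rate)
        self._last = now

    async def acquire(self) -> None:
        # The waiting happens here, not in release(), so callers only pay for
        # it when the bucket is actually empty.
        async with self._lock:
            self._refill()
            while self._tokens < 1:
                await asyncio.sleep((1 - self._tokens) / self._rate)
                self._refill()
            self._tokens -= 1

    async def __aenter__(self) -> "AsyncTokenBucket":
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Tokens come back with time, so there is nothing to release.
        return False


async def main(n_tasks: int = 5) -> list:
    bucket = AsyncTokenBucket(capacity=3, rate=10)
    start = time.monotonic()
    starts = []

    async def worker() -> None:
        async with bucket:
            starts.append(time.monotonic() - start)
            await asyncio.sleep(0.01)       # stand-in for the HTTP request

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return starts


if __name__ == "__main__":
    print(sorted(round(t, 2) for t in asyncio.run(main())))
```

With `capacity=3` and `rate=10`, the first three workers start immediately and the remaining two start about 0.1 s apart; `capacity=40, rate=2` with 44 workers would mirror the question's test.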

【Discussion】:

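  • This helps, but now I think I have a problem where I may need to lock self._capacity. I think every routine is incrementing capacity. 200 0:00:00.201525 0 Releasing Semaphore Successfully acquired the semaphore Successfully acquired the semaphore Successfully acquired the semaphore Successfully acquired the semaphore 200 0:00:00.698236 28 Releasing Semaphore 200 0:00:00.702178 29 Releasing Semaphore 30100.70425 30100.70425 Releasing Semaphore 200 0:00:00.707803 36 Releasing Semaphore Main Coroutine All Workers Completed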
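  • What do you mean by "lock self._capacity"? No synchronization is needed to update self._capacity here, because all coroutines run in the same thread and there is no await between reading and writing the counter. The different numbers printed for semaphore.capacity come from other coroutines finishing in between the completion of the last 4 items.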
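  • What about the extra coroutines in the example that run early? My thinking is that self._capacity gets +1 when it shouldn't yet. Of the 44 tasks in the example, the first 40 run fine. The 41st task waits long enough, but tasks 42 to 44 don't wait long enough.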
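  • @torrho, I see: this is not just rate limiting but also smoothing. I've updated the answer to reflect that, and fixed another issue (the unnecessary sleep at the end of execution).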
  • "Not just rate limiting but also smoothing." Yes, wow, this is much harder than I expected. Thanks for explaining the smoothing problem.
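The smoothing discussed in these comments comes from `_get_sleep_time()` in the answer: each waiter books the next slot one `1/rate` interval after the previous one, instead of all waiters waking together. The sketch below replays that bookkeeping with plain float seconds instead of `datetime`; the class name `SmoothingSchedule` and the `last_acquire` parameter (standing in for `self._time_table[-1]`) are hypothetical.

```python
from typing import Optional


class SmoothingSchedule:
    """Float-seconds replica of the answer's _get_sleep_time() bookkeeping.

    `last_acquire` stands in for self._time_table[-1] and `last_token`
    mirrors self._last_token. Hypothetical helper for illustration only.
    """

    def __init__(self, rate: float, last_acquire: float):
        self.token_freq = 1.0 / rate
        self.last_acquire = last_acquire
        self.last_token: Optional[float] = None

    def sleep_time(self, now: float) -> float:
        if self.last_token is None:
            # First waiter: wake one token interval after the last acquisition.
            sleep = self.token_freq - (now - self.last_acquire)
            self.last_token = now + sleep
            return sleep
        if self.last_token < now:
            # The booked slot is already in the past: run now, advance the slot.
            self.last_token += self.token_freq
            return 0.0
        # Otherwise book the next slot, one interval after the previous one.
        self.last_token += self.token_freq
        return self.last_token - now


if __name__ == "__main__":
    # Tasks 41 to 44 arriving together 0.2 s after the 40th acquire, rate=2:
    sched = SmoothingSchedule(rate=2, last_acquire=0.0)
    print([round(sched.sleep_time(now=0.2), 3) for _ in range(4)])
```

Here the four waiters sleep about 0.3, 0.8, 1.3 and 1.8 s, i.e. they wake at 0.5 s intervals rather than all at once, which is what tasks 42 to 44 were missing.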