Source code for throttle

import asyncio
import inspect
import math
from functools import wraps
from time import perf_counter, sleep


[docs]class Throttle: """ This class offers synchronous and asynchronous mechanisms to accurately rate-limit the execution of some iterative code. It can be used explicitly to wait between each time interval or with generator functions to iterate through intervals. Example: Video recorder at 24 fps. Each iteration in the loop starts precisely every 1/24 seconds (if the iterations don't last longer). >>> rate = 24 # fps >>> throttle = Throttle(interval=(1 / rate)) >>> iters = 0 >>> t_start = perf_counter() >>> for i in throttle.loop(24): ... # Take, process and save image ... iters += 1 >>> total_time = round(perf_counter() - t_start, 2) >>> print('iters: {}, total_time: {}'.format(iters, total_time)) iters: 24, total_time: 1.0 """
[docs] def __init__(self, interval): """ Returns a :class:`Throttle` instance. Time reference will be set the first time a timing function is called. :param interval: Interval value for timing functions, in seconds. """ self.interval = interval self.t_start = None self.ticks = 0 self.restart()
def _check(self): """ Get the interval value for timing functions and initialize the time reference if not set yet. """ if self.t_start is None: self.restart() self.t_start = perf_counter()
[docs] def restart(self): """ Reset the instance deleting its time reference, which will be set the next time a timing function is called. """ self.t_start = None self.ticks = 0
[docs] def elapsed(self, auto_reset=True, exact=True): """ Checks if the interval has elapsed. :param auto_reset: If True, the time reference will be updated if the the interval has elapsed. :param exact: If True, the time reference will be increased with the interval value, which will result in deterministic and accurate intervals. If False, the time reference will be updated to the current time. `exact` has no effect if `auto_reset` is False. :return: True if the interval has elapsed, otherwise False. """ self._check() ret = (perf_counter() - self.t_start >= self.interval) if ret and auto_reset: self.ticks += 1 if exact: self.t_start += self.interval else: self.t_start = perf_counter() return ret
[docs] def wait_next(self): """ Blocks until the end of the current interval. Note that this function can return immediately if the next interval to wait has already elapsed. This happens when reusing a :class:`Throttle` instance without calling :func:`restart` first. """ self._check() t_target = (self.t_start + self.interval) self.t_start += self.interval sleep_time = t_target - perf_counter() if sleep_time > 0: sleep(sleep_time) self.ticks += 1
[docs] async def await_next(self): """ Waits asynchronously until the end of the current interval. Note that this function can return immediately if the next interval to wait has already elapsed. This happens when reusing a :class:`Throttle` instance without calling :func:`restart` first. """ self._check() t_target = (self.t_start + self.interval) self.t_start += self.interval await asyncio.sleep(t_target - perf_counter()) self.ticks += 1
[docs] def loop(self, max_ticks=None, duration=None): """ Returns a synchronous generator yielding every time an interval has elapsed. You can specify a maximum number of iterations or a maximum duration for the generator, but not both. If none is set, the generator will run until you call break, return or raise an exception. :param max_ticks: Maximum number of intervals the generator will wait for. :param duration: Seconds of loop duration. The generator will loop for ``ceil(duration / self.interval)`` iterations, so the elapsed time may be longer than `duration`, but never shorter. :return: Yields the number of intervals elapsed since the function was called. """ max_ticks = self._check_loop_params(duration, max_ticks) ticks = 0 while max_ticks is None or ticks < max_ticks: self.wait_next() yield ticks ticks += 1
[docs] async def aloop(self, max_ticks=None, duration=None): """ Returns an asynchronous generator yielding every time an interval has elapsed. You can specify a maximum number of iterations or a maximum duration for the generator, but not both. If none is set, the generator will run until you call break, return or raise an exception. :param max_ticks: Maximum number of intervals the generator will wait for. :param duration: Seconds of loop duration. The generator will loop for ``ceil(duration / self.interval)`` iterations, so the elapsed time may be longer than `duration`, but never shorter. :return: Yields the number of intervals elapsed since the function was called. """ max_ticks = self._check_loop_params(duration, max_ticks) ticks = 0 while max_ticks is None or ticks < max_ticks: await self.await_next() yield ticks ticks += 1
def _check_loop_params(self, duration, max_ticks): """ Helper function for :func:`loop` and :func:`aloop`. """ if max_ticks is not None and duration is not None: raise ValueError("max_ticks and duration cannot be set at" "the same time") self._check() if duration is not None: max_ticks = math.ceil(duration / self.interval) return max_ticks
[docs]def throttle(limit=1, interval=1.0, wait=False, on_fail=None): """ Decorator to limit the number of calls to a synchronous function in an interval of time. It ensures that the decorated function is not called more than `limit` times in the same time interval. If limit is reached, it can return a custom result or sleep until the next time interval. Do not use this function to decorate an asynchronous function (use :func:`~throttle.athrottle` instead). :param limit: Maximum number of calls allowed to the decorated function in each time interval. :param interval: Seconds of every time period. :param wait: If True, it will block when limit is reached until the next interval, and then it will call the function. :param on_fail: Value, object or function to return if the call limit is reached. If `on_fail` is a function, the decorator will return the result of the call to this function. Note that `on_fail` only makes sense if `wait` is False. :return: Result of the call to the decorated function, or `on_fail` if limit is reached (and `wait` is False). """ call_counter = 0 throttle_ = Throttle(interval) def decorator(func): @wraps(func) def wrapper(*args, **kwargs): nonlocal call_counter call_counter += 1 if throttle_.elapsed(): call_counter = 1 elif call_counter > limit: if wait: throttle_.wait_next() call_counter = 1 else: return on_fail() if callable(on_fail) else on_fail return func(*args, **kwargs) return wrapper return decorator
[docs]def athrottle(limit=1, interval=1.0, wait=False, on_fail=None): """ Decorator to limit the number of calls to a synchronous or asynchronous function in an interval of time. It ensures that the decorated function is not called more than `limit` times in the same time interval. If limit is reached, it can return a custom result or sleep until the next time interval. Do not use this function to decorate a synchronous function (use :func:`~throttle.throttle` instead). :param limit: Maximum number of calls allowed to the decorated function in each time interval. :param interval: Seconds of every time period. :param wait: If True, it will asynchronously wait when limit is reached until the next interval, and then it will call the function. :param on_fail: Value, object or function to return if the call limit is reached. If `on_fail` is a function, the decorator will return the result of the call to this function (if this function is asynchronous, it will await for its result). Note that `on_fail` only makes sense if `wait` is False. :return: Result of the call to the decorated function, or `on_fail` if limit is reached (and `wait` is False). """ call_counter = 0 throttle_ = Throttle(interval) def decorator(func): @wraps(func) async def awrapper(*args, **kwargs): nonlocal call_counter call_counter += 1 if throttle_.elapsed(): call_counter = 1 elif call_counter > limit: if wait: await throttle_.await_next() call_counter = 1 else: if inspect.iscoroutinefunction(on_fail): return await on_fail() elif inspect.isfunction(on_fail): return on_fail() else: return on_fail if inspect.iscoroutinefunction(func): return await func(*args, **kwargs) else: return func(*args, **kwargs) return awrapper return decorator