Source code for aiter.gated_aiter
import asyncio
from .azip import azip
from .active_aiter import active_aiter
from .map_filter_aiter import map_filter_aiter
from .push_aiter import push_aiter
[docs]class gated_aiter:
"""
Returns an aiter that you can "push" integer values into.
When a number is pushed, that many items are allowed out through the gate.
This is kind of like a discrete version of an electronic transistor.
:type aiter: aiter
:param aiter: an async iterator
:return: an async iterator yielding the same values as the original aiter
:rtype: :class:`aiter.gated_aiter <gated_aiter>`
"""
def __init__(self, aiter):
self._gate = push_aiter()
self._open_aiter = active_aiter(azip(aiter, map_filter_aiter(range, self._gate))).__aiter__()
self._semaphore = asyncio.Semaphore()
def __aiter__(self):
return self
async def __anext__(self):
async with self._semaphore:
return (await self._open_aiter.__anext__())[0]
[docs] def push(self, count):
"""
Note that several additional items are allowed through the gated_aiter.
:type count: int
:param count: the number of items that can be allowed out the aiter. These are cumulative.
"""
if not self._gate.is_stopped():
self._gate.push(count)
[docs] def stop(self):
"""
After the previously authorized items (from `push`) are pulled out the aiter, the aiter will exit.
"""
self._gate.stop()