# Source code for aiter.aiter_forker
from .deferred_coroutine import deferred_coroutine
class _aiter_fork:
    """
    Implementation of an aiter fork. Traces through a linked list of aiter
    elements, waiting when necessary.

    Each link in the list is an object whose ``wait(is_active=...)`` call can
    be awaited and resolves to a ``(item, next_link)`` pair. A fork only holds
    a reference to the link it will await next, so several forks can walk the
    same list independently of each other.
    """

    def __init__(self, next_awaitable, is_active=False):
        """
        :param next_awaitable: the link this fork will await on its next step;
            must provide an awaitable ``wait(is_active=...)``
        :param is_active: forwarded unchanged to every ``wait`` call made by
            this fork
        """
        self._next_awaitable = next_awaitable
        self._is_active = is_active

    def __aiter__(self):
        """Return ``self``: a fork is its own async iterator."""
        return self

    async def __anext__(self):
        """
        Await the current link, advance to the link it hands back, and return
        the item.

        Any exception raised while awaiting the link propagates to the caller
        unchanged, and the fork's position is left where it was.
        """
        this_item, next_awaitable = await self._next_awaitable.wait(
            is_active=self._is_active
        )
        # Advance only after the await succeeded.
        self._next_awaitable = next_awaitable
        return this_item

    def fork(self, is_active=True):
        """
        Create a new fork: either active, which uses the current task to await
        the next item; or passive, which waits until an active fork awaits it.

        The new fork starts at this fork's current position, i.e. it shares
        the link this fork would await next.

        :param is_active: whether the new fork is active (the default) or
            passive
        :return: a new :class:`_aiter_fork`
        """
        return _aiter_fork(self._next_awaitable, is_active=is_active)
def aiter_forker(aiter):
    """
    If you have an aiter that you would like to fork (split into multiple
    iterators, each of which produces the same elements), wrap it with this
    function.

    Returns a :class:`aiter._aiter_fork <_aiter_fork>` object that will yield
    the same objects in the same order. This object supports
    :py:func:`fork <aiter._aiter_fork.fork>`, which will let you create a
    duplicate stream.

    :type aiter: aiter
    :param aiter: an async iterator
    :return: a :class:`aiter._aiter_fork <_aiter_fork>`
    :rtype: :class:`aiter._aiter_fork <_aiter_fork>`
    """
    # Open the underlying iterator exactly once; every link pulls from it.
    _open_aiter = aiter.__aiter__()

    async def get_next():
        # Pull one item from the source and pair it with a fresh deferred
        # link for the item after it, which is how the linked list grows.
        # NOTE(review): presumably deferred_coroutine runs get_next at most
        # once per link and shares the result among waiters -- confirm
        # against deferred_coroutine's implementation.
        next_item = await _open_aiter.__anext__()
        return next_item, deferred_coroutine(get_next)

    next_awaitable = deferred_coroutine(get_next)
    # The fork handed back to the caller is created as an active one.
    return _aiter_fork(next_awaitable, is_active=True)