Source code for aiter.push_aiter
import asyncio
[docs]class push_aiter:
"""
An asynchronous iterator based on a linked-list.
Data goes in the head via "push".
Allows peeking to determine how many elements are ready.
This is functionally similar to an :class:`async.Queue <async.Queue>`
object. It creates an aiter that you can `push` items into.
Unlike a `Queue` object, you can also invoke :py:func:`stop <stop>`, which will
raise a `StopAsyncIteration` on the listener's side, allowing for a
clean exit.
You'd use this when you want to "turn around" execution, ie. have
a task that is occasionally invoked (like a hardware interrupt)
to produce a new event for an aiter.
"""
def __init__(self):
self._head = self._tail = asyncio.Future()
[docs] def push(self, *items):
"""
Accept one or more item and push them to the end of the
aiter's queue.
"""
if self._head.cancelled():
raise ValueError("%s closed" % self)
for item in items:
f = asyncio.Future()
self._head.set_result((item, f))
self._head = f
[docs] def stop(self):
"""
Raise a `StopAsyncIteration` exception on the listener side
once no more already-queued elements are pending.
"""
self._head.cancel()
async def __aiter__(self):
try:
while True:
_, self._tail = await self._tail
yield _
except asyncio.CancelledError:
pass
[docs] def available_iter(self):
"""
Return a *synchronous* iterator of elements that are immediately
available to be consumed without waiting for a task switch.
"""
tail = self._tail
try:
while tail.done():
_, tail = tail.result()
yield _
except asyncio.CancelledError:
pass
[docs] def is_stopped(self):
"""
Return a boolean indicating whether or not :py:func:`stop <stop>`
has been called. Additional elements may still be available.
:return: whether or not the aiter has been stopped
:rtype: bool
"""
return self._tail.cancelled()
[docs] def is_item_available(self):
"""
Return a boolean indicating whether or not an element is available without
blocking for a task switch.
:return: whether or not the aiter has been stopped
:rtype: bool
"""
return self.is_len_at_least(1)
[docs] def is_len_at_least(self, n):
"""
Return a boolean indicating whether or not `n` elements are available without
blocking for a task switch.
:type n: int
:param n: count of items
:return: True iff n items are available
:rtype: bool
"""
for _, item in enumerate(self.available_iter()):
if _+1 >= n:
return True
return False
def __len__(self):
"""
:return: number of items immediately available withouth blocking
:rtype: int
"""
return sum(1 for _ in self.available_iter())