Source code for aiter.stoppable_aiter
import asyncio
[docs]class stoppable_aiter:
"""
A wrapper around an iterator that supports a manual shut-off.
"""
def __init__(self, aiter):
self._open_aiter = aiter.__aiter__()
self._is_stopping = False
self._semaphore = asyncio.Semaphore()
def __aiter__(self):
return self
async def __anext__(self):
if self._is_stopping:
raise StopAsyncIteration
async with self._semaphore:
return await self._open_aiter.__anext__()
def stop(self):
self._is_stopping = True