Source code for aiter.join_aiters
import asyncio
async def join_aiters(aiter_of_aiters):
    """
    This wrapper takes an aiter of aiters and pipes the items coming out of all of them
    into a single aiter.

    Items are yielded as soon as any underlying aiter produces them, so items from
    different aiters may interleave. Items coming from one given aiter keep their
    relative order, because the next pull on an aiter is only scheduled after the
    previous pull has completed.

    If the returned aiter is closed early, or an underlying aiter raises, every
    job that is still pending is cancelled.

    :type aiter_of_aiters: async iterator
    :param aiter_of_aiters: an aiter that yields aiters
    :return: an aiter returning elements that come from any of the underlying aiters
    :rtype: async iterator
    """

    async def _aiter_to_next_job(aiter):
        """
        Pull a single item from ``aiter``.

        Return two lists: a list of items to yield, and a list of jobs to add to queue.
        Both lists are empty once ``aiter`` is exhausted.
        """
        try:
            item = await aiter.__anext__()
        except StopAsyncIteration:
            return [], []
        # Schedule the follow-up pull only now, so this aiter never has two
        # pulls in flight at once.
        return [item], [asyncio.ensure_future(_aiter_to_next_job(aiter))]

    async def _main_aiter_to_next_job(aiter_of_aiters):
        """
        Pull the next aiter out of ``aiter_of_aiters``.

        Return two lists: a list of items to yield (always empty here), and a list of
        jobs to add to queue: one draining the new aiter, one pulling the next aiter.
        """
        try:
            new_aiter = await aiter_of_aiters.__anext__()
        except StopAsyncIteration:
            return [], []
        return [], [
            asyncio.ensure_future(_aiter_to_next_job(new_aiter.__aiter__())),
            asyncio.ensure_future(_main_aiter_to_next_job(aiter_of_aiters)),
        ]

    # asyncio.wait() only accepts tasks/futures (bare coroutines are rejected
    # since Python 3.11), so the seed job must be wrapped like all the others.
    jobs = {
        asyncio.ensure_future(
            _main_aiter_to_next_job(aiter_of_aiters.__aiter__()))
    }

    try:
        while jobs:
            done, jobs = await asyncio.wait(
                jobs, return_when=asyncio.FIRST_COMPLETED)
            for job in done:
                # The job is already finished: result() returns its value or
                # re-raises whatever the underlying aiter raised.
                new_items, new_jobs = job.result()
                # Register follow-up jobs before yielding so they are cleaned
                # up below if the consumer closes us at the yield.
                jobs.update(new_jobs)
                for item in new_items:
                    yield item
    finally:
        # Empty on normal exhaustion; otherwise avoid leaking pending tasks.
        for job in jobs:
            job.cancel()