Source code for aiter.map_aiter

import inspect

from .parallel_map_aiter import parallel_map_aiter
from .simple_map_aiter import simple_map_aiter


def map_aiter(map_f, aiter, worker_count=1):
    """
    Take an async iterator and a map function, and apply the function
    to everything coming out of the iterator before passing it on.

    In this case, the map_f must return a list, which will be flattened.
    Empty lists are okay, so you can filter items by excluding them from
    the list.

    Note that when more than one worker is used, the order of processed
    elements might not match the input order.

    :type aiter: async iterator
    :param aiter: an aiter

    :type map_f: a function, regular or async, that accepts a single
        parameter and returns a list (or other iterable)
    :param map_f: the mapping function

    :type worker_count: int
    :param worker_count: the number of worker tasks that pull items out
        of aiter. Any value that is not greater than 1 selects the
        single-worker implementation.

    :return: an aiter returning transformed items that have been processed
        through map_f
    :rtype: an async iterator

    :raises ValueError: if worker_count is greater than 1 and map_f is
        neither a coroutine function nor an async generator function
    """
    # NOTE(review): the "returns a list, which will be flattened" contract
    # above is inherited from the original docstring; this function only
    # dispatches, so any flattening happens in simple_map_aiter /
    # parallel_map_aiter -- confirm against those helpers.
    if worker_count > 1:
        # Several workers only help when map_f can yield control to the
        # event loop; a plain synchronous function would run serially
        # anyway, so reject that combination up front.
        map_f_is_async = (
            inspect.iscoroutinefunction(map_f)
            or inspect.isasyncgenfunction(map_f)
        )
        if not map_f_is_async:
            raise ValueError(
                "map_f is not a coroutine, which makes "
                "it pointless to use more than 1 worker")
        return parallel_map_aiter(map_f, aiter, worker_count)

    return simple_map_aiter(map_f, aiter)