Source code for asgineer._compat

"""
This module provides compatibility for different async libs. Currently
only supporting asyncio, but should not be too hard to add e.g. Trio,
once Uvicorn has Trio support.
"""

import asyncio


[docs]async def sleep(seconds): """An async sleep function. Uses asyncio. Can be extended to support Trio once we support that. """ if True: # if asyncio await asyncio.sleep(seconds)
Event = asyncio.Event async def wait_for_any_then_cancel_the_rest(*coroutines): """Wait for any of the given coroutines to complete (or fail), and then cancels all the other co-routines. """ # Note: ensure_future == create_task. Less readable, but py36 compatible. if True: # if asyncio tasks = [asyncio.ensure_future(co) for co in coroutines] done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for task in pending: task.cancel()