Source code for asgineer._compat

This module provides compatibility for different async libs. Currently
only supporting asyncio, but should not be too hard to add e.g. Trio,
once Uvicorn has Trio support.

import asyncio

[docs]async def sleep(seconds): """An async sleep function. Uses asyncio. Can be extended to support Trio once we support that. """ if True: # if asyncio await asyncio.sleep(seconds)
Event = asyncio.Event async def wait_for_any_then_cancel_the_rest(*coroutines): """Wait for any of the given coroutines to complete (or fail), and then cancels all the other co-routines. """ # Note: ensure_future == create_task. Less readable, but py36 compatible. if True: # if asyncio tasks = [asyncio.ensure_future(co) for co in coroutines] done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for task in pending: task.cancel()