Source code for asgineer._app

"""
This module implements an ASGI application class that forms the adapter
between the user-defined handler function and the ASGI server.
"""

import sys
import json
import logging
import inspect
from . import _request
from ._request import HttpRequest, WebsocketRequest, DisconnectedError

# Initialize the logger
logger = logging.getLogger("asgineer")
logger.propagate = False
logger.setLevel(logging.INFO)
_handler = logging.StreamHandler(sys.stderr)
_handler.setFormatter(
    logging.Formatter(
        fmt="[%(levelname)s %(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
)
logger.addHandler(_handler)


[docs]def normalize_response(response): """Normalize the given response, by always returning a 3-element tuple (status, headers, body). The body is not "resolved"; it is safe to call this function multiple times on the same response. """ # Get status, headers and body from the response if isinstance(response, tuple): if len(response) == 3: status, headers, body = response elif len(response) == 2: status = 200 headers, body = response elif len(response) == 1: status, headers, body = 200, {}, response[0] else: raise ValueError(f"Handler returned {len(response)}-tuple.") else: status, headers, body = 200, {}, response # Validate status and headers if not isinstance(status, int): raise ValueError(f"Status code must be an int, not {type(status)}") if not isinstance(headers, dict): raise ValueError(f"Headers must be a dict, not {type(headers)}") return status, headers, body
[docs]def guess_content_type_from_body(body): """Guess the content-type based of the body. * "text/html" for str bodies starting with ``<!DOCTYPE html>`` or ``<html>``. * "text/plain" for other str bodies. * "application/json" for dict bodies. * "application/octet-stream" otherwise. """ if isinstance(body, str): if body.startswith(("<!DOCTYPE html>", "<!doctype html>", "<html>")): return "text/html" else: return "text/plain" elif isinstance(body, dict): return "application/json" else: return "application/octet-stream"
[docs]def to_asgi(handler): """Convert a request handler (a coroutine function) to an ASGI application, which can be served with an ASGI server, such as Uvicorn, Hypercorn, Daphne, etc. """ if not inspect.iscoroutinefunction(handler): raise TypeError( "asgineer.to_asgi() handler function must be a coroutine function." ) async def application_wrapper(scope, receive, send): return await asgineer_application(handler, scope, receive, send) application_wrapper.__module__ = handler.__module__ application_wrapper.__name__ = handler.__name__ application_wrapper.__doc__ = handler.__doc__ application_wrapper.asgineer_handler = handler return application_wrapper
async def asgineer_application(handler, scope, receive, send): # server_version = scope["asgi"].get("version", "2.0") # spec_version = scope["asgi"].get("spec_version", "2.0") if scope["type"] == "http": request = HttpRequest(scope, receive, send) await _handle_http(handler, request) elif scope["type"] == "websocket": request = WebsocketRequest(scope, receive, send) await _handle_websocket(handler, request) elif scope["type"] == "lifespan": await _handle_lifespan(receive, send) else: logger.warning(f"Unknown ASGI type {scope['type']}") async def _handle_lifespan(receive, send): while True: message = await receive() if message["type"] == "lifespan.startup": try: # Could do startup stuff here logger.info("Server is starting up") except Exception as err: # pragma: no cover await send({"type": "lifespan.startup.failed", "message": str(err)}) else: await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": try: # Could do shutdown stuff here logger.info("Server is shutting down") except Exception as err: # pragma: no cover await send({"type": "lifespan.shutdown.failed", "message": str(err)}) else: await send({"type": "lifespan.shutdown.complete"}) return else: logger.warning(f"Unknown lifespan message {message['type']}") async def _handle_http(handler, request): try: # Call request handler to get the result where = "request handler" result = await handler(request) if request._app_state == _request.CONNECTING: # Process the handler output where = "processing handler output" status, headers, body = normalize_response(result) # Make sure that there is a content type if "content-type" not in headers: headers["content-type"] = guess_content_type_from_body(body) # Convert the body if isinstance(body, bytes): pass elif isinstance(body, str): body = body.encode() elif isinstance(body, dict): try: body = json.dumps(body).encode() except Exception as err: raise ValueError(f"Could not JSON encode body: {err}") elif inspect.isasyncgen(body): # Returning an async generator used to be THE way to do chunked # responses before version 0.8. We keep it for backwards # compatibility, and because it can be quite nice. pass else: if inspect.isgenerator(body): raise ValueError( "Body cannot be a regular generator, use an async generator." ) elif inspect.iscoroutine(body): raise ValueError("Body cannot be a coroutine, forgot await?") else: raise ValueError(f"Body cannot be {type(body)}.") # Send response. Note that per the spec, if we do not specify # the content-length, the server sets Transfer-Encoding to chunked. if isinstance(body, bytes): where = "sending response" headers.setdefault("content-length", str(len(body))) await request.accept(status, headers) await request.send(body, more=False) else: where = "sending chunked response" accepted = False async for chunk in body: if not isinstance(chunk, (bytes, str)): raise ValueError("Response chunks must be bytes or str.") if not accepted: await request.accept(status, headers) accepted = True await request.send(chunk) else: # If the handler accepted the request, it should use send, not return. if result is not None: raise IOError("Handlers that call request.accept() should return None.") # Mark end of data, if needed if request._app_state == _request.CONNECTED: where = "finalizing response" await request.send(b"", more=False) except DisconnectedError: pass # Not really an error except Exception as err: # Process errors. We log them, and if possible send a 500 error_text = f"{type(err).__name__} in {where}: {str(err)}" logger.error(error_text, exc_info=err) if request._app_state == _request.CONNECTING: await request.accept(500, {}) await request.send(error_text, more=False) elif request._app_state == _request.CONNECTED: await request.send(b"", more=False) # At least close it finally: # Clean up try: await request._destroy() except Exception as err: # pragma: no cover logger.error(f"Error in cleanup: {str(err)}", exc_info=err) async def _handle_websocket(handler, request): try: result = await handler(request) if result is not None: error_text = ( "A websocket handler should return None; " + "use request.send() and request.receive() to communicate." ) raise IOError(error_text) except DisconnectedError: pass # Not really an error except Exception as err: error_text = f"{type(err).__name__} in websocket handler: {str(err)}" logger.error(error_text, exc_info=err) finally: # The ASGI spec specifies that ASGI servers should close the # ws connection when the task ends. At the time of writing # (04-10-2018), only Uvicorn does this. And at 18-08-2020 Daphne # still doesn't. So ... just close for good measure. try: await request.close() except Exception: # pragma: no cover pass # Also clean up try: await request._destroy() except Exception as err: # pragma: no cover logger.error(f"Error in ws cleanup: {str(err)}", exc_info=err)