Classes
Worker
11
▼
Single-threaded async worker that serves HTTP requests.
Accepts connections from the provided sock…
Worker
11
▼
Single-threaded async worker that serves HTTP requests.
Accepts connections from the provided socket and handles them using asyncio streams. Each connection is processed in its own task.
Methods
run
0
▼
Start the worker's event loop (blocking).
run
0
▼
def run(self) -> None
shutdown
0
▼
Signal the worker to stop accepting connections.
Safe to call from any thread.…
shutdown
0
▼
def shutdown(self) -> None
Signal the worker to stop accepting connections.
Safe to call from any thread. In multi-worker mode the
supervisor sets the sharedthreading.Eventwhich the bridge
task picks up. In single-worker mode we use
call_soon_threadsafeto safely set the asyncio event from
an external thread.
Internal Methods 9 ▼
__init__
8
▼
__init__
8
▼
def __init__(self, config: ServerConfig, app: ASGIApp, sock: socket.socket, *, worker_id: int = 0, shutdown_event: threading.Event | None = None, max_connections: int = 0, ssl_context: ssl.SSLContext | None = None, lifecycle_collector: LifecycleCollector | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
config |
— |
|
app |
— |
|
sock |
— |
|
worker_id |
— |
Default:0
|
shutdown_event |
— |
Default:None
|
max_connections |
— |
Default:0
|
ssl_context |
— |
Default:None
|
lifecycle_collector |
— |
Default:None
|
_serve
0
▼
Accept connections until shutdown is signaled.
async
_serve
0
▼
async def _serve(self) -> None
_bridge_shutdown
1
▼
Poll an external ``threading.Event`` and set the async shutdown.
Runs as a bac…
async
_bridge_shutdown
1
▼
async def _bridge_shutdown(self, ext_event: threading.Event) -> None
Poll an externalthreading.Eventand set the async shutdown.
Runs as a background task inside the worker's event loop. Checks the threading event every 0.25 s — fast enough for responsive shutdown without measurable overhead.
Parameters
| Name | Type | Description |
|---|---|---|
ext_event |
— |
_handle_connection
2
▼
Handle a single TCP connection through request-response cycles.
After TLS hand…
async
_handle_connection
2
▼
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None
Handle a single TCP connection through request-response cycles.
After TLS handshake, checks ALPN to determine protocol:
- "h2" → HTTP/2 multiplexed connection handler
- "http/1.1" or None → HTTP/1.1 keep-alive loop
HTTP/1.1 also supports WebSocket upgrade mid-connection.
Parameters
| Name | Type | Description |
|---|---|---|
reader |
— |
|
writer |
— |
_handle_request
9
▼
Process a single HTTP request through the ASGI pipeline.
async
_handle_request
9
▼
async def _handle_request(self, request: RequestReceived, proto: H1Protocol, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, client: tuple[str, int], server: tuple[str, int], client_str: str, *, initial_body: list[BodyReceived] | None = None, connection_id: int = 0) -> None
Parameters
| Name | Type | Description |
|---|---|---|
request |
— |
|
proto |
— |
|
reader |
— |
|
writer |
— |
|
client |
— |
|
server |
— |
|
client_str |
— |
|
initial_body |
— |
Default:None
|
connection_id |
— |
Default:0
|
_run_with_disconnect_monitor
9
▼
Run the ASGI app with concurrent client disconnect monitoring.
For bodyless re…
async
_run_with_disconnect_monitor
9
▼
async def _run_with_disconnect_monitor(self, scope: dict, receive: Receive, send: Send, send_state: SendState, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, proto: H1Protocol, disconnect: asyncio.Event, *, connection_id: int = 0) -> None
Run the ASGI app with concurrent client disconnect monitoring.
For bodyless requests (GET/HEAD) where the body is already complete. Spawns a monitor task that reads from the socket to detect client disconnect, and cancels the app task when the client drops.
Mirrors the WebSocket handler's concurrent-task pattern.
Parameters
| Name | Type | Description |
|---|---|---|
scope |
— |
|
receive |
— |
|
send |
— |
|
send_state |
— |
|
reader |
— |
|
writer |
— |
|
proto |
— |
|
disconnect |
— |
|
connection_id |
— |
Default:0
|
_run_with_body_reader
9
▼
Run the ASGI app while concurrently reading request body.
Used when the reques…
async
_run_with_body_reader
9
▼
async def _run_with_body_reader(self, scope: dict, receive: Receive, send: Send, send_state: SendState, body_queue: asyncio.Queue[BodyReceived], proto: H1Protocol, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, *, disconnect: asyncio.Event | None = None) -> None
Run the ASGI app while concurrently reading request body.
Used when the request body spans multiple socket reads (large POSTs, chunked uploads). Follows the same concurrent-task pattern as the WebSocket handler.
The actual HTTP status is captured in send_state by the send callable; this method only sets 500 as a fallback when the app raises without having started a response.
Parameters
| Name | Type | Description |
|---|---|---|
scope |
— |
|
receive |
— |
|
send |
— |
|
send_state |
— |
|
body_queue |
— |
|
proto |
— |
|
reader |
— |
|
writer |
— |
|
disconnect |
— |
Optional event to set when the reader detects client disconnect (EOF or connection error). When provided, this signals the disconnect-aware receive callable so the ASGI app receives None
|
_monitor_disconnect
2
▼
Monitor the TCP connection for client disconnect.
Reads from the socket to det…
async
staticmethod
_monitor_disconnect
2
▼
async def _monitor_disconnect(reader: asyncio.StreamReader, disconnect: asyncio.Event) -> None
Monitor the TCP connection for client disconnect.
Reads from the socket to detect when the client closes the
connection (EOF or error). Sets disconnect to signal the
ASGIreceive()callable, which unblocks any app waiting
forhttp.disconnect.
This mirrors the WebSocket handler's frame-reader pattern but only watches for connection close — no data is expected.
Parameters
| Name | Type | Description |
|---|---|---|
reader |
— |
|
disconnect |
— |
_send_error
4
▼
Send a plain-text error response.
async
_send_error
4
▼
async def _send_error(self, writer: asyncio.StreamWriter, proto: H1Protocol, status: int, message: str) -> None
Parameters
| Name | Type | Description |
|---|---|---|
writer |
— |
|
proto |
— |
|
status |
— |
|
message |
— |
Functions
_create_h1_protocol
1
H1Protocol
▼
Create the best available HTTP/1.1 protocol handler.
Uses httptools when insta…
_create_h1_protocol
1
H1Protocol
▼
def _create_h1_protocol(*, max_incomplete_event_size: int | None = None) -> H1Protocol
Create the best available HTTP/1.1 protocol handler.
Uses httptools when installed (pip install pounce[fast]),
falls back to h11 (pure Python) otherwise.
Parameters
| Name | Type | Description |
|---|---|---|
max_incomplete_event_size |
int | None |
Default:None
|
Returns
H1Protocol
_worker_lifecycle_receive
0
dict[str, Any]
▼
Receive callable for worker lifecycle scopes.
Returns ``http.disconnect`` imme…
async
_worker_lifecycle_receive
0
dict[str, Any]
▼
async def _worker_lifecycle_receive() -> dict[str, Any]
Receive callable for worker lifecycle scopes.
Returnshttp.disconnectimmediately so that apps which pass
unrecognised scope types to their HTTP handler (and call
receive()) unblock and return quickly instead of hanging.
Returns
dict[str, Any]
_worker_lifecycle_send
1
None
▼
No-op send for worker lifecycle scopes.
async
_worker_lifecycle_send
1
None
▼
async def _worker_lifecycle_send(message: dict[str, Any]) -> None
Parameters
| Name | Type | Description |
|---|---|---|
message |
dict[str, Any] |
_is_websocket_upgrade
1
bool
▼
Check if the request is a WebSocket upgrade.
Detects ``Connection: Upgrade`` +…
_is_websocket_upgrade
1
bool
▼
def _is_websocket_upgrade(request: RequestReceived) -> bool
Check if the request is a WebSocket upgrade.
DetectsConnection: Upgrade + Upgrade: websocketheaders.
Parameters
| Name | Type | Description |
|---|---|---|
request |
RequestReceived |
Returns
bool
_get_header_from_tuple
2
bytes | None
▼
Get a header value by lowercase name from a headers tuple.
Single linear scan …
_get_header_from_tuple
2
bytes | None
▼
def _get_header_from_tuple(headers: tuple[tuple[bytes, bytes], ...], name: bytes) -> bytes | None
Get a header value by lowercase name from a headers tuple.
Single linear scan — use when only one header is needed. For
multiple lookups, build a dict with_headers_to_dict.
Parameters
| Name | Type | Description |
|---|---|---|
headers |
tuple[tuple[bytes, bytes], ...] |
|
name |
bytes |
Returns
bytes | None