Classes
TCPWorker
Structural contract for TCP workers (Worker and SyncWorker).
Both async and sync workers implement this interface. The supervisor uses it for lifecycle management: spawning, draining, and idle checks.
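A structural contract of this kind is commonly expressed with `typing.Protocol`. The sketch below assumes that approach (the source does not confirm how TCPWorker is declared) and uses a hypothetical `EchoWorker` class purely to show that any object with the four methods conforms without inheriting from anything.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TCPWorker(Protocol):
    """Structural contract: any class with these four methods conforms."""

    def run(self) -> None: ...
    def set_lifespan_state(self, state: dict[str, Any]) -> None: ...
    def start_draining(self) -> None: ...
    def is_idle(self) -> bool: ...


class EchoWorker:
    """Hypothetical worker used only for illustration."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}
        self.draining = False
        self.active_connections = 0

    def run(self) -> None:
        pass  # a real worker would run its accept loop here

    def set_lifespan_state(self, state: dict[str, Any]) -> None:
        self.state = state

    def start_draining(self) -> None:
        self.draining = True

    def is_idle(self) -> bool:
        return self.active_connections == 0


worker = EchoWorker()
# runtime_checkable protocols only check that the methods exist, not their signatures.
conforms = isinstance(worker, TCPWorker)
```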
Methods
run
def run(self) -> None
set_lifespan_state
def set_lifespan_state(self, state: dict[str, Any]) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| state | dict[str, Any] | |
start_draining
def start_draining(self) -> None
is_idle
def is_idle(self) -> bool
Returns
bool
_WorkerHandle
Metadata about a running worker (thread or process).
Methods
Internal Methods
__init__
def __init__(self, worker_id: int, target: threading.Thread | multiprocessing.Process, worker: TCPWorker | None, generation: int = 0) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
| target | threading.Thread \| multiprocessing.Process | |
| worker | TCPWorker \| None | |
| generation | int | Default: 0 |
_H3WorkerHandle
Metadata about a running H3 worker (UDP/QUIC).
Methods
Internal Methods
__init__
def __init__(self, worker_id: int, target: threading.Thread | multiprocessing.Process) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
| target | threading.Thread \| multiprocessing.Process | |
Supervisor
Spawn and supervise N workers as threads or processes.
The supervisor detects the GIL state and picks the appropriate spawning strategy automatically. Workers share the frozen ServerConfig and (in thread mode) the ASGI app reference.
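One way such detection could work is sketched below. The mapping (threads on a free-threaded build, processes when the GIL is enabled) is an assumption about the policy, not something the reference states, and `detect_worker_mode` is a hypothetical name. `sys._is_gil_enabled()` exists on CPython 3.13 and later; older interpreters always have a GIL.

```python
import sys
from typing import Literal

WorkerMode = Literal["thread", "process"]


def detect_worker_mode() -> WorkerMode:
    """Pick threads when the GIL is disabled, processes otherwise (assumed policy)."""
    # sys._is_gil_enabled() is only present on CPython 3.13+.
    is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = True if is_gil_enabled is None else is_gil_enabled()
    return "process" if gil_enabled else "thread"
```

An explicit mode passed to the constructor would simply bypass a helper like this.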
Methods
mode
property
def mode(self) -> WorkerMode
The active worker mode ("thread" or "process").
Returns
WorkerMode
worker_count
property
def worker_count(self) -> int
Number of workers the supervisor manages.
Returns
int
set_lifespan_state
def set_lifespan_state(self, state: dict[str, Any]) -> None
Set the lifespan state dict to be shared with all workers.
Parameters
| Name | Type | Description |
|---|---|---|
| state | dict[str, Any] | The state dict populated during lifespan startup. |
run
def run(self, sockets: list[socket.socket], udp_sockets: list[socket.socket] | None = None) -> None
Start all workers and block until shutdown.
Installs signal handlers, spawns workers, runs the health-check loop, then joins all workers on shutdown.
Parameters
| Name | Type | Description |
|---|---|---|
| sockets | list[socket.socket] | One TCP socket per worker, created by |
| udp_sockets | list[socket.socket] \| None | Optional UDP sockets for HTTP/3 workers. Default: None |
shutdown
def shutdown(self) -> None
Signal all workers to stop (non-blocking).
restart_workers
def restart_workers(self) -> None
Gracefully restart all workers (for dev reload).
Signals all running workers to stop, waits for them to drain, clears the shutdown event, and spawns fresh workers.
Serialized with graceful_reload and watch-loop respawns via _lifecycle_lock. Skips if a reload is already in progress.
When an app_path was provided and workers run as threads, the app module is reimported so that code changes on disk take effect. Process-based workers get fresh imports automatically on fork and don't need explicit reimport.
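The serialize-and-skip behaviour described above can be sketched with a lock plus an in-progress flag. This is a minimal illustration under assumptions: `LifecycleGuard` and `_reloading` are hypothetical names, and the real supervisor may hold `_lifecycle_lock` for the whole operation rather than only around the flag.

```python
import threading


class LifecycleGuard:
    """Hypothetical helper: serialize lifecycle operations, skip overlapping reloads."""

    def __init__(self) -> None:
        self._lifecycle_lock = threading.Lock()
        self._reloading = False
        self.restarts = 0

    def restart_workers(self) -> bool:
        """Return True if a restart ran, False if one was already in progress."""
        with self._lifecycle_lock:
            if self._reloading:
                return False  # another reload owns the lifecycle; skip
            self._reloading = True
        try:
            self._restart_workers_impl()
        finally:
            with self._lifecycle_lock:
                self._reloading = False
        return True

    def _restart_workers_impl(self) -> None:
        # Stand-in for: signal stop, wait for drain, clear event, spawn fresh workers.
        self.restarts += 1
```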
graceful_reload
def graceful_reload(self) -> None
Perform zero-downtime rolling restart of all workers.
This method implements a rolling restart strategy:
- Reimport the app (thread mode only)
- Spawn new worker generation
- Mark old workers for draining (finish existing, reject new connections)
- Wait for old workers to become idle
- Shut down old workers
This ensures zero dropped requests during code reload.
Note: Only works in thread mode. In process mode, falls back to restart_workers() which has brief downtime.
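The rolling strategy listed above can be sketched as follows. `FakeWorker`, its `stop()` method, and the `idle_timeout` parameter are hypothetical stand-ins, and the app reimport step is omitted; the point is only the ordering: spawn the new generation before draining the old one.

```python
import time


class FakeWorker:
    """Hypothetical stand-in exposing the TCPWorker draining hooks."""

    def __init__(self, generation: int) -> None:
        self.generation = generation
        self.draining = False
        self.stopped = False
        self.active_connections = 0

    def start_draining(self) -> None:
        self.draining = True

    def is_idle(self) -> bool:
        return self.active_connections == 0

    def stop(self) -> None:
        self.stopped = True


def rolling_reload(workers: list[FakeWorker], idle_timeout: float = 5.0) -> list[FakeWorker]:
    """Replace every worker with a next-generation one and return the new set."""
    old = list(workers)
    next_generation = max((w.generation for w in old), default=-1) + 1

    # 1. Spawn the new generation first so capacity never drops to zero.
    new = [FakeWorker(next_generation) for _ in old]

    # 2. Old workers finish in-flight requests but refuse new connections.
    for worker in old:
        worker.start_draining()

    # 3. Wait (bounded) for the old workers to become idle.
    deadline = time.monotonic() + idle_timeout
    while time.monotonic() < deadline and not all(w.is_idle() for w in old):
        time.sleep(0.01)

    # 4. Shut down the old generation.
    for worker in old:
        worker.stop()
    return new
```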
Internal Methods
__init__
def __init__(self, config: ServerConfig, app: ASGIApp, *, mode: WorkerMode | None = None, ssl_context: ssl.SSLContext | None = None, lifecycle_collector: LifecycleCollector | None = None, app_path: str | None = None, sync_app: SyncApp | None = None) -> None
Parameters
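| Name | Type | Description |
|---|---|---|
| config | ServerConfig | |
| app | ASGIApp | |
| mode | WorkerMode \| None | Default: None |
| ssl_context | ssl.SSLContext \| None | Default: None |
| lifecycle_collector | LifecycleCollector \| None | Default: None |
| app_path | str \| None | Default: None |
| sync_app | SyncApp \| None | Default: None |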
_signal_workers_start_draining
def _signal_workers_start_draining(self) -> None
Mark async workers as draining (503 new connections) during shutdown.
Thread-mode workers expose a Worker / SyncWorker instance; process workers do not (handle.worker is None).
_restart_workers_impl
def _restart_workers_impl(self) -> None
Internal implementation of restart_workers (no lock).
_graceful_reload_impl
def _graceful_reload_impl(self) -> None
Internal implementation of graceful_reload (no lock).
_setup_sync_infrastructure
def _setup_sync_infrastructure(self) -> None
Create AsyncPool and AcceptDistributor for sync worker mode.
_create_worker
def _create_worker(self, worker_id: int, socket_index: int) -> Worker | SyncWorker
Create a Worker or SyncWorker based on the execution mode.
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
| socket_index | int | |
Returns
Worker | SyncWorker
_spawn_worker
def _spawn_worker(self, worker_id: int) -> None
Create and start a single worker.
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
_spawn_h3_worker
def _spawn_h3_worker(self, worker_id: int) -> None
Create and start a single H3 (HTTP/3) worker.
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
_respawn_worker
def _respawn_worker(self, worker_id: int) -> None
Restart a crashed worker if within restart budget.
Serialized with restart_workers/graceful_reload via _lifecycle_lock. Skips if a reload is in progress (avoids overlapping restarts).
Parameters
| Name | Type | Description |
|---|---|---|
| worker_id | int | |
_watch
def _watch(self) -> None
Health-check loop: detects crashed workers and restarts them.
_drain
def _drain(self) -> None
Wait for all workers to finish draining connections, then clean up.
Signals shutdown to all workers, waits for them to finish processing active connections (see shutdown_timeout on ServerConfig), then force-terminates process workers that haven't stopped.
Workers will reject new connections but finish existing ones for clean shutdown (important for Kubernetes graceful termination).
_force_stop
def _force_stop(self, handle: _WorkerHandle, join_timeout: float) -> None
Force-terminate a worker that did not drain in time.
Process workers receive SIGTERM then SIGKILL. Thread workers cannot be terminated from Python; they are daemon threads and may outlive this join.
Parameters
| Name | Type | Description |
|---|---|---|
| handle | _WorkerHandle | |
| join_timeout | float | |
_install_signals
def _install_signals(self) -> None
Install SIGINT/SIGTERM handlers to trigger graceful shutdown.
Only effective when the supervisor runs on the main thread (e.g., direct testing). In production the supervisor runs inside a run_in_executor thread, so signal.signal() will fail silently. Server installs asyncio signal handlers that call supervisor.shutdown() instead.
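The main-thread restriction comes from Python itself: `signal.signal()` raises `ValueError` when called from any other thread. A minimal sketch of a "fail silently" installer, assuming a hypothetical `install_signals` helper that reports whether the handlers were installed:

```python
import signal
import threading
from collections.abc import Callable


def install_signals(on_shutdown: Callable[[], None]) -> bool:
    """Try to install SIGINT/SIGTERM handlers; return False off the main thread."""

    def _handler(signum: int, frame: object) -> None:
        on_shutdown()

    try:
        signal.signal(signal.SIGINT, _handler)
        signal.signal(signal.SIGTERM, _handler)
    except ValueError:
        # signal.signal() raises ValueError when called outside the main thread.
        return False
    return True
```

When this returns False, something on the main thread (here, the asyncio handlers installed by Server) has to forward the signal to the shutdown path.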
Functions
_parallel_join_targets
def _parallel_join_targets(targets: list[threading.Thread | multiprocessing.Process], timeout_per: float) -> None
Join each worker thread/process in parallel with its own timeout.
shutdown_timeout is applied per worker (not split across N workers). Wall-clock time is roughly timeout_per when all workers finish together, instead of one shared deadline that starved later workers in the join order.
Parameters
| Name | Type | Description |
|---|---|---|
| targets | list[threading.Thread \| multiprocessing.Process] | Threads or processes to |
| timeout_per | float | Maximum seconds to wait for each target. |
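One way to give each target its own full timeout is to run every `join()` in a helper thread, so the waits overlap instead of accumulating. The sketch below assumes that technique and handles threads only; `parallel_join` is a hypothetical name, not the module's implementation.

```python
import threading
import time


def parallel_join(targets: list[threading.Thread], timeout_per: float) -> None:
    """Give every target its own full timeout by joining them concurrently."""
    joiners = [
        threading.Thread(target=t.join, args=(timeout_per,), daemon=True)
        for t in targets
    ]
    for joiner in joiners:
        joiner.start()
    # Each joiner returns after at most timeout_per, so the total wait is
    # roughly timeout_per rather than len(targets) * timeout_per.
    for joiner in joiners:
        joiner.join()
```

With three stuck workers and a 0.2 second timeout, a sequential join would take about 0.6 seconds, while this version returns after roughly 0.2 seconds.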
_target_id
def _target_id(target: threading.Thread | multiprocessing.Process) -> str
Return an identifier string for a thread or process.
Parameters
| Name | Type | Description |
|---|---|---|
| target | threading.Thread \| multiprocessing.Process | |
Returns
str