Module

supervisor

Supervisor — spawns, monitors, and restarts workers.

The supervisor sits between the Server and the Worker layer. It detects the GIL state at startup and spawns workers as either threads (on nogil / 3.14t) or processes (on GIL builds). The worker implementation is identical in both modes; only the spawning mechanism differs.
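As a minimal sketch of the startup check described above: the helper name `detect_worker_mode` is hypothetical, and the real detection logic may differ. It relies on `sys._is_gil_enabled()`, which CPython provides from 3.13 onward; on older interpreters the attribute is absent and the GIL is always on.

```python
import sys


def detect_worker_mode() -> str:
    """Return "thread" when the GIL is disabled, otherwise "process".

    Hypothetical helper for illustration; not the module's actual code.
    """
    # sys._is_gil_enabled() only exists on CPython 3.13+. If it is missing,
    # the interpreter is a regular GIL build.
    is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
    if is_gil_enabled is not None and not is_gil_enabled():
        return "thread"
    return "process"


print(detect_worker_mode())
```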

Responsibilities:

  • Spawn N workers with their sockets
  • Monitor worker health (is_alive check on a watchdog loop)
  • Restart crashed workers (up to max_restarts per window)
  • Coordinate graceful shutdown via threading.Event
  • Forward SIGINT/SIGTERM to workers

Classes

TCPWorker 4

Structural contract for TCP workers (Worker and SyncWorker).

Both async and sync workers implement this interface. The supervisor uses it for lifecycle management: spawning, draining, and idle checks.

Methods

run 0
def run(self) -> None
set_lifespan_state 1
def set_lifespan_state(self, state: dict[str, Any]) -> None
Parameters
Name Type Description
state
start_draining 0
def start_draining(self) -> None
is_idle 0 bool
def is_idle(self) -> bool
Returns
bool
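One way to express a structural contract like this is `typing.Protocol`. The sketch below reuses the four signatures listed above; whether the real TCPWorker is declared this way is an assumption, and `EchoWorker` is a made-up class used only to show that no inheritance is needed.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TCPWorker(Protocol):
    """Sketch of the contract: anything with these methods qualifies."""

    def run(self) -> None: ...
    def set_lifespan_state(self, state: dict[str, Any]) -> None: ...
    def start_draining(self) -> None: ...
    def is_idle(self) -> bool: ...


class EchoWorker:
    """Hypothetical worker that satisfies the contract without subclassing."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}
        self.draining = False

    def run(self) -> None:
        pass

    def set_lifespan_state(self, state: dict[str, Any]) -> None:
        self.state = state

    def start_draining(self) -> None:
        self.draining = True

    def is_idle(self) -> bool:
        return True


# runtime_checkable only verifies that the methods exist, not their signatures.
print(isinstance(EchoWorker(), TCPWorker))
```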
_WorkerHandle 1

Metadata about a running worker (thread or process).

Methods

Internal Methods 1
__init__ 4
def __init__(self, worker_id: int, target: threading.Thread | multiprocessing.Process, worker: TCPWorker | None, generation: int = 0) -> None
Parameters
Name Type Description
worker_id
target
worker
generation Default:0
_H3WorkerHandle 1

Metadata about a running H3 worker (UDP/QUIC).

Methods

Internal Methods 1
__init__ 2
def __init__(self, worker_id: int, target: threading.Thread | multiprocessing.Process) -> None
Parameters
Name Type Description
worker_id
target
Supervisor 20

Spawn and supervise N workers as threads or processes.

The supervisor detects the GIL state and picks the appropriate spawning strategy automatically. Workers share the frozen ServerConfig and (in thread mode) the ASGI app reference.

Methods

mode 0 WorkerMode
The active worker mode ("thread" or "process").
property
def mode(self) -> WorkerMode
Returns
WorkerMode
worker_count 0 int
Number of workers the supervisor manages.
property
def worker_count(self) -> int
Returns
int
set_lifespan_state 1
Set the lifespan state dict to be shared with all workers.
def set_lifespan_state(self, state: dict[str, Any]) -> None
Parameters
Name Type Description
state

The state dict populated during lifespan startup.

run 2
def run(self, sockets: list[socket.socket], udp_sockets: list[socket.socket] | None = None) -> None

Start all workers and block until shutdown.

Installs signal handlers, spawns workers, runs the health-check loop, then joins all workers on shutdown.

Parameters
Name Type Description
sockets

One TCP socket per worker, created by create_listeners().

udp_sockets

Optional UDP sockets for HTTP/3 workers.

Default:None
shutdown 0
Signal all workers to stop (non-blocking).
def shutdown(self) -> None
restart_workers 0
def restart_workers(self) -> None

Gracefully restart all workers (for dev reload).

Signals all running workers to stop, waits for them to drain, clears the shutdown event, and spawns fresh workers.

Serialized with graceful_reload and watch-loop respawns via _lifecycle_lock. Skips if a reload is already in progress.

When an app_path was provided and workers run as threads, the app module is reimported so that code changes on disk take effect. Process-based workers get fresh imports automatically on fork and don't need explicit reimport.
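The thread-mode reimport can be sketched with `importlib.reload`. The helper name `reimport_app` and the `"module:attribute"` format for app_path are assumptions made for this example; the source does not specify either.

```python
import importlib
from typing import Any


def reimport_app(app_path: str) -> Any:
    """Reload the module named in app_path and return the fresh app object.

    Assumes app_path looks like "package.module:attribute" (hypothetical format).
    """
    module_name, _, attr = app_path.partition(":")
    module = importlib.import_module(module_name)
    # reload() re-executes the module source so edits on disk take effect.
    module = importlib.reload(module)
    return getattr(module, attr) if attr else module


# A stdlib module stands in for a real application module here.
dedent = reimport_app("textwrap:dedent")
print(dedent("    hello"))
```

Note that `importlib.reload` only re-executes the named module; submodules it imports keep their old code unless they are reloaded as well.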

graceful_reload 0
def graceful_reload(self) -> None

Perform zero-downtime rolling restart of all workers.

This method implements a rolling restart strategy:

  1. Reimport the app (thread mode only)
  2. Spawn new worker generation
  3. Mark old workers for draining (finish existing, reject new connections)
  4. Wait for old workers to become idle
  5. Shut down old workers

This ensures zero dropped requests during code reload.

Note: Only works in thread mode. In process mode, falls back to restart_workers() which has brief downtime.
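Steps 2 to 5 of the rolling restart can be illustrated with toy thread workers. Everything in this sketch (`ToyWorker`, `rolling_reload`, the `idle_timeout` parameter) is hypothetical and only mirrors the ordering described above; step 1 (reimporting the app) is left out.

```python
import threading
import time


class ToyWorker:
    """Stand-in worker whose run() blocks until stop() is called."""

    def __init__(self, generation: int) -> None:
        self.generation = generation
        self.draining = False
        self.active_connections = 0
        self._stop = threading.Event()
        self.thread = threading.Thread(target=self.run, daemon=True)

    def run(self) -> None:
        self._stop.wait()

    def start_draining(self) -> None:
        self.draining = True  # a real worker would now reject new connections

    def is_idle(self) -> bool:
        return self.active_connections == 0

    def stop(self) -> None:
        self._stop.set()


def rolling_reload(old: list[ToyWorker], idle_timeout: float = 5.0) -> list[ToyWorker]:
    # Step 2: start the new generation before touching the old one.
    new = [ToyWorker(w.generation + 1) for w in old]
    for w in new:
        w.thread.start()
    # Step 3: old workers finish existing connections but take no new ones.
    for w in old:
        w.start_draining()
    # Step 4: wait (bounded) for the old generation to go idle.
    deadline = time.monotonic() + idle_timeout
    while time.monotonic() < deadline and not all(w.is_idle() for w in old):
        time.sleep(0.01)
    # Step 5: shut the old generation down.
    for w in old:
        w.stop()
        w.thread.join(timeout=1.0)
    return new
```

Because the new generation is running before the old one starts draining, there is never a moment with zero workers accepting connections.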

Internal Methods 13
__init__ 7
def __init__(self, config: ServerConfig, app: ASGIApp, *, mode: WorkerMode | None = None, ssl_context: ssl.SSLContext | None = None, lifecycle_collector: LifecycleCollector | None = None, app_path: str | None = None, sync_app: SyncApp | None = None) -> None
Parameters
Name Type Description
config
app
mode Default:None
ssl_context Default:None
lifecycle_collector Default:None
app_path Default:None
sync_app Default:None
_signal_workers_start_draining 0
def _signal_workers_start_draining(self) -> None

Mark async workers as draining (503 new connections) during shutdown.

Thread-mode workers expose a Worker / SyncWorker instance; process workers do not (handle.worker is None).

_restart_workers_impl 0
Internal implementation of restart_workers (no lock).
def _restart_workers_impl(self) -> None
_graceful_reload_impl 0
Internal implementation of graceful_reload (no lock).
def _graceful_reload_impl(self) -> None
_setup_sync_infrastructure 0
Create AsyncPool and AcceptDistributor for sync worker mode.
def _setup_sync_infrastructure(self) -> None
_create_worker 2 Worker | SyncWorker
Create a Worker or SyncWorker based on the execution mode.
def _create_worker(self, worker_id: int, socket_index: int) -> Worker | SyncWorker
Parameters
Name Type Description
worker_id
socket_index
Returns
Worker | SyncWorker
_spawn_worker 1
Create and start a single worker.
def _spawn_worker(self, worker_id: int) -> None
Parameters
Name Type Description
worker_id
_spawn_h3_worker 1
Create and start a single H3 (HTTP/3) worker.
def _spawn_h3_worker(self, worker_id: int) -> None
Parameters
Name Type Description
worker_id
_respawn_worker 1
def _respawn_worker(self, worker_id: int) -> None

Restart a crashed worker if within restart budget.

Serialized with restart_workers/graceful_reload via _lifecycle_lock. Skips if a reload is in progress (avoids overlapping restarts).

Parameters
Name Type Description
worker_id
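A restart budget of "at most max_restarts per window" can be tracked with a sliding window of timestamps. This is a sketch under assumptions: the class name `RestartBudget`, the `window` parameter, and the injectable `clock` are all hypothetical, and the real supervisor may count restarts differently.

```python
import time
from collections import deque
from typing import Callable


class RestartBudget:
    """Allow at most max_restarts restarts within any window-second span."""

    def __init__(
        self,
        max_restarts: int,
        window: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_restarts = max_restarts
        self.window = window
        self._clock = clock
        self._stamps: deque[float] = deque()

    def try_consume(self) -> bool:
        """Record a restart and return True, or return False if over budget."""
        now = self._clock()
        # Forget restarts that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_restarts:
            return False
        self._stamps.append(now)
        return True
```

A respawn path would call `try_consume()` before spawning a replacement and give up on that worker when it returns False.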
_watch 0
Health-check loop — detects crashed workers and restarts them.
def _watch(self) -> None
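A watchdog loop of this kind can be sketched with `threading.Event.wait` as the sleep, so that a shutdown request wakes the loop immediately. The function `watch`, its parameters, and the `on_dead` callback are hypothetical names for this example.

```python
import threading
from typing import Callable


def watch(
    targets: dict[int, threading.Thread],
    shutdown: threading.Event,
    on_dead: Callable[[int], None],
    interval: float = 0.05,
) -> None:
    """Poll is_alive() on each target until shutdown is set."""
    reported: set[int] = set()
    # Event.wait returns True as soon as shutdown is set, ending the loop.
    while not shutdown.wait(interval):
        for worker_id, target in list(targets.items()):
            if worker_id not in reported and not target.is_alive():
                reported.add(worker_id)
                on_dead(worker_id)  # e.g. hand off to a respawn routine
```

The same `is_alive()` check works for `multiprocessing.Process` targets, which is what lets one loop serve both worker modes.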
_drain 0
def _drain(self) -> None

Wait for all workers to finish draining connections, then clean up.

Signals shutdown to all workers, waits for them to finish processing active connections (see shutdown_timeout on ServerConfig), then force-terminates process workers that haven't stopped.

Workers will reject new connections but finish existing ones for clean shutdown (important for Kubernetes graceful termination).

_force_stop 2
def _force_stop(self, handle: _WorkerHandle, join_timeout: float) -> None

Force-terminate a worker that did not drain in time.

Process workers receive SIGTERM then SIGKILL. Thread workers cannot be terminated from Python; they are daemon threads and may outlive this join.

Parameters
Name Type Description
handle
join_timeout
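The escalation described above maps onto `multiprocessing.Process.terminate()` (SIGTERM on POSIX) followed by `kill()` (SIGKILL on POSIX). The sketch below is a standalone illustration: `force_stop` and its string return values are hypothetical, and it takes the bare thread or process rather than a _WorkerHandle.

```python
import threading


def force_stop(target, join_timeout: float) -> str:
    """Escalate from SIGTERM to SIGKILL for processes; threads can only be joined."""
    if isinstance(target, threading.Thread):
        # Python has no API to kill a thread; wait and report what happened.
        target.join(join_timeout)
        return "thread-still-running" if target.is_alive() else "stopped"
    target.terminate()  # polite request first
    target.join(join_timeout)
    if target.is_alive():
        target.kill()  # the process ignored or outlasted SIGTERM
        target.join(join_timeout)
        return "killed"
    return "terminated"
```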
_install_signals 0
def _install_signals(self) -> None

Install SIGINT/SIGTERM handlers to trigger graceful shutdown.

Only effective when the supervisor runs on the main thread (e.g., direct testing). In production the supervisor runs inside a run_in_executor thread, so signal.signal() will fail silently. Server installs asyncio signal handlers that call supervisor.shutdown() instead.
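The main-thread restriction comes from CPython itself: `signal.signal()` raises ValueError when called outside the main thread. A minimal sketch of a guarded installer follows; the name `install_signals`, the `on_shutdown` callback, and the boolean return value are assumptions for this example, and swallowing the ValueError is one plausible reading of "fail silently".

```python
import signal
import threading
from typing import Callable


def install_signals(on_shutdown: Callable[[], None]) -> bool:
    """Install SIGINT/SIGTERM handlers; return False if that is not possible."""
    if threading.current_thread() is not threading.main_thread():
        return False  # signal.signal() would raise ValueError here

    def handler(signum, frame) -> None:
        on_shutdown()

    try:
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)
    except ValueError:
        # Defensive: covers any other case where handlers cannot be set.
        return False
    return True
```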

Functions

_parallel_join_targets 2 None
def _parallel_join_targets(targets: list[threading.Thread | multiprocessing.Process], timeout_per: float) -> None

Join each worker thread/process in parallel with its own timeout.

shutdown_timeout is applied per worker (not split across N workers). Wall-clock time is roughly timeout_per when all workers finish together, instead of one shared deadline that starved later workers in the join order.

Parameters
Name Type Description
targets list[threading.Thread | multiprocessing.Process]

Threads or processes to join.

timeout_per float

Maximum seconds to wait for each target.
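One way to get a per-target timeout without paying for it N times is to run each `join` in its own helper thread. This is a sketch of that idea, not necessarily how the function is implemented; `demo` is a hypothetical helper that measures the wall-clock cost with three targets that never finish.

```python
import threading
import time


def parallel_join_targets(targets, timeout_per: float) -> None:
    """Join every target concurrently, each with its own timeout_per budget."""
    joiners = [
        threading.Thread(target=t.join, args=(timeout_per,), daemon=True)
        for t in targets
    ]
    for j in joiners:
        j.start()
    for j in joiners:
        j.join()  # each joiner returns after at most about timeout_per seconds


def demo() -> float:
    """Join three stuck threads; sequential joins would take about 3 * 0.2 s."""
    release = threading.Event()
    stuck = [threading.Thread(target=release.wait, daemon=True) for _ in range(3)]
    for t in stuck:
        t.start()
    started = time.monotonic()
    parallel_join_targets(stuck, timeout_per=0.2)
    elapsed = time.monotonic() - started
    release.set()  # let the stuck threads exit
    return elapsed
```

With sequential joins, a slow first worker consumes time that later workers never get back; joining in parallel gives every worker the full timeout while keeping total wait near a single timeout_per.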

_target_id 1 str
Return an identifier string for a thread or process.
def _target_id(target: threading.Thread | multiprocessing.Process) -> str
Parameters
Name Type Description
target threading.Thread | multiprocessing.Process
Returns
str