Module

supervisor

Supervisor — spawns, monitors, and restarts workers.

The supervisor sits between theServer and the Workerlayer. It detects the GIL state at startup and spawns workers as either threads (on nogil / 3.14t) or processes (on GIL builds). The worker implementation is identical in both modes — only the spawning mechanism differs.

Responsibilities:

  • Spawn N workers with their sockets
  • Monitor worker health (is_alive check on a watchdog loop)
  • Restart crashed workers (up tomax_restartsper window)
  • Coordinate graceful shutdown viathreading.Event
  • Forward SIGINT/SIGTERM to workers

Classes

_WorkerHandle 1
Metadata about a running worker (thread or process).

Metadata about a running worker (thread or process).

Methods

Internal Methods 1
__init__ 2
def __init__(self, worker_id: int, target: threading.Thread | multiprocessing.Process) -> None
Parameters
Name Type Description
worker_id
target
Supervisor 12
Spawn and supervise N workers as threads or processes. The supervisor detects the GIL state and pi…

Spawn and supervise N workers as threads or processes.

The supervisor detects the GIL state and picks the appropriate spawning strategy automatically. Workers share the frozen ServerConfigand (in thread mode) the ASGI app reference.

Methods

mode 0 WorkerMode
The active worker mode (``"thread"`` or ``"process"``).
property
def mode(self) -> WorkerMode
Returns
WorkerMode
worker_count 0 int
Number of workers the supervisor manages.
property
def worker_count(self) -> int
Returns
int
run 1
Start all workers and block until shutdown. Installs signal handlers, spawns w…
def run(self, sockets: list[socket.socket]) -> None

Start all workers and block until shutdown.

Installs signal handlers, spawns workers, runs the health-check loop, then joins all workers on shutdown.

Parameters
Name Type Description
sockets

One socket per worker, created bycreate_listeners().

shutdown 0
Signal all workers to stop (non-blocking).
def shutdown(self) -> None
restart_workers 0
Gracefully restart all workers (for dev reload). Signals all running workers t…
def restart_workers(self) -> None

Gracefully restart all workers (for dev reload).

Signals all running workers to stop, waits for them to drain, clears the shutdown event, and spawns fresh workers.

When anapp_pathwas provided and workers run as threads, the app module is reimported so that code changes on disk take effect. Process-based workers get fresh imports automatically on fork and don't need explicit reimport.

Internal Methods 7
__init__ 6
def __init__(self, config: ServerConfig, app: ASGIApp, *, mode: WorkerMode | None = None, ssl_context: ssl.SSLContext | None = None, lifecycle_collector: LifecycleCollector | None = None, app_path: str | None = None) -> None
Parameters
Name Type Description
config
app
mode Default:None
ssl_context Default:None
lifecycle_collector Default:None
app_path Default:None
_spawn_worker 1
Create and start a single worker.
def _spawn_worker(self, worker_id: int) -> None
Parameters
Name Type Description
worker_id
_respawn_worker 1
Restart a crashed worker if within restart budget.
def _respawn_worker(self, worker_id: int) -> None
Parameters
Name Type Description
worker_id
_watch 0
Health-check loop — detects crashed workers and restarts them.
def _watch(self) -> None
_drain 0
Wait for all workers to finish, then clean up.
def _drain(self) -> None
_force_stop 1
Force-terminate a worker that did not drain in time.
def _force_stop(self, handle: _WorkerHandle) -> None
Parameters
Name Type Description
handle
_install_signals 0
Install SIGINT/SIGTERM handlers to trigger graceful shutdown. Only effective w…
def _install_signals(self) -> None

Install SIGINT/SIGTERM handlers to trigger graceful shutdown.

Only effective when the supervisor runs on the main thread (e.g., direct testing). In production the supervisor runs inside a run_in_executor thread, so signal.signal()will fail silently.Serverinstalls asyncio signal handlers that call supervisor.shutdown()instead.

Functions

_target_id 1 str
Return an identifier string for a thread or process.
def _target_id(target: threading.Thread | multiprocessing.Process) -> str
Parameters
Name Type Description
target threading.Thread | multiprocessing.Process
Returns
str