Classes
Server
15
▼
Top-level server that orchestrates the full lifecycle.
Creates the socket(s), runs lifespan events…
Server
15
▼
Top-level server that orchestrates the full lifecycle.
Creates the socket(s), runs lifespan events, starts workers (via the supervisor when multi-worker), and handles shutdown signals.
Methods
bound_addr
0
tuple[str, int] | None
▼
The actual (host, port) the server bound to, or ``None`` if not yet started.
property
bound_addr
0
tuple[str, int] | None
▼
def bound_addr(self) -> tuple[str, int] | None
Returns
tuple[str, int] | None
run
0
▼
Start the server (blocking).
Lifecycle:
1. Configure logging
2. Resolve effect…
run
0
▼
def run(self) -> None
Start the server (blocking).
Lifecycle:
- Configure logging
- Resolve effective worker count and detect worker mode
- Create TLS context (if configured)
- Print startup banner
- Bind socket(s)
- Run ASGI lifespan startup (once, in the main thread)
- Start worker(s) — single-worker fast path or supervisor
- Wait for shutdown signal
- Run ASGI lifespan shutdown
- Close socket(s)
shutdown
0
▼
Request graceful shutdown. Thread-safe and idempotent.
Can be called from any …
shutdown
0
▼
def shutdown(self) -> None
Request graceful shutdown. Thread-safe and idempotent.
Can be called from any thread to stop a running server. In
single-worker mode this wakes the asyncio event loop via
call_soon_threadsafe. In multi-worker mode this delegates
to the supervisor's shutdown.
Safe to call beforerun()— the server will exit immediately
on startup. Safe to call multiple times.
Internal Methods 12 ▼
__init__
5
▼
__init__
5
▼
def __init__(self, config: ServerConfig, app: ASGIApp, *, app_path: str | None = None, lifecycle_collector: LifecycleCollector | None = None, sync_app: SyncApp | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
config |
— |
|
app |
— |
|
app_path |
— |
Default:None
|
lifecycle_collector |
— |
Default:None
|
sync_app |
— |
Default:None
|
_run_single
0
▼
Run with a single worker — no supervisor, minimal overhead.
_run_single
0
▼
def _run_single(self) -> None
_run_single_with_reload
0
▼
Single-worker mode with auto-reload on source changes.
Runs the worker in a lo…
_run_single_with_reload
0
▼
def _run_single_with_reload(self) -> None
Single-worker mode with auto-reload on source changes.
Runs the worker in a loop. When the file watcher detects changes
it sets_reload_requestedand signals shutdown, causing the
asyncio loop to exit. The outer loop then restarts the worker
with a fresh socket and event loop.
_run_single_async
2
▼
Async entry point for single-worker mode.
async
_run_single_async
2
▼
async def _run_single_async(self, sock: socket.socket, udp_sock: socket.socket | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
sock |
— |
|
udp_sock |
— |
Default:None
|
_run_multi
2
▼
Run with multiple workers managed by the supervisor.
Lifespan runs once in the…
_run_multi
2
▼
def _run_multi(self, effective_workers: int, mode: WorkerMode) -> None
Run with multiple workers managed by the supervisor.
Lifespan runs once in the main process/thread before workers are spawned. Workers do not run lifespan.
When--reloadis active, a file watcher thread runs alongside
the supervisor and triggersrestart_workers()on changes.
Parameters
| Name | Type | Description |
|---|---|---|
effective_workers |
— |
|
mode |
— |
_run_lifespan_then_supervise
3
▼
Run lifespan in the main thread, then hand off to supervisor.
Installs asyncio…
async
_run_lifespan_then_supervise
3
▼
async def _run_lifespan_then_supervise(self, supervisor: Supervisor, sockets: list[socket.socket], udp_sockets: list[socket.socket] | None = None) -> None
Run lifespan in the main thread, then hand off to supervisor.
Installs asyncio signal handlers so SIGINT/SIGTERM trigger a
coordinated graceful shutdown through the supervisor instead of
a rawKeyboardInterrupt. On the first signal the supervisor
drains workers; a second signal removes the handler so the
defaultKeyboardInterruptforces an immediate exit.
Parameters
| Name | Type | Description |
|---|---|---|
supervisor |
— |
|
sockets |
— |
|
udp_sockets |
— |
Default:None
|
_run_single_h3
1
▼
Run HTTP/3 datagram endpoint in single-worker mode.
async
_run_single_h3
1
▼
async def _run_single_h3(self, udp_sock: socket.socket) -> None
Parameters
| Name | Type | Description |
|---|---|---|
udp_sock |
— |
_apply_integrations
0
▼
Configure optional integrations and wrap the app.
Wrapping order (outermost fi…
_apply_integrations
0
▼
def _apply_integrations(self) -> None
Configure optional integrations and wrap the app.
Wrapping order (outermost first):
- Sentry (catches exceptions from all inner layers)
- Request queue (load shedding)
- Rate limiter (per-IP throttling)
- Metrics endpoint (intercepts /metrics)
- App (innermost)
_create_udp_listener_if_h3
1
socket.socket | None
▼
Create a single UDP listener for HTTP/3 if configured and available.
_create_udp_listener_if_h3
1
socket.socket | None
▼
def _create_udp_listener_if_h3(self, actual_addr: tuple[str, int]) -> socket.socket | None
Parameters
| Name | Type | Description |
|---|---|---|
actual_addr |
— |
Returns
socket.socket | None
_create_udp_listeners_if_h3
2
list[socket.socket]
▼
Create multiple UDP listeners for HTTP/3 if configured and available.
_create_udp_listeners_if_h3
2
list[socket.socket]
▼
def _create_udp_listeners_if_h3(self, actual_addr: tuple[str, int], count: int) -> list[socket.socket]
Parameters
| Name | Type | Description |
|---|---|---|
actual_addr |
— |
|
count |
— |
Returns
list[socket.socket]
_print_banner
2
▼
Print the startup banner to stderr.
In JSON mode, emits a single structured JS…
_print_banner
2
▼
def _print_banner(self, effective_workers: int, mode: WorkerMode) -> None
Print the startup banner to stderr.
In JSON mode, emits a single structured JSON log line. In text/pretty mode, prints a human-friendly ASCII banner.
Parameters
| Name | Type | Description |
|---|---|---|
effective_workers |
— |
|
mode |
— |
_close_sockets
1
▼
Close all sockets, deduplicating shared-fd sockets.
On macOS (no SO_REUSEPORT)…
staticmethod
_close_sockets
1
▼
def _close_sockets(sockets: list[socket.socket]) -> None
Close all sockets, deduplicating shared-fd sockets.
On macOS (no SO_REUSEPORT) all workers share the same socket fd.
Deduplicate by fd and guard against already-closed fds to avoid
ValueError: Invalid file descriptor: -1on shutdown.
Parameters
| Name | Type | Description |
|---|---|---|
sockets |
— |
Functions
_get_version
0
str
▼
Get the pounce version string.
_get_version
0
str
▼
def _get_version() -> str
Returns
str