Module

_hot_reload

Hot reload without connection drops for pounce.

Implements zero-downtime code reloads by gracefully replacing workers without dropping active connections.

Classes

WorkerGeneration 6
Tracks worker generations for hot reload. Each reload creates a new generation of workers. Old gen…

Tracks worker generations for hot reload.

Each reload creates a new generation of workers. Old generations are drained gracefully while new generations handle new requests.

Methods

generation 0 int
Get generation number.
property
def generation(self) -> int
Returns
int Generation number
start_time 0 float
Get worker start time.
property
def start_time(self) -> float
Returns
float Start time (monotonic)
pid 0 int
Get worker process ID.
property
def pid(self) -> int
Returns
int Process ID
uptime 0 float
Get worker uptime in seconds.
property
def uptime(self) -> float
Returns
float Uptime in seconds
is_old_generation 1 bool
Check if this worker is from an old generation.
def is_old_generation(self, current_generation: int) -> bool
Parameters
Name Type Description
current_generation

Current generation number

Returns
bool True if worker is old generation
Internal Methods 1
__init__ 1
Initialize worker generation.
def __init__(self, generation: int = 1) -> None
Parameters
Name Type Description
generation

Generation number (increments with each reload)

Default:1
ReloadCoordinator 8
Coordinates hot reload across supervisor and workers. Manages the reload process: 1. Supervisor in…

Coordinates hot reload across supervisor and workers.

Manages the reload process:

  1. Supervisor increments generation number
  2. New workers start with new generation
  3. Old workers drain gracefully
  4. Supervisor waits for old workers to finish

Methods

current_generation 0 int
Get current generation number.
property
def current_generation(self) -> int
Returns
int Current generation
reload_requested 0 bool
Check if reload was requested.
property
def reload_requested(self) -> bool
Returns
bool True if reload requested
reload_in_progress 0 bool
Check if reload is in progress.
property
def reload_in_progress(self) -> bool
Returns
bool True if reload in progress
request_reload 0
Request a hot reload. This sets a flag that the supervisor checks. When detect…
def request_reload(self) -> None

Request a hot reload.

This sets a flag that the supervisor checks. When detected, the supervisor starts the reload process.

start_reload 0 int
Start reload process and increment generation.
def start_reload(self) -> int
Returns
int New generation number
finish_reload 0
Mark reload as finished.
def finish_reload(self) -> None
cancel_reload 0
Cancel pending reload request.
def cancel_reload(self) -> None
Internal Methods 1
__init__ 0
Initialize reload coordinator.
def __init__(self) -> None

Functions

enable_socket_reuse 1 None
Enable SO_REUSEADDR and SO_REUSEPORT on socket. This allows multiple workers t…
def enable_socket_reuse(sock: socket.socket) -> None

Enable SO_REUSEADDR and SO_REUSEPORT on socket.

This allows multiple workers to bind to the same address/port, enabling zero-downtime reloads.

Parameters
Name Type Description
sock socket.socket

Socket to configure

is_hot_reload_supported 0 bool
Check if hot reload is supported on this platform.
def is_hot_reload_supported() -> bool
Returns
bool
create_reloadable_socket 4 socket.socket
Create a socket configured for hot reload.
def create_reloadable_socket(host: str, port: int, *, backlog: int = 2048, family: socket.AddressFamily = socket.AF_INET) -> socket.socket
Parameters
Name Type Description
host str

Host address to bind

port int

Port to bind

backlog int

Listen backlog

Default:2048
family socket.AddressFamily

Socket family (AF_INET or AF_INET6)

Default:socket.AF_INET
Returns
socket.socket
get_reload_status 0 dict[str, Any]
Get status information about reload capability.
def get_reload_status() -> dict[str, Any]
Returns
dict[str, Any]
should_drain_worker 3 bool
Determine if worker should start draining. Workers from old generations should…
def should_drain_worker(worker_generation: WorkerGeneration, current_generation: int, drain_timeout: float) -> bool

Determine if worker should start draining.

Workers from old generations should drain after new workers start.

Parameters
Name Type Description
worker_generation WorkerGeneration

Worker's generation info

current_generation int

Current generation number

drain_timeout float

Maximum time to wait before forcing drain

Returns
bool
wait_for_workers_to_drain 3 bool
Wait for old workers to drain gracefully.
def wait_for_workers_to_drain(workers: list[Any], timeout: float, *, check_interval: float = 0.5) -> bool
Parameters
Name Type Description
workers list[Any]

List of worker handles

timeout float

Maximum time to wait (seconds)

check_interval float

How often to check worker status (seconds)

Default:0.5
Returns
bool