Module

state

Store, dispatch, saga runner, combine_reducers.

Classes

EffectResult 5
What an effect handler returns to tell the saga runner how to advance the generator.

What an effect handler returns to tell the saga runner how to advance the generator.

Methods

send 1 EffectResult
Resume the saga with ``saga.send(value)``.
classmethod
def send(cls, value: Any) -> EffectResult
Parameters
Name Type Description
value
Returns
EffectResult
next 0 EffectResult
Advance the saga with ``next(saga)``.
classmethod
def next(cls) -> EffectResult
Returns
EffectResult
throw 1 EffectResult
Throw an exception into the saga with ``saga.throw(error)``.
classmethod
def throw(cls, error: Exception) -> EffectResult
Parameters
Name Type Description
error
Returns
EffectResult
cont 0 EffectResult
Loop back to the cancellation check at the top of the runner.
classmethod
def cont(cls) -> EffectResult
Returns
EffectResult
Internal Methods 1
__init__ 3
def __init__(self, action: str, value: Any = None, error: Exception | None = None) -> None
Parameters
Name Type Description
action
value Default:None
error Default:None
SagaContext 5
Runtime context for a running saga — threading identity + cancellation scope. Provides structured …

Runtime context for a running saga — threading identity + cancellation scope.

Provides structured cancellation: when a parent context is cancelled, all child contexts are cancelled transitively viacancel_tree().

This is mutable runtime state (not Store state), so it uses a regular class with__slots__rather than a frozen dataclass.

Methods

is_cancelled 0 bool
True if this context's cancel event has been set.
property
def is_cancelled(self) -> bool
Returns
bool
cancel_tree 0
Cancel this context and all descendants transitively.
def cancel_tree(self) -> None
child 1 SagaContext
Create a child context that inherits this cancel scope.
def child(self, saga_id: str | None = None) -> SagaContext
Parameters
Name Type Description
saga_id Default:None
Returns
SagaContext
detached_child 1 SagaContext
Create a child with its own independent cancel scope.
def detached_child(self, saga_id: str | None = None) -> SagaContext
Parameters
Name Type Description
saga_id Default:None
Returns
SagaContext
Internal Methods 1
__init__ 3
def __init__(self, saga_id: str | None = None, cancel: threading.Event | None = None, parent: SagaContext | None = None) -> None
Parameters
Name Type Description
saga_id Default:None
cancel Default:None
parent Default:None
Store 24
Centralized state container with saga support. Thread-safety: reads are lock-free (frozen state). …

Centralized state container with saga support.

Thread-safety: reads are lock-free (frozen state). Dispatch serializes through a lock. Sagas run on a ThreadPoolExecutor.

max_workers defaults to None, which auto-sizes via kida.get_optimal_workersunder the IO_BOUND profile — returning the OS-aware pool size (capped at 8). Sagas execute side-effect code (Call, Take, Retry) which is I/O-shaped, not CPU-bound rendering. Pass an explicit integer to override when you know the workload (e.g. max_workers=Nfor benchmarks that fire N concurrent blocking sagas).

Methods

state 0 Any
property
def state(self) -> Any
Returns
Any
pool_active 0 int
Number of currently active tasks in the thread pool.
property
def pool_active(self) -> int
Returns
int
view_state 0 Any
Latest ViewState from a reducer, or None.
property
def view_state(self) -> Any
Returns
Any
quit_requested 0 bool
True if a reducer returned Quit.
property
def quit_requested(self) -> bool
Returns
bool
exit_code 0 int
Exit code from the Quit signal (default 0).
property
def exit_code(self) -> int
Returns
int
recording 0 list[dict] | None
Get session recording if enabled.
property
def recording(self) -> list[dict] | None
Returns
list[dict] | None
dispatch 1
Dispatch action through middleware -> reducer.
def dispatch(self, action: Action) -> None
Parameters
Name Type Description
action
run_saga 3 SagaContext
Schedule a saga on the thread pool.
def run_saga(self, saga: Any, cancel: threading.Event | None = None, context: SagaContext | None = None) -> SagaContext
Parameters
Name Type Description
saga

Generator saga to execute.

cancel

Optional cancellation event (legacy). Prefer context.

Default:None
context

Optional :class:SagaContextfor structured cancellation. If neither cancel nor context is provided, a fresh context is created automatically.

Default:None
Returns
SagaContext The :class:`SagaContext` assigned to this saga (useful for cancellation and debugging).
subscribe 1 Callable[[], None]
Register state-change listener. Returns unsubscribe callable.
def subscribe(self, listener: Callable) -> Callable[[], None]
Parameters
Name Type Description
listener
Returns
Callable[[], None]
shutdown 0
Cancel root sagas and shut down the thread pool.
def shutdown(self) -> None
Internal Methods 14
__init__ 7
def __init__(self, reducer: Callable, initial_state: Any, middleware: tuple[Callable, ...] = (), *, record: bool | str | Path = False, max_workers: int | None = None, on_pool_pressure: Callable[[int, int], None] | None = None, pool_pressure_threshold: float = 0.8) -> None
Parameters
Name Type Description
reducer
initial_state
middleware Default:()
record Default:False
max_workers Default:None
on_pool_pressure Default:None
pool_pressure_threshold Default:0.8
_tracked_submit 2
Submit work to the pool, tracking active tasks for pressure detection.
def _tracked_submit(self, fn, *args)
Parameters
Name Type Description
fn
*args
_base_dispatch 1
Core dispatch: reducer + saga scheduling + cmd execution + recording.
def _base_dispatch(self, action: Action) -> None
Parameters
Name Type Description
action
_run_saga 2
Step through a generator saga, executing effects via handler registry. Catches…
def _run_saga(self, saga: Any, context: SagaContext | None = None) -> None

Step through a generator saga, executing effects via handler registry.

Catches unhandled exceptions and dispatches @@SAGA_ERROR so the reducer can react gracefully. The error is never swallowed silently.

Parameters
Name Type Description
saga
context Default:None
_execute_timeout 2 Any
Execute a blocking effect with a timeout deadline. Uses a dedicated thread (no…
def _execute_timeout(self, effect: Call | Retry, seconds: float) -> Any

Execute a blocking effect with a timeout deadline.

Uses a dedicated thread (not the shared pool) to avoid deadlock when the saga itself is already running on the pool.

Parameters
Name Type Description
effect
seconds
Returns
Any
_execute_effect 1 Any
Execute a single blocking effect and return its result.
staticmethod
def _execute_effect(effect: Call | Retry) -> Any
Parameters
Name Type Description
effect
Returns
Any
_run_saga_capturing 5
Step through a saga via ``_run_saga``, capturing the return value. Wraps *saga…
def _run_saga_capturing(self, saga: Any, context: SagaContext, result_box: list, error_box: list, done: threading.Event) -> None

Step through a saga via_run_saga, capturing the return value.

Wraps saga in a thinyield fromgenerator so that _run_sagahandles all effect types (including nested Race/All/Take/Debounce). On success the return value is appended to result_box; on error the exception goes into error_box. done is set in all cases.

Parameters
Name Type Description
saga
context
result_box
error_box
done
_execute_race 2 Any
Run sagas concurrently, return the first result. Cancel losers.
def _execute_race(self, child_sagas: tuple, parent_context: SagaContext) -> Any
Parameters
Name Type Description
child_sagas
parent_context
Returns
Any
_execute_all 2 tuple
Run sagas concurrently, wait for all. Fail-fast on first error.
def _execute_all(self, child_sagas: tuple, parent_context: SagaContext) -> tuple
Parameters
Name Type Description
child_sagas
parent_context
Returns
tuple
_exec_cmd 1
Execute a Cmd, Batch, Sequence, or TickCmd.
def _exec_cmd(self, cmd: Any) -> None
Parameters
Name Type Description
cmd
_run_cmd 1
Run a single Cmd thunk and dispatch its result.
def _run_cmd(self, fn: Any) -> None
Parameters
Name Type Description
fn
_submit_batch 1
Submit batch commands with a single pressure check. Performs one pressure chec…
def _submit_batch(self, cmds: tuple) -> None

Submit batch commands with a single pressure check.

Performs one pressure check and bulk-increments the task counter, then submits each Cmd directly to the executor. This avoids per-Cmd pressure checks and lock acquisitions.

Parameters
Name Type Description
cmds
_run_sequence 1
Run commands serially, dispatching each result before the next.
def _run_sequence(self, cmds: tuple) -> None
Parameters
Name Type Description
cmds
_run_tick 1
Schedule a single @@TICK after *interval* seconds.
def _run_tick(self, interval: float) -> None
Parameters
Name Type Description
interval

Functions

_handle_take_every 3 EffectResult
Block until cancelled, forking a new saga for every matching action.
def _handle_take_every(effect: TakeEvery, context: SagaContext, store: Store) -> EffectResult
Parameters
Name Type Description
effect TakeEvery
context SagaContext
store Store
Returns
EffectResult
_handle_take_latest 3 EffectResult
Block until cancelled, forking a saga for the latest matching action only.
def _handle_take_latest(effect: TakeLatest, context: SagaContext, store: Store) -> EffectResult
Parameters
Name Type Description
effect TakeLatest
context SagaContext
store Store
Returns
EffectResult
_cleanup_take_waiter 3 None
Remove an unconsumed Take waiter from the store.
def _cleanup_take_waiter(store: Store, action_type: str, waiter_event: threading.Event) -> None
Parameters
Name Type Description
store Store
action_type str
waiter_event threading.Event
combine_reducers 1 Callable
Combine multiple reducers into one that manages a dict state. Each reducer man…
def combine_reducers(**reducers: Callable) -> Callable

Combine multiple reducers into one that manages a dict state.

Each reducer manages a slice of state under its keyword name. Sagas, cmds, and view state from ReducerResult and Quit are collected and propagated.

Parameters
Name Type Description
**reducers Callable
Returns
Callable
_merge_view 2 ViewState
Merge two ViewStates: explicitly-set fields in *new* override *prev*.
def _merge_view(prev: ViewState | None, new: ViewState) -> ViewState
Parameters
Name Type Description
prev ViewState | None
new ViewState
Returns
ViewState
_execute_retry 7 Any
Execute a function with retry and backoff.
def _execute_retry(fn: Any, args: tuple, kwargs: dict, max_attempts: int, backoff: str, base_delay: float, max_delay: float) -> Any
Parameters
Name Type Description
fn Any
args tuple
kwargs dict
max_attempts int
backoff str
base_delay float
max_delay float
Returns
Any