Classes
EffectResult
5
▼
What an effect handler returns to tell the saga runner how to advance the generator.
EffectResult
5
▼
What an effect handler returns to tell the saga runner how to advance the generator.
Methods
send
1
EffectResult
▼
Resume the saga with ``saga.send(value)``.
classmethod
send
1
EffectResult
▼
def send(cls, value: Any) -> EffectResult
Parameters
| Name | Type | Description |
|---|---|---|
value |
— |
Returns
EffectResult
next
0
EffectResult
▼
Advance the saga with ``next(saga)``.
classmethod
next
0
EffectResult
▼
def next(cls) -> EffectResult
Returns
EffectResult
throw
1
EffectResult
▼
Throw an exception into the saga with ``saga.throw(error)``.
classmethod
throw
1
EffectResult
▼
def throw(cls, error: Exception) -> EffectResult
Parameters
| Name | Type | Description |
|---|---|---|
error |
— |
Returns
EffectResult
cont
0
EffectResult
▼
Loop back to the cancellation check at the top of the runner.
classmethod
cont
0
EffectResult
▼
def cont(cls) -> EffectResult
Returns
EffectResult
Internal Methods 1 ▼
__init__
3
▼
__init__
3
▼
def __init__(self, action: str, value: Any = None, error: Exception | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
action |
— |
|
value |
— |
Default:None
|
error |
— |
Default:None
|
SagaContext
5
▼
Runtime context for a running saga — threading identity + cancellation scope.
Provides structured …
SagaContext
5
▼
Runtime context for a running saga — threading identity + cancellation scope.
Provides structured cancellation: when a parent context is cancelled,
all child contexts are cancelled transitively viacancel_tree().
This is mutable runtime state (not Store state), so it uses a regular
class with__slots__rather than a frozen dataclass.
Methods
is_cancelled
0
bool
▼
True if this context's cancel event has been set.
property
is_cancelled
0
bool
▼
def is_cancelled(self) -> bool
Returns
bool
cancel_tree
0
▼
Cancel this context and all descendants transitively.
cancel_tree
0
▼
def cancel_tree(self) -> None
child
1
SagaContext
▼
Create a child context that inherits this cancel scope.
child
1
SagaContext
▼
def child(self, saga_id: str | None = None) -> SagaContext
Parameters
| Name | Type | Description |
|---|---|---|
saga_id |
— |
Default:None
|
Returns
SagaContext
detached_child
1
SagaContext
▼
Create a child with its own independent cancel scope.
detached_child
1
SagaContext
▼
def detached_child(self, saga_id: str | None = None) -> SagaContext
Parameters
| Name | Type | Description |
|---|---|---|
saga_id |
— |
Default:None
|
Returns
SagaContext
Internal Methods 1 ▼
__init__
3
▼
__init__
3
▼
def __init__(self, saga_id: str | None = None, cancel: threading.Event | None = None, parent: SagaContext | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
saga_id |
— |
Default:None
|
cancel |
— |
Default:None
|
parent |
— |
Default:None
|
Store
24
▼
Centralized state container with saga support.
Thread-safety: reads are lock-free (frozen state).
…
Store
24
▼
Centralized state container with saga support.
Thread-safety: reads are lock-free (frozen state). Dispatch serializes through a lock. Sagas run on a ThreadPoolExecutor.
max_workers defaults to None, which auto-sizes via
kida.get_optimal_workersunder the IO_BOUND profile — returning the
OS-aware pool size (capped at 8). Sagas execute side-effect code (Call,
Take, Retry) which is I/O-shaped, not CPU-bound rendering. Pass an
explicit integer to override when you know the workload (e.g.
max_workers=Nfor benchmarks that fire N concurrent blocking sagas).
Methods
state
0
Any
▼
property
state
0
Any
▼
def state(self) -> Any
Returns
Any
pool_active
0
int
▼
Number of currently active tasks in the thread pool.
property
pool_active
0
int
▼
def pool_active(self) -> int
Returns
int
view_state
0
Any
▼
Latest ViewState from a reducer, or None.
property
view_state
0
Any
▼
def view_state(self) -> Any
Returns
Any
quit_requested
0
bool
▼
True if a reducer returned Quit.
property
quit_requested
0
bool
▼
def quit_requested(self) -> bool
Returns
bool
exit_code
0
int
▼
Exit code from the Quit signal (default 0).
property
exit_code
0
int
▼
def exit_code(self) -> int
Returns
int
recording
0
list[dict] | None
▼
Get session recording if enabled.
property
recording
0
list[dict] | None
▼
def recording(self) -> list[dict] | None
Returns
list[dict] | None
dispatch
1
▼
Dispatch action through middleware -> reducer.
dispatch
1
▼
def dispatch(self, action: Action) -> None
Parameters
| Name | Type | Description |
|---|---|---|
action |
— |
run_saga
3
SagaContext
▼
Schedule a saga on the thread pool.
run_saga
3
SagaContext
▼
def run_saga(self, saga: Any, cancel: threading.Event | None = None, context: SagaContext | None = None) -> SagaContext
Parameters
| Name | Type | Description |
|---|---|---|
saga |
— |
Generator saga to execute. |
cancel |
— |
Optional cancellation event (legacy). Prefer context. Default:None
|
context |
— |
Optional :class: None
|
Returns
SagaContext
The :class:`SagaContext` assigned to this saga (useful for
cancellation and debugging).
subscribe
1
Callable[[], None]
▼
Register state-change listener. Returns unsubscribe callable.
subscribe
1
Callable[[], None]
▼
def subscribe(self, listener: Callable) -> Callable[[], None]
Parameters
| Name | Type | Description |
|---|---|---|
listener |
— |
Returns
Callable[[], None]
shutdown
0
▼
Cancel root sagas and shut down the thread pool.
shutdown
0
▼
def shutdown(self) -> None
Internal Methods 14 ▼
__init__
7
▼
__init__
7
▼
def __init__(self, reducer: Callable, initial_state: Any, middleware: tuple[Callable, ...] = (), *, record: bool | str | Path = False, max_workers: int | None = None, on_pool_pressure: Callable[[int, int], None] | None = None, pool_pressure_threshold: float = 0.8) -> None
Parameters
| Name | Type | Description |
|---|---|---|
reducer |
— |
|
initial_state |
— |
|
middleware |
— |
Default:()
|
record |
— |
Default:False
|
max_workers |
— |
Default:None
|
on_pool_pressure |
— |
Default:None
|
pool_pressure_threshold |
— |
Default:0.8
|
_tracked_submit
2
▼
Submit work to the pool, tracking active tasks for pressure detection.
_tracked_submit
2
▼
def _tracked_submit(self, fn, *args)
Parameters
| Name | Type | Description |
|---|---|---|
fn |
— |
|
*args |
— |
_base_dispatch
1
▼
Core dispatch: reducer + saga scheduling + cmd execution + recording.
_base_dispatch
1
▼
def _base_dispatch(self, action: Action) -> None
Parameters
| Name | Type | Description |
|---|---|---|
action |
— |
_run_saga
2
▼
Step through a generator saga, executing effects via handler registry.
Catches…
_run_saga
2
▼
def _run_saga(self, saga: Any, context: SagaContext | None = None) -> None
Step through a generator saga, executing effects via handler registry.
Catches unhandled exceptions and dispatches @@SAGA_ERROR so the reducer can react gracefully. The error is never swallowed silently.
Parameters
| Name | Type | Description |
|---|---|---|
saga |
— |
|
context |
— |
Default:None
|
_execute_timeout
2
Any
▼
Execute a blocking effect with a timeout deadline.
Uses a dedicated thread (no…
_execute_timeout
2
Any
▼
def _execute_timeout(self, effect: Call | Retry, seconds: float) -> Any
Execute a blocking effect with a timeout deadline.
Uses a dedicated thread (not the shared pool) to avoid deadlock when the saga itself is already running on the pool.
Parameters
| Name | Type | Description |
|---|---|---|
effect |
— |
|
seconds |
— |
Returns
Any
_execute_effect
1
Any
▼
Execute a single blocking effect and return its result.
staticmethod
_execute_effect
1
Any
▼
def _execute_effect(effect: Call | Retry) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
effect |
— |
Returns
Any
_run_saga_capturing
5
▼
Step through a saga via ``_run_saga``, capturing the return value.
Wraps *saga…
_run_saga_capturing
5
▼
def _run_saga_capturing(self, saga: Any, context: SagaContext, result_box: list, error_box: list, done: threading.Event) -> None
Step through a saga via_run_saga, capturing the return value.
Wraps saga in a thinyield fromgenerator so that
_run_sagahandles all effect types (including nested
Race/All/Take/Debounce). On success the return value is
appended to result_box; on error the exception goes into
error_box. done is set in all cases.
Parameters
| Name | Type | Description |
|---|---|---|
saga |
— |
|
context |
— |
|
result_box |
— |
|
error_box |
— |
|
done |
— |
_execute_race
2
Any
▼
Run sagas concurrently, return the first result. Cancel losers.
_execute_race
2
Any
▼
def _execute_race(self, child_sagas: tuple, parent_context: SagaContext) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
child_sagas |
— |
|
parent_context |
— |
Returns
Any
_execute_all
2
tuple
▼
Run sagas concurrently, wait for all. Fail-fast on first error.
_execute_all
2
tuple
▼
def _execute_all(self, child_sagas: tuple, parent_context: SagaContext) -> tuple
Parameters
| Name | Type | Description |
|---|---|---|
child_sagas |
— |
|
parent_context |
— |
Returns
tuple
_exec_cmd
1
▼
Execute a Cmd, Batch, Sequence, or TickCmd.
_exec_cmd
1
▼
def _exec_cmd(self, cmd: Any) -> None
Parameters
| Name | Type | Description |
|---|---|---|
cmd |
— |
_run_cmd
1
▼
Run a single Cmd thunk and dispatch its result.
_run_cmd
1
▼
def _run_cmd(self, fn: Any) -> None
Parameters
| Name | Type | Description |
|---|---|---|
fn |
— |
_submit_batch
1
▼
Submit batch commands with a single pressure check.
Performs one pressure chec…
_submit_batch
1
▼
def _submit_batch(self, cmds: tuple) -> None
Submit batch commands with a single pressure check.
Performs one pressure check and bulk-increments the task counter, then submits each Cmd directly to the executor. This avoids per-Cmd pressure checks and lock acquisitions.
Parameters
| Name | Type | Description |
|---|---|---|
cmds |
— |
_run_sequence
1
▼
Run commands serially, dispatching each result before the next.
_run_sequence
1
▼
def _run_sequence(self, cmds: tuple) -> None
Parameters
| Name | Type | Description |
|---|---|---|
cmds |
— |
_run_tick
1
▼
Schedule a single @@TICK after *interval* seconds.
_run_tick
1
▼
def _run_tick(self, interval: float) -> None
Parameters
| Name | Type | Description |
|---|---|---|
interval |
— |
Functions
_handle_take_every
3
EffectResult
▼
Block until cancelled, forking a new saga for every matching action.
_handle_take_every
3
EffectResult
▼
def _handle_take_every(effect: TakeEvery, context: SagaContext, store: Store) -> EffectResult
Parameters
| Name | Type | Description |
|---|---|---|
effect |
TakeEvery |
|
context |
SagaContext |
|
store |
Store |
Returns
EffectResult
_handle_take_latest
3
EffectResult
▼
Block until cancelled, forking a saga for the latest matching action only.
_handle_take_latest
3
EffectResult
▼
def _handle_take_latest(effect: TakeLatest, context: SagaContext, store: Store) -> EffectResult
Parameters
| Name | Type | Description |
|---|---|---|
effect |
TakeLatest |
|
context |
SagaContext |
|
store |
Store |
Returns
EffectResult
_cleanup_take_waiter
3
None
▼
Remove an unconsumed Take waiter from the store.
_cleanup_take_waiter
3
None
▼
def _cleanup_take_waiter(store: Store, action_type: str, waiter_event: threading.Event) -> None
Parameters
| Name | Type | Description |
|---|---|---|
store |
Store |
|
action_type |
str |
|
waiter_event |
threading.Event |
combine_reducers
1
Callable
▼
Combine multiple reducers into one that manages a dict state.
Each reducer man…
combine_reducers
1
Callable
▼
def combine_reducers(**reducers: Callable) -> Callable
Combine multiple reducers into one that manages a dict state.
Each reducer manages a slice of state under its keyword name. Sagas, cmds, and view state from ReducerResult and Quit are collected and propagated.
Parameters
| Name | Type | Description |
|---|---|---|
**reducers |
Callable |
Returns
Callable
_merge_view
2
ViewState
▼
Merge two ViewStates: explicitly-set fields in *new* override *prev*.
_merge_view
2
ViewState
▼
def _merge_view(prev: ViewState | None, new: ViewState) -> ViewState
Parameters
| Name | Type | Description |
|---|---|---|
prev |
ViewState | None |
|
new |
ViewState |
Returns
ViewState
_execute_retry
7
Any
▼
Execute a function with retry and backoff.
_execute_retry
7
Any
▼
def _execute_retry(fn: Any, args: tuple, kwargs: dict, max_attempts: int, backoff: str, base_delay: float, max_delay: float) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
fn |
Any |
|
args |
tuple |
|
kwargs |
dict |
|
max_attempts |
int |
|
backoff |
str |
|
base_delay |
float |
|
max_delay |
float |
Returns
Any