# state

URL: /milo-cli/api/milo/state/
Section: milo
Description: Store, dispatch, saga runner, combine_reducers.

---

3 classes, 6 functions

## Classes

`EffectResult`

What an effect handler returns to tell the saga runner how to advance the generator.

#### Methods

`send`

Resume the saga with `saga.send(value)`.

classmethod

`def send(cls, value: Any) -> EffectResult`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `value` | `Any` | Value passed to `saga.send`. |

##### Returns

`EffectResult`

`next`

Advance the saga with `next(saga)`.

classmethod

`def next(cls) -> EffectResult`

##### Returns

`EffectResult`

`throw`

Throw an exception into the saga with `saga.throw(error)`.

classmethod

`def throw(cls, error: Exception) -> EffectResult`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `error` | `Exception` | Exception passed to `saga.throw`. |

##### Returns

`EffectResult`

`cont`

Loop back to the cancellation check at the top of the runner.

classmethod

`def cont(cls) -> EffectResult`

##### Returns

`EffectResult`
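
As a rough illustration of how these four constructors could map onto Python's generator protocol, the sketch below drives a generator with a stand-in result type. `MiniEffectResult`, `drive`, the action tags (`"send"`, `"next"`, `"throw"`, `"cont"`), and the tuple-shaped effects are all hypothetical; only the classmethod names and the `__init__` signature come from this page.

```python
from __future__ import annotations

import threading
from typing import Any, Callable, Generator


class MiniEffectResult:
    """Hypothetical stand-in that mirrors the documented classmethods."""

    def __init__(self, action: str, value: Any = None, error: Exception | None = None) -> None:
        self.action = action  # assumed tags: "send", "next", "throw", "cont"
        self.value = value
        self.error = error

    @classmethod
    def send(cls, value: Any) -> MiniEffectResult:
        return cls("send", value=value)

    @classmethod
    def next(cls) -> MiniEffectResult:
        return cls("next")

    @classmethod
    def throw(cls, error: Exception) -> MiniEffectResult:
        return cls("throw", error=error)

    @classmethod
    def cont(cls) -> MiniEffectResult:
        return cls("cont")


def drive(
    saga: Generator[Any, Any, Any],
    handler: Callable[[Any], MiniEffectResult],
    cancel: threading.Event,
) -> Any:
    """Advance `saga` until it returns or `cancel` is set."""
    try:
        effect = next(saga)
        while not cancel.is_set():  # cancellation check at the top of the loop
            result = handler(effect)
            if result.action == "send":
                effect = saga.send(result.value)
            elif result.action == "throw":
                effect = saga.throw(result.error)
            elif result.action == "next":
                effect = next(saga)
            # "cont": leave the saga where it is and re-check cancellation
    except StopIteration as stop:
        return stop.value
    return None


def demo_saga() -> Generator[Any, Any, str]:
    doubled = yield ("double", 21)
    try:
        yield ("fail", None)
    except ValueError as exc:
        return f"{doubled} then recovered from {exc}"
    return "unreachable in this demo"


def demo_handler(effect: Any) -> MiniEffectResult:
    kind, payload = effect
    if kind == "double":
        return MiniEffectResult.send(payload * 2)
    return MiniEffectResult.throw(ValueError("boom"))


outcome = drive(demo_saga(), demo_handler, threading.Event())
```

Here the handler, not the saga, decides how the generator is resumed, which is what lets one runner loop serve every effect type.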

Internal Methods

`__init__`

`def __init__(self, action: str, value: Any = None, error: Exception | None = None) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `action` | `str` | |
| `value` | `Any` | Default: `None` |
| `error` | `Exception \| None` | Default: `None` |

`SagaContext`

Runtime context for a running saga: threading identity plus cancellation scope.

Provides structured cancellation: when a parent context is cancelled, all child contexts are cancelled transitively via `cancel_tree()`.

This is mutable runtime state (not Store state), so it uses a regular class with `__slots__` rather than a frozen dataclass.

#### Methods

`is_cancelled`

True if this context's cancel event has been set.

property

`def is_cancelled(self) -> bool`

##### Returns

`bool`

`cancel_tree`

Cancel this context and all descendants transitively.

`def cancel_tree(self) -> None`

`child`

Create a child context that inherits this cancel scope.

`def child(self, saga_id: str | None = None) -> SagaContext`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga_id` | `str \| None` | Default: `None` |

##### Returns

`SagaContext`

`detached_child`

Create a child with its own independent cancel scope.

`def detached_child(self, saga_id: str | None = None) -> SagaContext`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga_id` | `str \| None` | Default: `None` |

##### Returns

`SagaContext`
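
The difference between `child` and `detached_child` is easier to see in a small model. The sketch below is one plausible way to get transitive cancellation, assuming each context owns a `threading.Event` and keeps a list of attached children; `ContextSketch` and its `children` slot are hypothetical, and the real class may instead share events or track descendants differently.

```python
from __future__ import annotations

import threading


class ContextSketch:
    """Hypothetical model of a cancellation tree (not the library class)."""

    __slots__ = ("saga_id", "cancel", "parent", "children")

    def __init__(
        self,
        saga_id: str | None = None,
        cancel: threading.Event | None = None,
        parent: ContextSketch | None = None,
    ) -> None:
        self.saga_id = saga_id
        self.cancel = cancel if cancel is not None else threading.Event()
        self.parent = parent
        self.children: list[ContextSketch] = []

    @property
    def is_cancelled(self) -> bool:
        return self.cancel.is_set()

    def cancel_tree(self) -> None:
        # Cancel this node, then every attached descendant.
        self.cancel.set()
        for node in self.children:
            node.cancel_tree()

    def child(self, saga_id: str | None = None) -> ContextSketch:
        # Attached: registered with the parent, so cancel_tree() reaches it.
        node = ContextSketch(saga_id, parent=self)
        self.children.append(node)
        return node

    def detached_child(self, saga_id: str | None = None) -> ContextSketch:
        # Detached (assumed semantics): remembers its parent for identity,
        # but is not registered for cancellation.
        return ContextSketch(saga_id, parent=self)


root = ContextSketch("root")
worker = root.child("worker")
grandchild = worker.child("grandchild")
background = root.detached_child("background")

root.cancel_tree()
```

After `root.cancel_tree()`, `worker` and `grandchild` report `is_cancelled` as true, while `background` keeps running under this model.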

Internal Methods

`__init__`

`def __init__(self, saga_id: str | None = None, cancel: threading.Event | None = None, parent: SagaContext | None = None) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga_id` | `str \| None` | Default: `None` |
| `cancel` | `threading.Event \| None` | Default: `None` |
| `parent` | `SagaContext \| None` | Default: `None` |

`Store`

Centralized state container with saga support.

Thread-safety: reads are lock-free (frozen state). Dispatch serializes through a lock. Sagas run on a ThreadPoolExecutor.

`max_workers` defaults to `None`, which auto-sizes via `kida.get_optimal_workers` under the IO_BOUND profile, returning the OS-aware pool size (capped at 8). Sagas execute side-effect code (Call, Take, Retry), which is I/O-shaped, not CPU-bound rendering. Pass an explicit integer to override when you know the workload (e.g. `max_workers=N` for benchmarks that fire N concurrent blocking sagas).

#### Methods

`state`

property

`def state(self) -> Any`

##### Returns

`Any`

`pool_active`

Number of currently active tasks in the thread pool.

property

`def pool_active(self) -> int`

##### Returns

`int`

`view_state`

Latest ViewState from a reducer, or None.

property

`def view_state(self) -> Any`

##### Returns

`Any`

`quit_requested`

True if a reducer returned Quit.

property

`def quit_requested(self) -> bool`

##### Returns

`bool`

`exit_code`

Exit code from the Quit signal (default 0).

property

`def exit_code(self) -> int`

##### Returns

`int`

`recording`

Get session recording if enabled.

property

`def recording(self) -> list[dict] | None`

##### Returns

`list[dict] | None`

`dispatch`

Dispatch an action through the middleware chain and then the reducer.

`def dispatch(self, action: Action) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `action` | `Action` | |

`run_saga`

Schedule a saga on the thread pool.

`def run_saga(self, saga: Any, cancel: threading.Event | None = None, context: SagaContext | None = None) -> SagaContext`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga` | `Any` | Generator saga to execute. |
| `cancel` | `threading.Event \| None` | Optional cancellation event (legacy). Prefer `context`. Default: `None` |
| `context` | `SagaContext \| None` | Optional `SagaContext` for structured cancellation. If neither `cancel` nor `context` is provided, a fresh context is created automatically. Default: `None` |

##### Returns

`SagaContext`

The `SagaContext` assigned to this saga (useful for cancellation and debugging).

`subscribe`

Register a state-change listener. Returns an unsubscribe callable.

`def subscribe(self, listener: Callable) -> Callable[[], None]`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `listener` | `Callable` | |

##### Returns

`Callable[[], None]`
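
The dispatch and subscribe contract described above (lock-free reads, serialized dispatch, an unsubscribe callable) can be sketched in a few lines. This is a minimal stand-in, not the library's `Store`: `MiniStore`, the dict-shaped actions, and the zero-argument listener call are assumptions, and middleware, sagas, and recording are left out.

```python
import threading
from typing import Any, Callable


class MiniStore:
    """Hypothetical stand-in: a reducer, a locked dispatch, and listeners."""

    def __init__(self, reducer: Callable[[Any, dict], Any], initial_state: Any) -> None:
        self._reducer = reducer
        self._state = initial_state
        self._lock = threading.Lock()
        self._listeners: list = []

    @property
    def state(self) -> Any:
        # No lock: state values are replaced, never mutated in place.
        return self._state

    def dispatch(self, action: dict) -> None:
        with self._lock:  # one reducer call at a time
            self._state = self._reducer(self._state, action)
            listeners = tuple(self._listeners)
        for listener in listeners:  # notify outside the lock
            listener()

    def subscribe(self, listener: Callable[[], None]) -> Callable[[], None]:
        with self._lock:
            self._listeners.append(listener)

        def unsubscribe() -> None:
            with self._lock:
                if listener in self._listeners:
                    self._listeners.remove(listener)

        return unsubscribe


def counter(state: int, action: dict) -> int:
    return state + 1 if action["type"] == "inc" else state


store = MiniStore(counter, 0)
seen: list = []
unsubscribe = store.subscribe(lambda: seen.append(store.state))

store.dispatch({"type": "inc"})  # listener records the new state
unsubscribe()
store.dispatch({"type": "inc"})  # no listener left to notify
```

Returning the unsubscribe function from `subscribe` means callers never need to keep a reference to the listener itself in order to detach it.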

`shutdown`

Cancel root sagas and shut down the thread pool.

`def shutdown(self) -> None`

Internal Methods

`__init__`

`def __init__(self, reducer: Callable, initial_state: Any, middleware: tuple[Callable, ...] = (), *, record: bool | str | Path = False, max_workers: int | None = None, on_pool_pressure: Callable[[int, int], None] | None = None, pool_pressure_threshold: float = 0.8) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `reducer` | `Callable` | |
| `initial_state` | `Any` | |
| `middleware` | `tuple[Callable, ...]` | Default: `()` |
| `record` | `bool \| str \| Path` | Default: `False` |
| `max_workers` | `int \| None` | Default: `None` |
| `on_pool_pressure` | `Callable[[int, int], None] \| None` | Default: `None` |
| `pool_pressure_threshold` | `float` | Default: `0.8` |

`_tracked_submit`

Submit work to the pool, tracking active tasks for pressure detection.

`def _tracked_submit(self, fn, *args)`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `fn` | (unannotated) | |
| `*args` | (unannotated) | |

`_base_dispatch`

Core dispatch: reducer + saga scheduling + cmd execution + recording.

`def _base_dispatch(self, action: Action) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `action` | `Action` | |

`_run_saga`

`def _run_saga(self, saga: Any, context: SagaContext | None = None) -> None`

Step through a generator saga, executing effects via the handler registry.

Catches unhandled exceptions and dispatches `@@SAGA_ERROR` so the reducer can react gracefully. The error is never swallowed silently.

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga` | `Any` | |
| `context` | `SagaContext \| None` | Default: `None` |

`_execute_timeout`

`def _execute_timeout(self, effect: Call | Retry, seconds: float) -> Any`

Execute a blocking effect with a timeout deadline.

Uses a dedicated thread (not the shared pool) to avoid deadlock when the saga itself is already running on the pool.

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `effect` | `Call \| Retry` | |
| `seconds` | `float` | |

##### Returns

`Any`
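
The dedicated-thread approach can be illustrated with the standard library alone. The sketch below assumes the effect is reduced to a zero-argument callable and that a missed deadline surfaces as `TimeoutError`; `call_with_timeout` is a hypothetical helper, and the exception the library actually raises is not stated on this page.

```python
import threading
import time
from typing import Any, Callable


def call_with_timeout(fn: Callable[[], Any], seconds: float) -> Any:
    """Run `fn` on its own thread and wait at most `seconds` for it."""
    result: list = []
    error: list = []

    def worker() -> None:
        try:
            result.append(fn())
        except Exception as exc:  # hand the failure back to the caller
            error.append(exc)

    # A fresh thread, not a pool slot: if every pool worker were itself
    # waiting on a timed effect, a pool-based submit could never start.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout=seconds)

    if thread.is_alive():
        raise TimeoutError(f"effect did not finish within {seconds}s")
    if error:
        raise error[0]
    return result[0]


quick = call_with_timeout(lambda: 2 + 2, seconds=1.0)

try:
    call_with_timeout(lambda: time.sleep(0.5), seconds=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Note that the worker thread is abandoned rather than killed when the deadline passes; Python offers no safe way to stop a thread from outside.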

`_execute_effect`

Execute a single blocking effect and return its result.

staticmethod

`def _execute_effect(effect: Call | Retry) -> Any`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `effect` | `Call \| Retry` | |

##### Returns

`Any`

`_run_saga_capturing`

`def _run_saga_capturing(self, saga: Any, context: SagaContext, result_box: list, error_box: list, done: threading.Event) -> None`

Step through a saga via `_run_saga`, capturing the return value.

Wraps `saga` in a thin `yield from` generator so that `_run_saga` handles all effect types (including nested Race/All/Take/Debounce). On success the return value is appended to `result_box`; on error the exception goes into `error_box`. `done` is set in all cases.

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `saga` | `Any` | |
| `context` | `SagaContext` | |
| `result_box` | `list` | |
| `error_box` | `list` | |
| `done` | `threading.Event` | |
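
The `yield from` wrapper is the key trick here: it forwards every effect unchanged while giving the wrapper access to the inner generator's return value. A minimal sketch follows, with a plain `for` loop standing in for `_run_saga`; `run_capturing` and the string effects are hypothetical.

```python
import threading
from typing import Any, Generator


def run_capturing(
    saga: Generator[Any, Any, Any],
    result_box: list,
    error_box: list,
    done: threading.Event,
) -> None:
    def wrapper() -> Generator[Any, Any, None]:
        # `yield from` re-yields each effect and evaluates to the
        # inner generator's return value once it finishes.
        result = yield from saga
        result_box.append(result)

    try:
        for _effect in wrapper():
            pass  # stand-in for the real runner executing each effect
    except Exception as exc:
        error_box.append(exc)
    finally:
        done.set()  # signalled on success and on failure


def child() -> Generator[str, None, str]:
    yield "effect-1"
    yield "effect-2"
    return "finished"


def failing() -> Generator[str, None, str]:
    yield "effect-1"
    raise RuntimeError("bad step")


ok_results, ok_errors, ok_done = [], [], threading.Event()
run_capturing(child(), ok_results, ok_errors, ok_done)

bad_results, bad_errors, bad_done = [], [], threading.Event()
run_capturing(failing(), bad_results, bad_errors, bad_done)
```

Using one-element lists as boxes lets the calling thread read the outcome after waiting on `done`, without needing a future object.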

`_execute_race`

Run sagas concurrently, return the first result. Cancel losers.

`def _execute_race(self, child_sagas: tuple, parent_context: SagaContext) -> Any`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `child_sagas` | `tuple` | |
| `parent_context` | `SagaContext` | |

##### Returns

`Any`
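
A race of this kind can be modelled with threads, a queue, and one cancel event per contestant. The sketch below is an illustration under simplifying assumptions: children are plain callables that receive their cancel event (not generator sagas with a `SagaContext`), and the first child to finish wins whether it succeeded or raised. `race`, `fast`, and `slow` are hypothetical names.

```python
import queue
import threading
from typing import Any, Callable, Sequence

Task = Callable[[threading.Event], Any]


def race(tasks: Sequence[Task]) -> Any:
    outcomes: queue.Queue = queue.Queue()
    cancels = [threading.Event() for _ in tasks]

    def run(index: int, task: Task) -> None:
        try:
            outcomes.put((index, task(cancels[index]), None))
        except Exception as exc:
            outcomes.put((index, None, exc))

    threads = [
        threading.Thread(target=run, args=(i, task), daemon=True)
        for i, task in enumerate(tasks)
    ]
    for thread in threads:
        thread.start()

    winner, value, error = outcomes.get()  # blocks until the first finisher
    for index, cancel in enumerate(cancels):
        if index != winner:
            cancel.set()  # tell the losers to stop
    for thread in threads:
        thread.join()

    if error is not None:
        raise error
    return value


observed_cancel: list = []


def fast(cancel: threading.Event) -> str:
    return "fast"


def slow(cancel: threading.Event) -> str:
    # Event.wait returns True as soon as the event is set.
    observed_cancel.append(cancel.wait(timeout=5.0))
    return "slow"


first = race((slow, fast))
```

Cancellation is cooperative in this model: `slow` only stops early because it waits on its event instead of sleeping unconditionally.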

`_execute_all`

Run sagas concurrently, wait for all. Fail-fast on first error.

`def _execute_all(self, child_sagas: tuple, parent_context: SagaContext) -> tuple`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `child_sagas` | `tuple` | |
| `parent_context` | `SagaContext` | |

##### Returns

`tuple`

`_exec_cmd`

Execute a Cmd, Batch, Sequence, or TickCmd.

`def _exec_cmd(self, cmd: Any) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `cmd` | `Any` | |

`_run_cmd`

Run a single Cmd thunk and dispatch its result.

`def _run_cmd(self, fn: Any) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `fn` | `Any` | |

`_submit_batch`

`def _submit_batch(self, cmds: tuple) -> None`

Submit batch commands with a single pressure check.

Performs one pressure check and bulk-increments the task counter, then submits each Cmd directly to the executor. This avoids per-Cmd pressure checks and lock acquisitions.

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `cmds` | `tuple` | |

`_run_sequence`

Run commands serially, dispatching each result before the next.

`def _run_sequence(self, cmds: tuple) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `cmds` | `tuple` | |

`_run_tick`

Schedule a single `@@TICK` after `interval` seconds.

`def _run_tick(self, interval: float) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `interval` | `float` | |

## Functions

`_handle_take_every`

Block until cancelled, forking a new saga for every matching action.

`def _handle_take_every(effect: TakeEvery, context: SagaContext, store: Store) -> EffectResult`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `effect` | `TakeEvery` | |
| `context` | `SagaContext` | |
| `store` | `Store` | |

##### Returns

`EffectResult`

`_handle_take_latest`

Block until cancelled, forking a saga for the latest matching action only.

`def _handle_take_latest(effect: TakeLatest, context: SagaContext, store: Store) -> EffectResult`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `effect` | `TakeLatest` | |
| `context` | `SagaContext` | |
| `store` | `Store` | |

##### Returns

`EffectResult`

`_cleanup_take_waiter`

Remove an unconsumed Take waiter from the store.

`def _cleanup_take_waiter(store: Store, action_type: str, waiter_event: threading.Event) -> None`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `store` | `Store` | |
| `action_type` | `str` | |
| `waiter_event` | `threading.Event` | |

`combine_reducers`

`def combine_reducers(**reducers: Callable) -> Callable`

Combine multiple reducers into one that manages a dict state.

Each reducer manages a slice of state under its keyword name. Sagas, cmds, and view state from ReducerResult and Quit are collected and propagated.

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `**reducers` | `Callable` | |

##### Returns

`Callable`
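
The slice-per-keyword idea can be shown with a stripped-down combiner. This sketch covers only the dict-of-slices part: it does not collect sagas, cmds, view state, or Quit, and it assumes a missing slice is passed to its reducer as `None`. `combine_reducers_sketch` and the two example reducers are hypothetical.

```python
from typing import Any, Callable


def combine_reducers_sketch(**reducers: Callable[[Any, dict], Any]) -> Callable:
    """Build one reducer whose state is a dict keyed by reducer name."""

    def combined(state: Any, action: dict) -> dict:
        state = state or {}
        # Each reducer sees only its own slice and returns the next slice.
        return {
            name: reducer(state.get(name), action)
            for name, reducer in reducers.items()
        }

    return combined


def count(state: Any, action: dict) -> int:
    state = 0 if state is None else state
    return state + 1 if action["type"] == "inc" else state


def log(state: Any, action: dict) -> tuple:
    state = () if state is None else state
    return state + (action["type"],)


root = combine_reducers_sketch(count=count, log=log)

state = root(None, {"type": "inc"})
state = root(state, {"type": "noop"})
# state is now {"count": 1, "log": ("inc", "noop")}
```

Because every call builds a new dict, earlier state objects stay untouched, which fits the frozen-state reads described for `Store`.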

`_merge_view`

Merge two ViewStates: explicitly set fields in `new` override those in `prev`.

`def _merge_view(prev: ViewState | None, new: ViewState) -> ViewState`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `prev` | `ViewState \| None` | |
| `new` | `ViewState` | |

##### Returns

`ViewState`
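
One way to picture this merge rule is with a frozen dataclass whose fields default to `None`, treating `None` as "not explicitly set". The fields of the real `ViewState` are not listed on this page, so `ViewSketch`, its two fields, and the `None` sentinel are all assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class ViewSketch:
    """Hypothetical view state; None means the field was not set."""

    title: str | None = None
    cursor_visible: bool | None = None


def merge_view_sketch(prev: ViewSketch | None, new: ViewSketch) -> ViewSketch:
    if prev is None:
        return new
    # Keep prev's values, overriding only the fields that new sets.
    overrides = {
        f.name: getattr(new, f.name)
        for f in fields(new)
        if getattr(new, f.name) is not None
    }
    return replace(prev, **overrides)


before = ViewSketch(title="Inbox", cursor_visible=True)
update = ViewSketch(title="Drafts")  # cursor_visible left unset
merged = merge_view_sketch(before, update)
```

In this model `merged` takes its title from `update` and keeps `cursor_visible` from `before`.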

`_execute_retry`

Execute a function with retry and backoff.

`def _execute_retry(fn: Any, args: tuple, kwargs: dict, max_attempts: int, backoff: str, base_delay: float, max_delay: float) -> Any`

##### Parameters

| Name | Type | Description |
| --- | --- | --- |
| `fn` | `Any` | |
| `args` | `tuple` | |
| `kwargs` | `dict` | |
| `max_attempts` | `int` | |
| `backoff` | `str` | |
| `base_delay` | `float` | |
| `max_delay` | `float` | |

##### Returns

`Any`
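
The parameter list suggests a conventional capped-backoff loop. The sketch below shows one such loop under stated assumptions: the accepted `backoff` values are not documented here, so `"exponential"` (doubling each attempt) and a fixed-delay fallback are guesses, and the injectable `sleep` argument is an addition for testability. `retry_sketch` is a hypothetical name.

```python
import time
from typing import Any, Callable


def retry_sketch(
    fn: Callable[..., Any],
    args: tuple,
    kwargs: dict,
    max_attempts: int,
    backoff: str,
    base_delay: float,
    max_delay: float,
    sleep: Callable[[float], Any] = time.sleep,  # assumption: injectable for tests
) -> Any:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            if backoff == "exponential":  # assumed strategy name
                delay = base_delay * 2 ** (attempt - 1)
            else:  # assumed fallback: constant delay
                delay = base_delay
            sleep(min(delay, max_delay))  # never wait longer than the cap


calls: list = []
delays: list = []


def flaky(label: str) -> str:
    calls.append(label)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return f"{label} ok"


outcome = retry_sketch(
    flaky, ("job",), {},
    max_attempts=5, backoff="exponential",
    base_delay=0.1, max_delay=0.15,
    sleep=delays.append,  # record delays instead of sleeping
)
```

With these inputs the function succeeds on the third call, after recorded delays of 0.1 and 0.15 seconds (the second delay is capped by `max_delay`).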
