Module

_types

Frozen dataclasses, enums, and type aliases — no internal imports.

Classes

SpecialKey 0
Key 5
Single keypress.

Single keypress.

Attributes

Name Type Description
char str
name SpecialKey | None
ctrl bool
alt bool
shift bool
Action 2
Event dispatched to a reducer.

Event dispatched to a reducer.

Attributes

Name Type Description
type str
payload Any
AppStatus 0
RenderTarget 0
Screen 3
Named screen config: template name + reducer reference.

Named screen config: template name + reducer reference.

Attributes

Name Type Description
name str
template str
reducer Callable
Transition 3
Flow edge between screens.

Flow edge between screens.

Attributes

Name Type Description
from_screen str
to_screen str
on_action str
FieldType 0
FieldSpec 7
Declarative field configuration.

Declarative field configuration.

Attributes

Name Type Description
name str
label str
field_type FieldType
choices tuple[str, ...]
default Any
validator Callable | None
placeholder str
FieldState 5
Runtime state for a single field.

Runtime state for a single field.

Attributes

Name Type Description
value Any
cursor int
error str
focused bool
selected_index int
FormState 4
Full form state.

Full form state.

Attributes

Name Type Description
fields tuple[FieldState, ...]
specs tuple[FieldSpec, ...]
active_index int
submitted bool
Call 3
Call a function, resume saga with its return value.

Call a function, resume saga with its return value.

Attributes

Name Type Description
fn Callable
args tuple
kwargs dict
Put 1
Dispatch an action back to the store.

Dispatch an action back to the store.

Attributes

Name Type Description
action Action
Select 1
Read current state, resume saga with it.

Read current state, resume saga with it.

Attributes

Name Type Description
selector Callable | None
Fork 2
Run another saga concurrently. By default forks are *detached*: the child gets its own cancellatio…

Run another saga concurrently.

By default forks are detached: the child gets its own cancellation scope and is not cancelled when the parent is. Setattached=True to inherit the parent's cancellation scope — cancelling the parent will transitively cancel the child.

Attributes

Name Type Description
saga Callable | Generator
attached bool
Delay 1
Sleep for N seconds.

Sleep for N seconds.

Attributes

Name Type Description
seconds float
Retry 7
Call a function with retry and backoff on failure. Usage in a saga:: result = yield Retry(fet…

Call a function with retry and backoff on failure.

Usage in a saga::

result = yield Retry(fetch_data, args=(url,), max_attempts=3, backoff="exponential")

Attributes

Name Type Description
fn Callable
args tuple
kwargs dict
max_attempts int
backoff str
base_delay float
max_delay float
Timeout 2
Wrap a blocking effect with a deadline. Usage in a saga:: result = yield Timeout(Call(fetch_d…

Wrap a blocking effect with a deadline.

Usage in a saga::

result = yield Timeout(Call(fetch_data, args=(url,)), seconds=5)

RaisesTimeoutErrorif the effect doesn't complete in time. Only wraps blocking effects (Call and Retry).

Attributes

Name Type Description
effect Call | Retry
seconds float
TryCall 3
Call a function, returning (result, None) on success or (None, error) on failure. Unlike ``Call``,…

Call a function, returning (result, None) on success or (None, error) on failure.

UnlikeCall, exceptions do not crash the saga::

result, error = yield TryCall(fn=might_fail)
if error:
    yield Put(Action("FETCH_FAILED", payload=str(error)))
else:
    yield Put(Action("FETCH_OK", payload=result))

Attributes

Name Type Description
fn Callable
args tuple
kwargs dict
Race 1
Run multiple sagas concurrently, return the first result. Losers are cancelled via their cancel ev…

Run multiple sagas concurrently, return the first result.

Losers are cancelled via their cancel events as soon as a winner completes. If all racers fail, the first error is thrown into the parent saga::

winner = yield Race(sagas=(fetch_primary(), fetch_fallback()))

RaisesStateErrorif sagas is empty.

Attributes

Name Type Description
sagas tuple
All 1
Run multiple sagas concurrently, wait for all to complete. Returns a tuple of results in the same …

Run multiple sagas concurrently, wait for all to complete.

Returns a tuple of results in the same order as the input sagas. Fail-fast: if any saga raises, remaining sagas are cancelled and the error is thrown into the parent::

a, b = yield All(sagas=(fetch_users(), fetch_roles()))

An empty tuple returns()immediately.

Attributes

Name Type Description
sagas tuple
Take 3
Pause the saga until a matching action is dispatched. Waits for *future* actions only — actions di…

Pause the saga until a matching action is dispatched.

Waits for future actions only — actions dispatched before the Take is yielded are not matched. Returns the fullAction object so the saga can inspect both type and payload::

action = yield Take("USER_CONFIRMED")
name = action.payload["name"]

An optional timeout (in seconds) raisesTimeoutErrorif the action is not dispatched in time.

Attributes

Name Type Description
action_type str
timeout float | None

Methods

Internal Methods 1
__post_init__ 0
def __post_init__(self) -> None
Debounce 2
Delay-then-fork: start a timer, fork *saga* when it expires. If the parent saga yields another ``D…

Delay-then-fork: start a timer, fork saga when it expires.

If the parent saga yields anotherDebouncebefore the timer fires, the previous timer is cancelled and restarted. The parent continues immediately (non-blocking)::

# In a keystroke handler saga:
while True:
    key = yield Take("@@KEY")
    yield Debounce(seconds=0.3, saga=search_saga)

The debounced saga runs independently; useTakeif the parent needs the result.

Attributes

Name Type Description
seconds float
saga Callable
TakeEvery 2
Fork a new saga for every matching action (auto-restart pattern). Blocks the parent saga until can…

Fork a new saga for every matching action (auto-restart pattern).

Blocks the parent saga until cancelled. For each dispatched action whose type matches action_type, a new saga is forked with the action as argument::

# Fork a handler for every CLICK action:
yield TakeEvery("CLICK", handle_click)

def handle_click(action):
    url = action.payload["url"]
    result = yield Call(fetch, args=(url,))
    yield Put(Action("FETCHED", payload=result))

The watcher loop runs until the parent saga is cancelled.

Attributes

Name Type Description
action_type str
saga Callable
TakeLatest 2
Fork a saga for the latest matching action, cancelling the previous. Like `TakeEvery` but only the…

Fork a saga for the latest matching action, cancelling the previous.

LikeTakeEverybut only the most recent saga runs — when a new matching action arrives, the previous fork is cancelled before the new one starts::

# Only the latest search runs:
yield TakeLatest("SEARCH", run_search)

Useful for typeahead/autocomplete patterns where earlier results are obsolete.

Attributes

Name Type Description
action_type str
saga Callable
Cmd 1
Lightweight command: a thunk that returns an Action (or None). Simpler than sagas for one-shot eff…

Lightweight command: a thunk that returns an Action (or None).

Simpler than sagas for one-shot effects::

def fetch_cmd():
    data = fetch_json(url)
    return Action("FETCH_DONE", payload=data)

return ReducerResult(state, cmds=(Cmd(fetch_cmd),))

Attributes

Name Type Description
fn Callable
Batch 1
Run commands concurrently with no ordering guarantees. Usage:: return ReducerResult(state, cm…

Run commands concurrently with no ordering guarantees.

Usage::

return ReducerResult(state, cmds=(Batch(cmd_a, cmd_b, cmd_c),))

Attributes

Name Type Description
cmds tuple[Cmd | Batch | Sequence, ...]
Sequence 1
Run commands serially, in order. Each command's result is dispatched before the next starts:: …

Run commands serially, in order.

Each command's result is dispatched before the next starts::

return ReducerResult(state, cmds=(Sequence(cmd_a, cmd_b),))

Attributes

Name Type Description
cmds tuple[Cmd | Batch | Sequence, ...]
TickCmd 1
Schedule a single @@TICK after *interval* seconds. Return from a reducer to start ticking. Return …

Schedule a single @@TICK after interval seconds.

Return from a reducer to start ticking. Return another TickCmd when you receive @@TICK to keep the loop going; omit to stop::

case "@@TICK":
    if state.loading:
        return ReducerResult(new_state, cmds=(TickCmd(0.15),))
    return new_state

Attributes

Name Type Description
interval float
ViewState 4
Declarative terminal state returned alongside rendered content. The renderer diffs previous vs. cu…

Declarative terminal state returned alongside rendered content.

The renderer diffs previous vs. current ViewState and applies only the changes. Attach to ReducerResult to control terminal features from your reducer::

return ReducerResult(state, view=ViewState(cursor_visible=True))

Attributes

Name Type Description
alt_screen bool | None
cursor_visible bool | None
window_title str | None
mouse_mode bool | None
ReducerResult 4
Reducer can return this to trigger side effects.

Reducer can return this to trigger side effects.

Attributes

Name Type Description
state Any
sagas tuple[Callable, ...]
cmds tuple[Cmd | Batch | Sequence | TickCmd, ...]
view ViewState | None
Quit 5
Signal the app to exit. Return from a reducer to stop the event loop.

Signal the app to exit. Return from a reducer to stop the event loop.

Attributes

Name Type Description
state Any
code int
sagas tuple[Callable, ...]
cmds tuple[Cmd | Batch | Sequence | TickCmd, ...]
view ViewState | None

Functions

compact_cmds 1 tuple
Strip None entries and simplify command tuples. Returns an empty tuple for no …
def compact_cmds(*cmds: Cmd | Batch | Sequence | TickCmd | None) -> tuple

Strip None entries and simplify command tuples.

Returns an empty tuple for no commands, a single-element tuple for one, and the full tuple otherwise. Avoids unnecessary allocations for the common zero-or-one-command case.

Parameters
Name Type Description
*cmds Cmd | Batch | Sequence | TickCmd | None
Returns
tuple