Sagas

Generator-based side effects with Call, Put, Select, Fork, Delay, Race, All, Take, and more.

5 min read 1086 words

Sagas handle side effects in Milo — network requests, timers, file I/O, and anything else that isn't a pure state transformation. They're generator functions that yield effect descriptors, keeping your reducers pure.

How sagas work

flowchart LR R[Reducer] -->|ReducerResult| Runner[Saga Runner] Runner -->|ThreadPool| Saga[Saga Generator] Saga -->|"yield Call(fn)"| Runner Runner -->|result| Saga Saga -->|"yield Put(action)"| Store[Store]

A saga is a generator that yields effect objects. The saga runner interprets each effect, executes it, and sends the result back into the generator.

from milo import Call, Put, Select, Action

def fetch_data_saga():
    url = yield Select(lambda s: s["api_url"])
    data = yield Call(fetch_json, (url,))
    yield Put(Action("DATA_LOADED", payload=data))

Sagas run on a ThreadPoolExecutor, leveraging Python 3.14t free-threading for true parallelism.

Triggering sagas from reducers

Return aReducerResultto schedule sagas after a state transition:

from milo import ReducerResult

def reducer(state, action):
    if action.type == "FETCH_REQUESTED":
        return ReducerResult(
            {**state, "loading": True},
            sagas=(fetch_data_saga,),
        )
    if action.type == "DATA_LOADED":
        return {**state, "loading": False, "data": action.payload}
    return state

Note

The store dispatches the state change first, then schedules the sagas. This means your template will render theloading: Truestate before the saga begins executing.

Effect types

Execute a function and receive its return value:

result = yield Call(my_function, (arg1, arg2), {"key": "value"})

The saga runner calls my_function(arg1, arg2, key="value")on the thread pool and sends the return value back into the generator.

Dispatch an action back to the store:

yield Put(Action("TASK_COMPLETE", payload=result))

Read current state (or a slice of it):

full_state = yield Select()
url = yield Select(lambda s: s["config"]["api_url"])

Launch a concurrent child saga on the thread pool:

from milo import Fork

yield Fork(background_polling_saga)

Forked sagas run independently. They share the same store and can dispatch actions.

Sleep for a duration:

from milo import Delay

yield Delay(2.0)  # Wait 2 seconds

Call a function with automatic retry and backoff on failure:

from milo import Retry

result = yield Retry(fetch_data, args=(url,), max_attempts=3, backoff="exponential")

If fetch_data raises an exception, the saga runner retries up to max_attemptstimes with the chosen backoff strategy.

Parameter Default Description
fn (required) The function to call
args () Positional arguments
kwargs {} Keyword arguments
max_attempts 3 Total attempts before propagating the error
backoff "exponential" "exponential", "linear", or "fixed"
base_delay 1.0 Initial delay in seconds between retries
max_delay 30.0 Cap on delay between retries

Wrap a blocking effect with a deadline:

from milo import Timeout, Call

result = yield Timeout(Call(fetch_data, args=(url,)), seconds=5)

Raises TimeoutError if the effect doesn't complete in time. Only wraps blocking effects (Call and Retry).

Call a function, returning(result, None) on success or (None, error)on failure — exceptions don't crash the saga:

from milo import TryCall, Put, Action

result, error = yield TryCall(fn=might_fail)
if error:
    yield Put(Action("FETCH_FAILED", payload=str(error)))
else:
    yield Put(Action("FETCH_OK", payload=result))

Run multiple sagas concurrently, return the first result. Losers are cancelled:

from milo import Race

winner = yield Race(sagas=(fetch_primary(), fetch_fallback()))

If all racers fail, the first error is thrown into the parent saga.

Run multiple sagas concurrently, wait for all to complete:

from milo import All

users, roles = yield All(sagas=(fetch_users(), fetch_roles()))

Returns a tuple of results in the same order as the input sagas. Fail-fast: if any saga raises, remaining sagas are cancelled.

Pause the saga until a matching action is dispatched:

from milo import Take

action = yield Take("USER_CONFIRMED")
name = action.payload["name"]

Waits for future actions only — actions dispatched before the Take is yielded are not matched. An optional timeout (in seconds) raises TimeoutErrorif the action doesn't arrive in time:

action = yield Take("USER_CONFIRMED", timeout=10.0)

Delay-then-fork: start a timer, forksaga when it expires. If another Debounceis yielded before the timer fires, the previous timer is cancelled and restarted. The parent continues immediately (non-blocking):

from milo import Debounce, Take

# In a keystroke handler saga:
while True:
    key = yield Take("@@KEY")
    yield Debounce(seconds=0.3, saga=search_saga)

Watcher patterns

For recurring event handling, useTakeEvery or TakeLatest instead of manual Takeloops.

Fork a handler for every matching action. All handlers run concurrently:

from milo import TakeEvery

yield TakeEvery("CLICK", handle_click)

def handle_click(action):
    url = action.payload["url"]
    result = yield Call(fetch, args=(url,))
    yield Put(Action("FETCHED", payload=result))

Blocks the parent saga until cancelled. Use this when every event matters (e.g., logging, side effects per click).

LikeTakeEvery, but cancels the previous handler when a new action arrives:

from milo import TakeLatest

yield TakeLatest("SEARCH", run_search)

Use this for typeahead/autocomplete patterns where earlier results are obsolete.

Composing sagas

Delegate to other sagas sequentially:

def setup_saga():
    yield from fetch_config_saga()
    yield from fetch_user_saga()
    yield Put(Action("SETUP_COMPLETE"))

Run sagas in parallel on the thread pool:

def parallel_setup_saga():
    yield Fork(fetch_config_saga)
    yield Fork(fetch_user_saga)

Under Python 3.14t free-threading, forked sagas execute with true parallelism.

Tip

Keep sagas focused on coordination, not computation. If you need heavy processing, put it in a function andCallit — that way the saga remains readable and the function is independently testable.

Error recovery

If an unhandled exception occurs in a saga, Milo dispatches a@@SAGA_ERRORaction instead of swallowing the error silently. Your reducer can handle it gracefully:

def reducer(state, action):
    if action.type == "@@SAGA_ERROR":
        return {**state, "error": action.payload["error"]}
    return state

The payload contains {"error": "message", "type": "ExceptionTypeName"}.

Note

The store continues working after a saga error — other sagas and dispatches are unaffected. This matches Bubbletea's pattern of recovering from panics in command goroutines.

Sagas vs. Commands

For one-shot effects (fetch a URL, write a file, dispatch the result), consider using Commands instead. Commands are simpler — a plain function instead of a generator — and handle the dispatch-result pattern automatically.

Use sagas when you need multi-step coordination: reading state mid-effect, retrying with backoff, forking child tasks, or sequencing multiple dependent calls.