Sagas handle side effects in Milo — network requests, timers, file I/O, and anything else that isn't a pure state transformation. They're generator functions that yield effect descriptors, keeping your reducers pure.
How sagas work
A saga is a generator that yields effect objects. The saga runner interprets each effect, executes it, and sends the result back into the generator.
from milo import Call, Put, Select, Action
def fetch_data_saga():
    url = yield Select(lambda s: s["api_url"])
    data = yield Call(fetch_json, (url,))
    yield Put(Action("DATA_LOADED", payload=data))
Sagas run on a ThreadPoolExecutor, leveraging Python 3.14t free-threading for true parallelism.
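To make the yield/send loop concrete, here is a minimal single-threaded sketch of how a runner could interpret effects. It is an illustration under assumptions, not Milo's implementation: Call, Put, and Select are local stand-ins defined in the snippet (not imports from milo), run_saga is a hypothetical name, and the real runner works on a thread pool.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


# Local stand-ins for illustration only; these are NOT Milo's classes.
@dataclass
class Call:
    fn: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


@dataclass
class Put:
    action: Any


@dataclass
class Select:
    selector: Optional[Callable] = None


def run_saga(saga, state, dispatched):
    """Drive a generator to completion, interpreting each yielded effect."""
    result = None
    try:
        while True:
            # send(None) starts the generator; later sends deliver results.
            effect = saga.send(result)
            if isinstance(effect, Call):
                result = effect.fn(*effect.args, **effect.kwargs)
            elif isinstance(effect, Put):
                dispatched.append(effect.action)
                result = None
            elif isinstance(effect, Select):
                result = effect.selector(state) if effect.selector else state
            else:
                raise TypeError(f"unknown effect: {effect!r}")
    except StopIteration:
        pass  # the saga finished normally


def fetch_data_saga():
    url = yield Select(lambda s: s["api_url"])
    data = yield Call(lambda u: {"from": u}, (url,))  # placeholder for a real fetch
    yield Put(("DATA_LOADED", data))


dispatched = []
run_saga(fetch_data_saga(), {"api_url": "https://example.test/api"}, dispatched)
```

Because the saga only yields descriptions of work, the same generator can be stepped by a test harness like this one without touching the network.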
Triggering sagas from reducers
Return a ReducerResult to schedule sagas after a state transition:
from milo import ReducerResult
def reducer(state, action):
    if action.type == "FETCH_REQUESTED":
        return ReducerResult(
            {**state, "loading": True},
            sagas=(fetch_data_saga,),
        )
    if action.type == "DATA_LOADED":
        return {**state, "loading": False, "data": action.payload}
    return state
Note
The store dispatches the state change first, then schedules the sagas. This means your template will render the loading: True state before the saga begins executing.
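The ordering described in this note can be pictured with a toy store. This is only a sketch: TinyStore, its log attribute, and the local ReducerResult and Action stand-ins are hypothetical and defined in the snippet rather than imported from milo, and Milo's real store may be organized differently.

```python
from collections import namedtuple
from dataclasses import dataclass

# Local stand-ins for illustration; NOT Milo's classes.
Action = namedtuple("Action", ["type", "payload"], defaults=[None])


@dataclass(frozen=True)
class ReducerResult:
    state: dict
    sagas: tuple = ()


def fetch_data_saga():
    yield  # body omitted; only the scheduling order matters here


def reducer(state, action):
    if action.type == "FETCH_REQUESTED":
        return ReducerResult({**state, "loading": True}, sagas=(fetch_data_saga,))
    return state


class TinyStore:
    """Hypothetical store that commits state before scheduling sagas."""

    def __init__(self, reducer, state):
        self.reducer = reducer
        self.state = state
        self.log = []  # records the order in which things happen

    def dispatch(self, action):
        result = self.reducer(self.state, action)
        if isinstance(result, ReducerResult):
            new_state, sagas = result.state, result.sagas
        else:
            new_state, sagas = result, ()
        self.state = new_state  # 1. commit (a render would happen here)
        self.log.append(("state", new_state))
        for saga in sagas:  # 2. only then schedule the sagas
            self.log.append(("schedule", saga.__name__))


store = TinyStore(reducer, {"loading": False})
store.dispatch(Action("FETCH_REQUESTED"))
```

The log shows the state entry ahead of the schedule entry, which is why a view would see the loading flag before any saga work starts.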
Effect types
Execute a function and receive its return value:
result = yield Call(my_function, (arg1, arg2), {"key": "value"})
The saga runner calls my_function(arg1, arg2, key="value") on the thread pool and sends the return value back into the generator.
Dispatch an action back to the store:
yield Put(Action("TASK_COMPLETE", payload=result))
Read current state (or a slice of it):
full_state = yield Select()
url = yield Select(lambda s: s["config"]["api_url"])
Launch a concurrent child saga on the thread pool:
from milo import Fork
yield Fork(background_polling_saga)
Forked sagas run independently. They share the same store and can dispatch actions.
Sleep for a duration:
from milo import Delay
yield Delay(2.0) # Wait 2 seconds
Call a function with automatic retry and backoff on failure:
from milo import Retry
result = yield Retry(fetch_data, args=(url,), max_attempts=3, backoff="exponential")
If fetch_data raises an exception, the saga runner retries up to max_attempts times with the chosen backoff strategy.
| Parameter | Default | Description |
|---|---|---|
| fn | (required) | The function to call |
| args | () | Positional arguments |
| kwargs | {} | Keyword arguments |
| max_attempts | 3 | Total attempts before propagating the error |
| backoff | "exponential" | "exponential", "linear", or "fixed" |
| base_delay | 1.0 | Initial delay in seconds between retries |
| max_delay | 30.0 | Cap on delay between retries |
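To see how these parameters might interact, the sketch below computes the wait before each retry. The formulas are assumptions chosen for illustration (doubling for "exponential", a multiple of the retry number for "linear"); Milo's actual schedule may differ, for example by adding jitter. backoff_delays is a hypothetical helper, not part of milo.

```python
def backoff_delays(max_attempts=3, backoff="exponential", base_delay=1.0, max_delay=30.0):
    """Return the wait before each retry; max_attempts allows max_attempts - 1 retries."""
    delays = []
    for retry in range(1, max_attempts):
        if backoff == "exponential":
            delay = base_delay * 2 ** (retry - 1)  # assumed: doubles each retry
        elif backoff == "linear":
            delay = base_delay * retry  # assumed: grows by base_delay each retry
        elif backoff == "fixed":
            delay = base_delay
        else:
            raise ValueError(f"unknown backoff strategy: {backoff!r}")
        delays.append(min(delay, max_delay))  # max_delay caps every wait
    return delays


default_schedule = backoff_delays()
capped_schedule = backoff_delays(max_attempts=8)
```

Under these assumptions the defaults give two waits of 1 and 2 seconds, and a longer run flattens out once the doubled delay exceeds max_delay.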
Wrap a blocking effect with a deadline:
from milo import Timeout, Call
result = yield Timeout(Call(fetch_data, args=(url,)), seconds=5)
Raises TimeoutError if the effect doesn't complete in time. Only wraps blocking effects (Call and Retry).
Call a function, returning (result, None) on success or (None, error) on failure, so exceptions don't crash the saga:
from milo import TryCall, Put, Action
result, error = yield TryCall(fn=might_fail)
if error:
    yield Put(Action("FETCH_FAILED", payload=str(error)))
else:
    yield Put(Action("FETCH_OK", payload=result))
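The (result, error) convention can be sketched in plain Python. try_call below is a hypothetical helper that mirrors the described return shape; it is not Milo's code and runs the function inline rather than on a thread pool.

```python
def try_call(fn, *args, **kwargs):
    """Return (result, None) on success or (None, exception) on failure."""
    try:
        return fn(*args, **kwargs), None
    except Exception as exc:  # deliberately broad: the caller inspects the error
        return None, exc


ok_result, ok_error = try_call(int, "42")
bad_result, bad_error = try_call(int, "not a number")
```

With this shape it is safer to branch on the error slot than on the result, since a successful call can legitimately return a falsy value such as 0 or an empty list.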
Run multiple sagas concurrently, return the first result. Losers are cancelled:
from milo import Race
winner = yield Race(sagas=(fetch_primary(), fetch_fallback()))
If all racers fail, the first error is thrown into the parent saga.
Run multiple sagas concurrently, wait for all to complete:
from milo import All
users, roles = yield All(sagas=(fetch_users(), fetch_roles()))
Returns a tuple of results in the same order as the input sagas. Fail-fast: if any saga raises, remaining sagas are cancelled.
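The ordering guarantee can be illustrated with the standard library. The sketch below uses concurrent.futures directly and plain functions instead of sagas; run_all and the two fetch functions are hypothetical, and the fail-fast cancellation described above is intentionally not implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_all(*tasks):
    """Run zero-argument callables concurrently; results follow input order."""
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
        futures = [pool.submit(task) for task in tasks]
        # Iterating the futures in submission order (not completion order)
        # is what keeps the result tuple aligned with the inputs.
        return tuple(future.result() for future in futures)


def fetch_users():
    time.sleep(0.05)  # deliberately slower than fetch_roles
    return ["ada", "grace"]


def fetch_roles():
    return ["admin"]


users, roles = run_all(fetch_users, fetch_roles)
```

Even though fetch_roles finishes first, the tuple still unpacks as (users, roles) because results are collected by position.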
Pause the saga until a matching action is dispatched:
from milo import Take
action = yield Take("USER_CONFIRMED")
name = action.payload["name"]
Waits for future actions only: actions dispatched before the Take is yielded are not matched. An optional timeout (in seconds) raises TimeoutError if the action doesn't arrive in time:
action = yield Take("USER_CONFIRMED", timeout=10.0)
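The "future actions only" rule follows naturally if waiters are registered at the moment of the Take and dispatch only notifies waiters that already exist. The sketch below models that idea with the standard library; ActionBus and wait_for are hypothetical names, the code is single-threaded for simplicity, and none of it is Milo's implementation.

```python
import queue
from collections import defaultdict


class ActionBus:
    """Hypothetical registry: dispatch only reaches waiters registered earlier."""

    def __init__(self):
        self._waiters = defaultdict(list)

    def take(self, action_type):
        """Register interest now; return a mailbox to wait on."""
        box = queue.Queue(maxsize=1)
        self._waiters[action_type].append(box)
        return box

    def dispatch(self, action_type, payload=None):
        # Deliver to current waiters and forget them; nothing is buffered.
        for box in self._waiters.pop(action_type, []):
            box.put(payload)


def wait_for(box, timeout=None):
    """Block until the mailbox is filled, or raise TimeoutError."""
    try:
        return box.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError("action did not arrive in time") from None


bus = ActionBus()
bus.dispatch("USER_CONFIRMED", {"name": "early"})  # nobody is waiting: dropped
box = bus.take("USER_CONFIRMED")
bus.dispatch("USER_CONFIRMED", {"name": "late"})
action = wait_for(box, timeout=1.0)
```

The early dispatch is lost because no mailbox existed yet, which is the same reason a Take yielded too late never sees an earlier action.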
Delay-then-fork: start a timer, then fork the saga when it expires. If another Debounce is yielded before the timer fires, the previous timer is cancelled and restarted. The parent continues immediately (non-blocking):
from milo import Debounce, Take
# In a keystroke handler saga:
while True:
    key = yield Take("@@KEY")
    yield Debounce(seconds=0.3, saga=search_saga)
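The cancel-and-restart behaviour can be sketched with threading.Timer from the standard library. Debouncer is a hypothetical class that calls a plain function where Milo would fork a saga; it shows the timing idea only.

```python
import threading
import time


class Debouncer:
    """Call fn once, `seconds` after the most recent trigger."""

    def __init__(self, seconds, fn):
        self.seconds = seconds
        self.fn = fn
        self._timer = None
        self._lock = threading.Lock()

    def trigger(self, *args):
        """Restart the countdown; returns immediately (non-blocking)."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()  # drop the pending call, if any
            self._timer = threading.Timer(self.seconds, self.fn, args=args)
            self._timer.daemon = True
            self._timer.start()


searches = []
debouncer = Debouncer(0.05, searches.append)
for query in ("m", "mi", "mil"):  # three rapid "keystrokes"
    debouncer.trigger(query)
time.sleep(0.3)  # give the final timer time to fire
```

Only the last trigger survives, so the search runs once with the final query instead of once per keystroke.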
Watcher patterns
For recurring event handling, use TakeEvery or TakeLatest instead of manual Take loops.
Fork a handler for every matching action. All handlers run concurrently:
from milo import Action, Call, Put, TakeEvery
yield TakeEvery("CLICK", handle_click)
def handle_click(action):
    url = action.payload["url"]
    result = yield Call(fetch, args=(url,))
    yield Put(Action("FETCHED", payload=result))
Blocks the parent saga until cancelled. Use this when every event matters (e.g., logging, side effects per click).
Like TakeEvery, but cancels the previous handler when a new action arrives:
from milo import TakeLatest
yield TakeLatest("SEARCH", run_search)
Use this for typeahead/autocomplete patterns where earlier results are obsolete.
Composing sagas
Delegate to other sagas sequentially:
def setup_saga():
    yield from fetch_config_saga()
    yield from fetch_user_saga()
    yield Put(Action("SETUP_COMPLETE"))
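Sequential delegation works because yield from re-yields every effect of the child generator, so a runner sees a single flat stream, and the child's return value becomes the value of the yield from expression. The sketch below uses plain tuples as placeholder effects rather than Milo's effect classes, and the saga bodies are invented for illustration.

```python
def fetch_config_saga():
    yield ("call", "load_config")
    return {"theme": "dark"}  # becomes the value of `yield from`


def fetch_user_saga(config):
    yield ("call", "load_user", config["theme"])


def setup_saga():
    config = yield from fetch_config_saga()
    yield from fetch_user_saga(config)
    yield ("put", "SETUP_COMPLETE")


# Collect every effect the parent yields, in order.
effects = list(setup_saga())
```

Passing the child's return value along like this is one way to sequence dependent steps without routing intermediate data through the store.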
Run sagas in parallel on the thread pool:
def parallel_setup_saga():
    yield Fork(fetch_config_saga)
    yield Fork(fetch_user_saga)
Under Python 3.14t free-threading, forked sagas execute with true parallelism.
Tip
Keep sagas focused on coordination, not computation. If you need heavy processing, put it in a function and Call it; that way the saga remains readable and the function is independently testable.
Error recovery
If an unhandled exception occurs in a saga, Milo dispatches a @@SAGA_ERROR action instead of swallowing the error silently. Your reducer can handle it gracefully:
def reducer(state, action):
    if action.type == "@@SAGA_ERROR":
        return {**state, "error": action.payload["error"]}
    return state
The payload contains {"error": "message", "type": "ExceptionTypeName"}.
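A rough sketch of how an unhandled exception could be turned into that payload is shown below. run_guarded and the local Action stand-in are hypothetical, and building the payload from str(exc) and the exception's class name is an assumption based on the payload shape described above, not a copy of Milo's runner.

```python
from collections import namedtuple

# Local stand-in for illustration; NOT Milo's Action class.
Action = namedtuple("Action", ["type", "payload"])


def run_guarded(saga, dispatch):
    """Step a saga; convert an unhandled exception into an error action."""
    try:
        for _effect in saga:
            pass  # a real runner would interpret each effect here
    except Exception as exc:
        payload = {"error": str(exc), "type": type(exc).__name__}
        dispatch(Action("@@SAGA_ERROR", payload))


def reducer(state, action):
    if action.type == "@@SAGA_ERROR":
        return {**state, "error": action.payload["error"]}
    return state


def broken_saga():
    yield "some effect"
    raise ValueError("boom")


actions = []
run_guarded(broken_saga(), actions.append)
state = reducer({"error": None}, actions[0])
```

Because the failure arrives as an ordinary action, the reducer can surface it in state without any special error channel.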
Note
The store continues working after a saga error — other sagas and dispatches are unaffected. This matches Bubbletea's pattern of recovering from panics in command goroutines.
Sagas vs. Commands
For one-shot effects (fetch a URL, write a file, dispatch the result), consider using Commands instead. Commands are simpler — a plain function instead of a generator — and handle the dispatch-result pattern automatically.
Use sagas when you need multi-step coordination: reading state mid-effect, retrying with backoff, forking child tasks, or sequencing multiple dependent calls.