Milo's pipeline system orchestrates multi-phase workflows through the Store/saga architecture. Each phase is a function with declared dependencies, and the pipeline resolves execution order, runs parallel phases concurrently, and tracks progress through observable state.
Defining a pipeline
from milo import Pipeline, Phase

# discover, parse, render, copy_assets, and write are your own handler callables.
pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    # render and assets both wait on parse and may run at the same time.
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=copy_assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)
Each Phase has:
| Field | Purpose |
|---|---|
| name | Unique phase identifier |
| handler | Callable that does the work |
| depends_on | Tuple of phase names that must complete first |
| parallel | If True, can run concurrently with other parallel phases |
| weight | Progress weight (default: 1) |
| description | Human-readable description |
Dependency resolution
The pipeline resolves phases in topological order. Given the example above:
discover → parse → [render, assets] → write
render and assets both depend on parse and are marked parallel=True, so they execute concurrently via Fork. write waits for both to complete.
pipeline.execution_order()
# ["discover", "parse", "assets", "render", "write"]
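To make the ordering concrete, here is a minimal standalone sketch of topological resolution using Kahn's algorithm. It does not use Milo; the function and the plain dict of dependencies are illustrative, and the alphabetical tie-break is an assumption chosen so the result is deterministic and happens to match the order shown above.

```python
import heapq


def execution_order(deps: dict[str, tuple[str, ...]]) -> list[str]:
    """Return phase names in dependency order (Kahn's algorithm).

    Ties between phases that are ready at the same time are broken
    alphabetically. That tie-break is an assumption of this sketch.
    """
    # Count unmet dependencies and record which phases wait on each phase.
    pending = {name: len(required) for name, required in deps.items()}
    dependents: dict[str, list[str]] = {name: [] for name in deps}
    for name, required in deps.items():
        for dep in required:
            if dep not in deps:
                raise ValueError(f"{name!r} depends on unknown phase {dep!r}")
            dependents[dep].append(name)

    # A min-heap keeps the ready set sorted by name.
    ready = [name for name, count in pending.items() if count == 0]
    heapq.heapify(ready)
    order: list[str] = []
    while ready:
        name = heapq.heappop(ready)
        order.append(name)
        for child in dependents[name]:
            pending[child] -= 1
            if pending[child] == 0:
                heapq.heappush(ready, child)

    # Any phase left over is part of a cycle and can never become ready.
    if len(order) != len(deps):
        raise ValueError("dependency cycle detected")
    return order


deps = {
    "discover": (),
    "parse": ("discover",),
    "render": ("parse",),
    "assets": ("parse",),
    "write": ("render", "assets"),
}
print(execution_order(deps))
# ['discover', 'parse', 'assets', 'render', 'write']
```

render and assets become ready at the same moment, which is what makes them candidates for concurrent execution.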
Extending pipelines
Use the >> operator to append phases:
pipeline = pipeline >> Phase(
"health",
handler=check_links,
depends_on=("write",),
description="Verify internal links",
)
This returns a new Pipeline; the original is unchanged.
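The non-mutating behavior of >> can be illustrated with a small standalone sketch. SketchPhase and SketchPipeline are hypothetical stand-ins written for this example, not Milo's classes; the point is only that the operator builds a new object from an immutable tuple instead of modifying the left operand.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchPhase:
    """Hypothetical stand-in for a phase; only the fields needed here."""
    name: str
    depends_on: tuple[str, ...] = ()


@dataclass(frozen=True)
class SketchPipeline:
    """Hypothetical stand-in for a pipeline holding an immutable phase tuple."""
    name: str
    phases: tuple[SketchPhase, ...] = ()

    def __rshift__(self, phase: SketchPhase) -> "SketchPipeline":
        # Build a new tuple instead of mutating, so the left operand is untouched.
        return SketchPipeline(self.name, self.phases + (phase,))


base = SketchPipeline("build", (SketchPhase("write"),))
extended = base >> SketchPhase("health", depends_on=("write",))

print(len(base.phases), len(extended.phases))  # 1 2
```

Because the original stays intact, a base pipeline can be shared and extended differently in separate places.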
Store integration
The pipeline generates a reducer and saga that work with the Store for observable, testable execution.
Generated reducer
reducer = pipeline.build_reducer()
store = Store(reducer, initial_state=None)
The reducer handles these action types:
| Action | Effect |
|---|---|
| @@PIPELINE_START | Sets status to "running" |
| @@PHASE_START | Marks a phase as "running" |
| @@PHASE_COMPLETE | Marks a phase as "completed", updates progress |
| @@PHASE_FAILED | Marks a phase as "failed", sets pipeline status to "failed" |
| @@PIPELINE_COMPLETE | Sets status to "completed", progress to 1.0 |
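The transitions in the table can be sketched as a pure reducer function. This is an illustration under stated assumptions, not the reducer Milo generates: SketchState, make_reducer, and the action shape (a dict with "type" and "phase" keys) are all hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchState:
    """Hypothetical state shape: overall status, progress, per-phase status."""
    status: str = "pending"
    progress: float = 0.0
    phases: tuple[tuple[str, str], ...] = ()  # (phase name, phase status) pairs


def make_reducer(weights: dict[str, float]):
    """Build a reducer for phases with the given progress weights."""
    total = sum(weights.values())
    initial = SketchState(phases=tuple((name, "pending") for name in weights))

    def mark(state: SketchState, phase: str, status: str) -> SketchState:
        # Return a copy of the state with one phase's status replaced.
        phases = tuple((n, status if n == phase else s) for n, s in state.phases)
        return replace(state, phases=phases)

    def reducer(state: Optional[SketchState], action: dict) -> SketchState:
        state = initial if state is None else state
        kind, phase = action["type"], action.get("phase", "")
        if kind == "@@PIPELINE_START":
            return replace(state, status="running")
        if kind == "@@PHASE_START":
            return mark(state, phase, "running")
        if kind == "@@PHASE_COMPLETE":
            state = mark(state, phase, "completed")
            # Progress is the completed share of the total weight.
            done = sum(weights[n] for n, s in state.phases if s == "completed")
            return replace(state, progress=done / total)
        if kind == "@@PHASE_FAILED":
            return replace(mark(state, phase, "failed"), status="failed")
        if kind == "@@PIPELINE_COMPLETE":
            return replace(state, status="completed", progress=1.0)
        return state  # unknown actions leave the state unchanged

    return reducer


reducer = make_reducer({"parse": 1, "render": 3})
state = None
for action in (
    {"type": "@@PIPELINE_START"},
    {"type": "@@PHASE_START", "phase": "parse"},
    {"type": "@@PHASE_COMPLETE", "phase": "parse"},
):
    state = reducer(state, action)
print(state.status, state.progress)  # running 0.25
```

Since the reducer is a pure function of state and action, each transition can be tested by feeding it actions directly, without running any handlers.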
Generated saga
saga = pipeline.build_saga()
The saga walks the dependency graph, yielding Put actions for state transitions, Call effects for phase handlers, and Fork effects for parallel phases. Wire it into the Store with a ReducerResult or run it through the saga runner directly.
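The shape of such a saga can be sketched as a plain generator that describes work as effect objects without performing it. Everything here is a hypothetical stand-in: the Put, Call, and Fork classes below only borrow the names used above and are not Milo's effect types, the action dict shape is assumed, and the sketch leaves out failure handling and the step where a runner waits for forked phases to finish.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List, Tuple, Union


@dataclass(frozen=True)
class Put:
    """Stand-in effect: dispatch an action to the store."""
    action: dict


@dataclass(frozen=True)
class Call:
    """Stand-in effect: run a handler and wait for its result."""
    fn: Callable[[], object]


@dataclass(frozen=True)
class Fork:
    """Stand-in effect: start a child saga without blocking the parent."""
    saga: Iterator[object]


Effect = Union[Put, Call, Fork]
Stage = List[Tuple[str, Callable[[], object]]]


def phase_saga(name: str, handler: Callable[[], object]) -> Iterator[Effect]:
    """Effects for a single phase: start, run the handler, complete."""
    yield Put({"type": "@@PHASE_START", "phase": name})
    yield Call(handler)
    yield Put({"type": "@@PHASE_COMPLETE", "phase": name})


def sketch_saga(stages: List[Stage]) -> Iterator[Effect]:
    """Yield effects for stages that are already in dependency order.

    A stage with one phase runs inline; a stage with several phases
    is treated as a parallel group and each phase is forked.
    """
    yield Put({"type": "@@PIPELINE_START"})
    for stage in stages:
        if len(stage) == 1:
            name, handler = stage[0]
            yield from phase_saga(name, handler)
        else:
            for name, handler in stage:
                yield Fork(phase_saga(name, handler))
    yield Put({"type": "@@PIPELINE_COMPLETE"})


def noop() -> None:
    """Placeholder handler for the demonstration."""


stages = [
    [("parse", noop)],
    [("render", noop), ("assets", noop)],
    [("write", noop)],
]
for effect in sketch_saga(stages):
    print(type(effect).__name__)
```

Keeping the saga as a description of effects is what makes it testable: a test can iterate the generator and inspect the yielded objects without executing any handler.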
Observable state
PipelineState gives you a real-time view of pipeline progress:
state: PipelineState = store.get_state()
state.status # "running" | "completed" | "failed" | "pending"
state.progress # 0.0 to 1.0 based on phase weights
state.current_phase # name of the currently executing phase
state.elapsed # total elapsed time
state.phases # tuple of PhaseStatus objects
Each PhaseStatus tracks:
phase.name # "render"
phase.status # "pending" | "running" | "completed" | "failed" | "skipped"
phase.started_at # monotonic timestamp
phase.elapsed # seconds
phase.error # error message if failed
Tip
Subscribe a listener to the Store to render a live progress display as phases complete. Because progress is computed from phase weights, giving slower phases a larger weight keeps the estimate meaningful when phases take different amounts of time.
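A live display along these lines could look like the following sketch. SketchStore is a hypothetical stand-in with a subscribe method and a bare float as its state; it is not Milo's Store, and set_progress takes the place of dispatching pipeline actions.

```python
from typing import Callable, List


def render_bar(progress: float, width: int = 20) -> str:
    """Render a progress value between 0.0 and 1.0 as a text bar."""
    progress = min(max(progress, 0.0), 1.0)  # clamp out-of-range values
    filled = round(progress * width)
    return "[" + "#" * filled + "-" * (width - filled) + f"] {progress:.0%}"


class SketchStore:
    """Tiny stand-in store whose whole state is the progress value."""

    def __init__(self) -> None:
        self._progress = 0.0
        self._listeners: List[Callable[[], None]] = []

    def get_state(self) -> float:
        return self._progress

    def subscribe(self, listener: Callable[[], None]) -> None:
        self._listeners.append(listener)

    def set_progress(self, value: float) -> None:
        # Stands in for dispatching an action; notifies listeners afterwards.
        self._progress = value
        for listener in self._listeners:
            listener()


frames: List[str] = []
store = SketchStore()
store.subscribe(lambda: frames.append(render_bar(store.get_state())))
for value in (0.25, 0.5, 1.0):
    store.set_progress(value)
print("\n".join(frames))
# [#####---------------] 25%
# [##########----------] 50%
# [####################] 100%
```

In a terminal, a listener would typically redraw a single line in place instead of collecting frames in a list.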