Pipeline Orchestration

Declarative build pipelines with phases, dependency graphs, retry policies, output capture, and Store integration.

4 min read 773 words

Milo's pipeline system orchestrates multi-phase workflows through the Store/saga architecture. Each phase is a function with declared dependencies, and the pipeline resolves execution order, runs parallel phases concurrently, and tracks progress through observable state.

Defining a pipeline

from milo import Pipeline, Phase

pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=copy_assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)

Each Phasehas:

Field Purpose
name Unique phase identifier
handler Callable that does the work
depends_on Tuple of phase names that must complete first
parallel IfTrue, can run concurrently with other parallel phases
weight Progress weight (default: 1)
description Human-readable description
policy PhasePolicycontrolling failure behavior (default: stop on failure)
max_logs Max captured log lines per phase (default: 200)

Dependency resolution

The pipeline resolves phases in topological order. Given the example above:

discover → parse → [render, assets] → write

render and assets both depend on parse and are marked parallel=True, so they execute concurrently via Fork. writewaits for both to complete.

pipeline.execution_order()
# ["discover", "parse", "assets", "render", "write"]

Extending pipelines

Use the>>operator to append phases:

pipeline = pipeline >> Phase(
    "health",
    handler=check_links,
    depends_on=("write",),
    description="Verify internal links",
)

This returns a new Pipeline— the original is unchanged.

Store integration

The pipeline generates a reducer and saga that work with the Store for observable, testable execution.

Generated reducer

reducer = pipeline.build_reducer()
store = Store(reducer, initial_state=None)

The reducer handles these action types:

Action Effect
@@PIPELINE_START Sets status to"running"
@@PHASE_START Marks a phase as"running"
@@PHASE_COMPLETE Marks a phase as"completed", updates progress
@@PHASE_FAILED Marks a phase as"failed", sets pipeline status to "failed"
@@PIPELINE_COMPLETE Sets status to"completed", progress to 1.0

Generated saga

saga = pipeline.build_saga()

The saga walks the dependency graph, yielding Put actions for state transitions, Call effects for phase handlers, and Fork effects for parallel phases. Wire it into the Store with a ReducerResultor run it through the saga runner directly.

Observable state

PipelineStategives you a real-time view of pipeline progress:

state: PipelineState = store.get_state()
state.status        # "running" | "completed" | "failed" | "pending"
state.progress      # 0.0 to 1.0 based on phase weights
state.current_phase # name of the currently executing phase
state.elapsed       # total elapsed time
state.phases        # tuple of PhaseStatus objects

Each PhaseStatustracks:

phase.name       # "render"
phase.status     # "pending" | "running" | "completed" | "failed" | "skipped"
phase.started_at # monotonic timestamp
phase.elapsed    # seconds
phase.error      # error message if failed

Failure policies

Control what happens when a phase raises an exception withPhasePolicy:

from milo import Phase, PhasePolicy

Phase(
    "deploy",
    handler=deploy,
    policy=PhasePolicy(on_fail="retry", max_retries=3, retry_backoff="exponential"),
)
Field Default Description
on_fail "stop" "stop" halts the pipeline, "skip" continues, "retry"retries
max_retries 0 Number of retry attempts (only whenon_fail="retry")
retry_delay 1.0 Initial delay in seconds between retries
retry_backoff "fixed" "fixed" or "exponential"(capped at 30s)

The pipeline reducer tracks"retrying" as a distinct phase status alongside "pending", "running", "completed", "failed", and "skipped".

Output capture

Enablecapture_output=Trueto collect stdout/stderr from each phase handler:

pipeline = Pipeline("build", *phases, capture_output=True)

Captured output is dispatched as @@PHASE_LOG actions with {"name", "line", "stream", "timestamp"} payloads. The pipeline reducer stores logs on each PhaseStatus:

phase = state.phases[0]
for log in phase.logs:
    print(f"[{log.stream}] {log.line}")

Detail TUI

For interactive pipeline visualization, wrap the pipeline reducer withmake_detail_reducer:

from milo import App, Store
from milo.pipeline import make_detail_reducer

reducer = make_detail_reducer(pipeline.build_reducer())
app = App(template="pipeline.kida", reducer=reducer, initial_state=None)

The detail reducer adds keyboard navigation over PipelineViewState:

Key Action
/ Select phase
Enter Expand/collapse phase logs
g / G Scroll to top/bottom of logs
f Toggle auto-follow
q Quit

MCP timeline resource

When a pipeline is active, themilo://pipeline/timelineresource exposes the execution timeline as JSON. AI agents can read this to monitor pipeline progress in real time.

Tip

Subscribe a listener to the Store to render a live progress display as phases complete. The weighted progress calculation gives accurate estimates even when phases have different durations.