Pipeline Orchestration

Declarative build pipelines with phases, dependency graphs, and Store integration.

2 min read 481 words

Milo's pipeline system orchestrates multi-phase workflows through the Store/saga architecture. Each phase is a function with declared dependencies, and the pipeline resolves execution order, runs parallel phases concurrently, and tracks progress through observable state.

Defining a pipeline

from milo import Pipeline, Phase

pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=copy_assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)

Each Phasehas:

Field Purpose
name Unique phase identifier
handler Callable that does the work
depends_on Tuple of phase names that must complete first
parallel IfTrue, can run concurrently with other parallel phases
weight Progress weight (default: 1)
description Human-readable description

Dependency resolution

The pipeline resolves phases in topological order. Given the example above:

discover → parse → [render, assets] → write

render and assets both depend on parse and are marked parallel=True, so they execute concurrently via Fork. writewaits for both to complete.

pipeline.execution_order()
# ["discover", "parse", "assets", "render", "write"]

Extending pipelines

Use the>>operator to append phases:

pipeline = pipeline >> Phase(
    "health",
    handler=check_links,
    depends_on=("write",),
    description="Verify internal links",
)

This returns a new Pipeline— the original is unchanged.

Store integration

The pipeline generates a reducer and saga that work with the Store for observable, testable execution.

Generated reducer

reducer = pipeline.build_reducer()
store = Store(reducer, initial_state=None)

The reducer handles these action types:

Action Effect
@@PIPELINE_START Sets status to"running"
@@PHASE_START Marks a phase as"running"
@@PHASE_COMPLETE Marks a phase as"completed", updates progress
@@PHASE_FAILED Marks a phase as"failed", sets pipeline status to "failed"
@@PIPELINE_COMPLETE Sets status to"completed", progress to 1.0

Generated saga

saga = pipeline.build_saga()

The saga walks the dependency graph, yielding Put actions for state transitions, Call effects for phase handlers, and Fork effects for parallel phases. Wire it into the Store with a ReducerResultor run it through the saga runner directly.

Observable state

PipelineStategives you a real-time view of pipeline progress:

state: PipelineState = store.get_state()
state.status        # "running" | "completed" | "failed" | "pending"
state.progress      # 0.0 to 1.0 based on phase weights
state.current_phase # name of the currently executing phase
state.elapsed       # total elapsed time
state.phases        # tuple of PhaseStatus objects

Each PhaseStatustracks:

phase.name       # "render"
phase.status     # "pending" | "running" | "completed" | "failed" | "skipped"
phase.started_at # monotonic timestamp
phase.elapsed    # seconds
phase.error      # error message if failed

Tip

Subscribe a listener to the Store to render a live progress display as phases complete. The weighted progress calculation gives accurate estimates even when phases have different durations.