# Pipeline Orchestration

- URL: /docs/usage/pipeline/
- Section: usage
- Tags: pipeline, phases, dependencies, build

--------------------------------------------------------------------------------

Milo's pipeline system orchestrates multi-phase workflows through the Store/saga architecture. Each phase is a function with declared dependencies. The pipeline resolves execution order, runs parallel phases concurrently, and tracks progress through observable state.

## Defining a pipeline

```python
from milo import Pipeline, Phase

pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=copy_assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)
```

Each `Phase` has:

| Field | Purpose |
| --- | --- |
| `name` | Unique phase identifier |
| `handler` | Callable that does the work |
| `depends_on` | Tuple of phase names that must complete first |
| `parallel` | If `True`, can run concurrently with other parallel phases |
| `weight` | Progress weight (default: 1) |
| `description` | Human-readable description |

## Dependency resolution

The pipeline resolves phases in topological order. Given the example above:

```
discover → parse → [render, assets] → write
```

`render` and `assets` both depend on `parse` and are marked `parallel=True`, so they execute concurrently via `Fork`. `write` waits for both to complete.

```python
pipeline.execution_order()
# ["discover", "parse", "assets", "render", "write"]
```

## Extending pipelines

Use the `>>` operator to append phases:

```python
pipeline = pipeline >> Phase(
    "health",
    handler=check_links,
    depends_on=("write",),
    description="Verify internal links",
)
```

This returns a new `Pipeline`; the original is unchanged.

## Store integration

The pipeline generates a reducer and saga that work with the Store for observable, testable execution.
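Before looking at the generated reducer and saga, it may help to see the dependency resolution described earlier in isolation. The following is a minimal sketch using only Python's standard-library `graphlib`, not Milo's actual implementation. The `DEPENDENCIES` mapping, the `execution_levels` and `flat_order` helpers, and the alphabetical tie-break inside a level are all assumptions made for illustration. They happen to reproduce the order shown for `pipeline.execution_order()`.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: phase name -> names it depends on,
# mirroring the "build" pipeline example.
DEPENDENCIES = {
    "discover": (),
    "parse": ("discover",),
    "render": ("parse",),
    "assets": ("parse",),
    "write": ("render", "assets"),
}


def execution_levels(dependencies):
    """Group phases into levels whose dependencies are all satisfied."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    levels = []
    while sorter.is_active():
        # Phases that become ready together could run concurrently.
        # Sorting is an assumed tie-break, chosen for deterministic output.
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


def flat_order(dependencies):
    """Flatten the levels into a single execution order."""
    return [name for level in execution_levels(dependencies) for name in level]


levels = execution_levels(DEPENDENCIES)
order = flat_order(DEPENDENCIES)
print(levels)
print(order)
```

Each inner list in `levels` is a group of phases with no remaining unmet dependencies, which is the point where a saga could fork `render` and `assets` side by side before `write` runs.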
### Generated reducer

```python
reducer = pipeline.build_reducer()
store = Store(reducer, initial_state=None)
```

The reducer handles these action types:

| Action | Effect |
| --- | --- |
| `@@PIPELINE_START` | Sets status to `"running"` |
| `@@PHASE_START` | Marks a phase as `"running"` |
| `@@PHASE_COMPLETE` | Marks a phase as `"completed"`, updates progress |
| `@@PHASE_FAILED` | Marks a phase as `"failed"`, sets pipeline status to `"failed"` |
| `@@PIPELINE_COMPLETE` | Sets status to `"completed"`, progress to 1.0 |

### Generated saga

```python
saga = pipeline.build_saga()
```

The saga walks the dependency graph, yielding `Put` actions for state transitions, `Call` effects for phase handlers, and `Fork` effects for parallel phases. Wire it into the Store with a `ReducerResult` or run it through the saga runner directly.

## Observable state

`PipelineState` gives you a real-time view of pipeline progress:

```python
state: PipelineState = store.get_state()
state.status         # "running" | "completed" | "failed" | "pending"
state.progress       # 0.0 to 1.0 based on phase weights
state.current_phase  # name of the currently executing phase
state.elapsed        # total elapsed time
state.phases         # tuple of PhaseStatus objects
```

Each `PhaseStatus` tracks:

```python
phase.name        # "render"
phase.status      # "pending" | "running" | "completed" | "failed" | "skipped"
phase.started_at  # monotonic timestamp
phase.elapsed     # seconds
phase.error       # error message if failed
```

> **Tip:** Subscribe a listener to the Store to render a live progress display as phases complete. The weighted progress calculation gives accurate estimates even when phases have different durations.

--------------------------------------------------------------------------------

Metadata:

- Author: lbliii
- Word Count: 417
- Reading Time: 2 minutes
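The reducer's action handling and the weighted progress described on this page can be sketched as a small stand-alone reducer. This is an illustrative approximation, not the code that `pipeline.build_reducer()` generates. The `PhaseView` and `PipelineView` dataclasses, the dict-shaped actions with `"type"` and `"phase"` keys, and the example weights are all hypothetical. Progress is assumed here to be the completed weight divided by the total weight.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PhaseView:  # hypothetical stand-in for PhaseStatus
    name: str
    weight: int = 1
    status: str = "pending"


@dataclass(frozen=True)
class PipelineView:  # hypothetical stand-in for PipelineState
    phases: tuple
    status: str = "pending"
    progress: float = 0.0


def weighted_progress(phases):
    """Completed weight over total weight (assumed formula)."""
    total = sum(p.weight for p in phases)
    done = sum(p.weight for p in phases if p.status == "completed")
    return done / total if total else 0.0


def _with_status(phases, name, status):
    """Return a new tuple with one phase's status replaced."""
    return tuple(replace(p, status=status) if p.name == name else p for p in phases)


def pipeline_reducer(state, action):
    """Pure function: (state, action) -> new state; unknown actions pass through."""
    kind = action["type"]
    if kind == "@@PIPELINE_START":
        return replace(state, status="running")
    if kind == "@@PHASE_START":
        return replace(state, phases=_with_status(state.phases, action["phase"], "running"))
    if kind == "@@PHASE_COMPLETE":
        phases = _with_status(state.phases, action["phase"], "completed")
        return replace(state, phases=phases, progress=weighted_progress(phases))
    if kind == "@@PHASE_FAILED":
        phases = _with_status(state.phases, action["phase"], "failed")
        return replace(state, phases=phases, status="failed")
    if kind == "@@PIPELINE_COMPLETE":
        return replace(state, status="completed", progress=1.0)
    return state


# Example run with made-up weights: render counts twice as much as the others.
state = PipelineView(phases=(PhaseView("discover"), PhaseView("parse"), PhaseView("render", weight=2)))
state = pipeline_reducer(state, {"type": "@@PIPELINE_START"})
state = pipeline_reducer(state, {"type": "@@PHASE_START", "phase": "discover"})
state = pipeline_reducer(state, {"type": "@@PHASE_COMPLETE", "phase": "discover"})
after_discover = state.progress
state = pipeline_reducer(state, {"type": "@@PHASE_COMPLETE", "phase": "parse"})
after_parse = state.progress
print(after_discover, after_parse, state.status)
```

Because a heavier phase contributes more to the total, finishing two light phases out of three only moves progress to the halfway mark in this run, which is the behavior the weighting is meant to provide for phases with uneven durations.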