# pipeline

URL: /milo-cli/api/milo/pipeline/
Section: milo
Description: Pipeline orchestration with observable state through the Store/saga system.

---

> For a complete page index, fetch /milo-cli/llms.txt.

Open LLM text
(/milo-cli/api/milo/pipeline/index.txt)

Share with AI

Ask Claude
(https://claude.ai/new?q=Please%20help%20me%20understand%20this%20documentation%3A%20%2Fmilo-cli%2Fapi%2Fmilo%2Fpipeline%2Findex.txt)

Ask ChatGPT
(https://chatgpt.com/?q=Please%20help%20me%20understand%20this%20documentation%3A%20%2Fmilo-cli%2Fapi%2Fmilo%2Fpipeline%2Findex.txt)

Ask Gemini
(https://gemini.google.com/app?q=Please%20help%20me%20understand%20this%20documentation%3A%20%2Fmilo-cli%2Fapi%2Fmilo%2Fpipeline%2Findex.txt)

Ask Copilot
(https://copilot.microsoft.com/?q=Please%20help%20me%20understand%20this%20documentation%3A%20%2Fmilo-cli%2Fapi%2Fmilo%2Fpipeline%2Findex.txt)

Module

#
`pipeline`

Pipeline orchestration with observable state through the Store/saga system.

9Classes15Functions

## Classes

`PhasePolicy`

5

▼

Failure policy for a pipeline phase.

Controls what happens when a phase's handler raises an except…

Failure policy for a pipeline phase.

Controls what happens when a phase's handler raises an exception.
Default behavior (`on_fail="stop"`) matches the original fail-fast semantics.

#### Attributes

Name
Type
Description

`on_fail`

`str`

—

`max_retries`

`int`

—

`retry_delay`

`float`

—

`retry_backoff`

`str`

—

#### Methods

Internal Methods
1

▼

`__post_init__`

0

▼

`def __post_init__(self) -> None`

`PhaseLog`

3

▼

A single captured output line from a phase handler.

A single captured output line from a phase handler.

#### Attributes

Name
Type
Description

`line`

`str`

—

`stream`

`str`

—

`timestamp`

`float`

—

`Phase`

8

▼

A named pipeline phase.

A named pipeline phase.

#### Attributes

Name
Type
Description

`name`

`str`

—

`handler`

`Callable[..., Any]`

—

`description`

`str`

—

`depends_on`

`tuple[str, ...]`

—

`parallel`

`bool`

—

`weight`

`int`

—

`policy`

`PhasePolicy`

—

`max_logs`

`int`

—

`PhaseStatus`

7

▼

Runtime status of a single phase.

Runtime status of a single phase.

#### Attributes

Name
Type
Description

`name`

`str`

—

`status`

`str`

—

`started_at`

`float`

—

`elapsed`

`float`

—

`error`

`str`

—

`attempt`

`int`

—

`logs`

`tuple[PhaseLog, ...]`

—

`PipelineState`

7

▼

Observable state for a running pipeline.

Observable state for a running pipeline.

#### Attributes

Name
Type
Description

`name`

`str`

—

`phases`

`tuple[PhaseStatus, ...]`

—

`current_phase`

`str`

—

`started_at`

`float`

—

`elapsed`

`float`

—

`progress`

`float`

—

`status`

`str`

—

`PipelineViewState`

6

▼

UI state for the pipeline detail TUI (wraps PipelineState).

This is a view-layer wrapper — the pip…

UI state for the pipeline detail TUI (wraps PipelineState).

This is a view-layer wrapper — the pipeline reducer remains pure.
The TUI reducer wraps it and handles @@KEY actions for navigation.

#### Attributes

Name
Type
Description

`pipeline`

`PipelineState`

—

`selected_phase`

`int`

—

`expanded`

`bool`

—

`log_scroll`

`int`

—

`auto_follow`

`bool`

—

`log_height`

`int`

—

`CycleError`

0

▼

Raised when a pipeline dependency graph contains a cycle.

Raised when a pipeline dependency graph contains a cycle.

`Pipeline`

5

▼

Declarative build pipeline that executes through the Store.

Usage::

pipeline = Pipeline(
…

Declarative build pipeline that executes through the Store.

Usage::

```
pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)

# Extend with >>
pipeline = pipeline >> Phase("health", handler=check)
```

The pipeline generates a saga and reducer that work with the Store
for observable, testable execution.

Raises`CycleError`at construction if the dependency graph
contains a cycle.

#### Methods

`build_reducer`

0

`Callable`

▼

Generate a reducer that handles pipeline state transitions.

`def build_reducer(self) -> Callable`

##### Returns

`Callable`

`build_saga`

0

`Callable`

▼

Generate a saga that executes all phases in dependency order.

Phase handlers t…

`def build_saga(self) -> Callable`

Generate a saga that executes all phases in dependency order.

Phase handlers that accept a`context`parameter receive a dict
mapping dependency names to their results.

When`capture_output=True`, handler stdout/stderr is captured and
dispatched as`@@PHASE_LOG`actions.

##### Returns

`Callable`

`execution_order`

0

`list[str]`

▼

Return the topological execution order of phases.

`def execution_order(self) -> list[str]`

##### Returns

`list[str]`

Internal Methods
2

▼

`__init__`

4

▼

`def __init__(self, name: str, *phases: Phase, capture_output: bool = False, fail_fast: bool = False) -> None`

##### Parameters

Name
Type
Description

`name`
`—`

`*phases`
`—`

`capture_output`
`—`

Default:`False`

`fail_fast`
`—`

Default:`False`

`__rshift__`

1

`Pipeline`

▼

Extend the pipeline: ``pipeline >> Phase(...)``.

`def __rshift__(self, phase: Phase) -> Pipeline`

##### Parameters

Name
Type
Description

`phase`
`—`

##### Returns

`Pipeline`

`_CaptureProxy`

8

▼

Proxy that routes writes to a per-phase buffer when capture is active.

When ``_phase_buffer`` cont…

Proxy that routes writes to a per-phase buffer when capture is active.

When`_phase_buffer`contextvar holds a list, writes are buffered there.
Otherwise writes pass through to the original stream. Thread-safe because
contextvars are per-thread.

#### Methods

`encoding`

0

`str`

▼

property

`def encoding(self) -> str`

##### Returns

`str`

`write`

1

`int`

▼

`def write(self, s: str) -> int`

##### Parameters

Name
Type
Description

`s`
`—`

##### Returns

`int`

`flush`

0

▼

`def flush(self) -> None`

`fileno`

0

`int`

▼

`def fileno(self) -> int`

##### Returns

`int`

`isatty`

0

`bool`

▼

`def isatty(self) -> bool`

##### Returns

`bool`

`readable`

0

`bool`

▼

`def readable(self) -> bool`

##### Returns

`bool`

`writable`

0

`bool`

▼

`def writable(self) -> bool`

##### Returns

`bool`

Internal Methods
1

▼

`__init__`

2

▼

`def __init__(self, original: Any, stream_name: str = 'stdout') -> None`

##### Parameters

Name
Type
Description

`original`
`—`

`stream_name`
`—`

Default:`'stdout'`

## Functions

`set_active_pipeline`

1

`None`

▼

Publish a PipelineState for the milo://pipeline/timeline resource.

Call this f…

`def set_active_pipeline(state: PipelineState | None) -> None`

Publish a PipelineState for the milo://pipeline/timeline resource.

Call this from a Store listener to keep the resource up to date::

```
store.subscribe(lambda: set_active_pipeline(store.state))
```

##### Parameters

Name
Type
Description

`state`
`PipelineState | None`

`get_active_pipeline`

0

`PipelineState | None`

▼

Return the most recently published PipelineState, or None.

`def get_active_pipeline() -> PipelineState | None`

##### Returns

`PipelineState | None`

`pipeline_to_timeline`

1

`dict[str, Any]`

▼

Serialize a PipelineState to a timeline dict for MCP.

Returns structured JSON …

`def pipeline_to_timeline(state: PipelineState) -> dict[str, Any]`

Serialize a PipelineState to a timeline dict for MCP.

Returns structured JSON matching the milo://pipeline/timeline schema::

```
{
    "pipeline": "build",
    "status": "completed",
    "elapsed": 2.34,
    "progress": 1.0,
    "phases": [
        {"name": "discover", "status": "completed", "elapsed": 0.52, "attempt": 1, "log_count": 42},
        ...
    ]
}
```

##### Parameters

Name
Type
Description

`state`
`PipelineState`

##### Returns

`dict[str, Any]`

`make_detail_reducer`

1

`Callable[[PipelineViewSt…`

▼

Create a wrapping reducer for the interactive pipeline detail view.

Handles @@…

`def make_detail_reducer(pipeline_reducer: Callable) -> Callable[[PipelineViewState, Action], PipelineViewState | Quit]`

Create a wrapping reducer for the interactive pipeline detail view.

Handles @@KEY actions for cursor navigation, expansion/collapse,
log scrolling, and auto-follow. Delegates all other actions to
the inner pipeline reducer.

Usage::

```
reducer = make_detail_reducer(pipeline.build_reducer())
```

##### Parameters

Name
Type
Description

`pipeline_reducer`
`Callable`

##### Returns

`Callable[[PipelineViewState, Action], PipelineViewState | Quit]`

`_validate_dependencies`

1

`None`

▼

Validate the dependency graph upfront. Raises CycleError if a cycle exists.

`def _validate_dependencies(phases: list[Phase]) -> None`

##### Parameters

Name
Type
Description

`phases`
`list[Phase]`

`_acquire_proxy`

0

`None`

▼

Install capture proxies on sys.stdout/stderr (ref-counted).

`def _acquire_proxy() -> None`

`_release_proxy`

0

`None`

▼

Remove capture proxies when last consumer is done (ref-counted).

`def _release_proxy() -> None`

`_call_handler_captured`

2

`tuple[Any, list[tuple[st…`

▼

Call a handler with stdout/stderr capture. Returns (result, log_entries).

On e…

`def _call_handler_captured(handler: Callable, context: dict[str, Any]) -> tuple[Any, list[tuple[str, str, float]]]`

Call a handler with stdout/stderr capture. Returns (result, log_entries).

On exception, the captured logs are attached to the exception as
`__captured_logs__`so the caller can flush them before emitting
PHASE_FAILED / PHASE_RETRY / PHASE_SKIPPED.

##### Parameters

Name
Type
Description

`handler`
`Callable`

`context`
`dict[str, Any]`

##### Returns

`tuple[Any, list[tuple[str, str, float]]]`

`_flush_logs`

2

`Any`

▼

Yield Put(PHASE_LOG) for each captured line.

`def _flush_logs(name: str, logs: list[tuple[str, str, float]]) -> Any`

##### Parameters

Name
Type
Description

`name`
`str`

`logs`
`list[tuple[str, str, float]]`

##### Returns

`Any`

`_handler_wants_context`

1

`bool`

▼

Return True if the handler has a ``context`` parameter.

`def _handler_wants_context(handler: Callable) -> bool`

##### Parameters

Name
Type
Description

`handler`
`Callable`

##### Returns

`bool`

`_build_context`

2

`dict[str, Any]`

▼

Build the context dict for a phase from its dependency results.

`def _build_context(phase: Phase, results: dict[str, Any]) -> dict[str, Any]`

##### Parameters

Name
Type
Description

`phase`
`Phase`

`results`
`dict[str, Any]`

##### Returns

`dict[str, Any]`

`_call_handler`

2

`Any`

▼

Call a handler, passing context if it accepts one.

`def _call_handler(handler: Callable, context: dict[str, Any]) -> Any`

##### Parameters

Name
Type
Description

`handler`
`Callable`

`context`
`dict[str, Any]`

##### Returns

`Any`

`_retry_delay_for`

2

`float`

▼

Calculate retry delay for the given attempt number.

`def _retry_delay_for(policy: PhasePolicy, attempt: int) -> float`

##### Parameters

Name
Type
Description

`policy`
`PhasePolicy`

`attempt`
`int`

##### Returns

`float`

`_run_phase_inline`

5

`Any`

▼

Run a sequential phase inline in the main saga. Returns True if pipeline should…

`def _run_phase_inline(name: str, phase: Phase, context: dict[str, Any], results: dict[str, Any], capture: bool = False) -> Any`

Run a sequential phase inline in the main saga. Returns True if pipeline should stop.

##### Parameters

Name
Type
Description

`name`
`str`

`phase`
`Phase`

`context`
`dict[str, Any]`

`results`
`dict[str, Any]`

`capture`
`bool`

Default:`False`

##### Returns

`Any`

`_make_phase_saga`

7

`Callable`

▼

Create a saga for a single phase (used by All for parallel phases).

When *fail…

`def _make_phase_saga(name: str, handler: Callable, policy: PhasePolicy, context: dict[str, Any], results: dict[str, Any], capture: bool = False, *, fail_fast: bool = False) -> Callable`

Create a saga for a single phase (used by All for parallel phases).

When fail_fast is True and the phase fails with`on_fail="stop"`
(the default), the saga re-raises after dispatching`PHASE_FAILED`
so that`All`can cancel sibling sagas.

##### Parameters

Name
Type
Description

`name`
`str`

`handler`
`Callable`

`policy`
`PhasePolicy`

`context`
`dict[str, Any]`

`results`
`dict[str, Any]`

`capture`
`bool`

Default:`False`

`fail_fast`
`bool`

Default:`False`

##### Returns

`Callable`
