Module

pipeline

Pipeline orchestration with observable state through the Store/saga system.

Classes

PhasePolicy 5
Failure policy for a pipeline phase.

Controls what happens when a phase's handler raises an exception. Default behavior (on_fail="stop") matches the original fail-fast semantics.

Attributes

Name Type Description
on_fail str
max_retries int
retry_delay float
retry_backoff str

Methods

Internal Methods 1
__post_init__ 0
def __post_init__(self) -> None
PhaseLog 3
A single captured output line from a phase handler.

Attributes

Name Type Description
line str
stream str
timestamp float
Phase 8
A named pipeline phase.

Attributes

Name Type Description
name str
handler Callable[..., Any]
description str
depends_on tuple[str, ...]
parallel bool
weight int
policy PhasePolicy
max_logs int
PhaseStatus 7
Runtime status of a single phase.

Attributes

Name Type Description
name str
status str
started_at float
elapsed float
error str
attempt int
logs tuple[PhaseLog, ...]
PipelineState 7
Observable state for a running pipeline.

Attributes

Name Type Description
name str
phases tuple[PhaseStatus, ...]
current_phase str
started_at float
elapsed float
progress float
status str
PipelineViewState 6
UI state for the pipeline detail TUI (wraps PipelineState).

This is a view-layer wrapper, so the pipeline reducer remains pure. The TUI reducer wraps it and handles @@KEY actions for navigation.

Attributes

Name Type Description
pipeline PipelineState
selected_phase int
expanded bool
log_scroll int
auto_follow bool
log_height int
CycleError 0
Raised when a pipeline dependency graph contains a cycle.

Pipeline 5
Declarative build pipeline that executes through the Store.

Usage::

pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)

# Extend with >>
pipeline = pipeline >> Phase("health", handler=check)

The pipeline generates a saga and reducer that work with the Store for observable, testable execution.

Raises CycleError at construction if the dependency graph contains a cycle.
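The dependency ordering and cycle check described above can be illustrated with the standard library alone. This is a minimal sketch, not the module's implementation: it uses `graphlib.TopologicalSorter` as a stand-in, and the `graphlib.CycleError` it raises is a different class from this module's own CycleError. The `deps` mapping mirrors the `depends_on` values from the usage example; the helper name `has_cycle` is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Phase name -> names it depends on, mirroring the usage example.
deps = {
    "discover": (),
    "parse": ("discover",),
    "render": ("parse",),
    "assets": ("parse",),
    "write": ("render", "assets"),
}


def execution_order(graph):
    """Return one valid topological order of the phases.

    Raises graphlib.CycleError (a stdlib class, not this module's
    CycleError) if the graph contains a cycle.
    """
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph):
    """Hypothetical helper: True if no valid order exists."""
    try:
        execution_order(graph)
    except CycleError:
        return True
    return False


order = execution_order(deps)
```

In any valid order, "discover" comes first and "write" comes last; the relative order of "render" and "assets" is not fixed, which is what makes them candidates for parallel execution.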

Methods

build_reducer 0 Callable
Generate a reducer that handles pipeline state transitions.
def build_reducer(self) -> Callable
Returns
Callable
build_saga 0 Callable
def build_saga(self) -> Callable

Generate a saga that executes all phases in dependency order.

Phase handlers that accept a context parameter receive a dict mapping dependency names to their results.

When capture_output=True, handler stdout/stderr is captured and dispatched as @@PHASE_LOG actions.

Returns
Callable
execution_order 0 list[str]
Return the topological execution order of phases.
def execution_order(self) -> list[str]
Returns
list[str]
Internal Methods 2
__init__ 4
def __init__(self, name: str, *phases: Phase, capture_output: bool = False, fail_fast: bool = False) -> None
Parameters
Name Type Description
name
*phases
capture_output Default:False
fail_fast Default:False
__rshift__ 1 Pipeline
Extend the pipeline: ``pipeline >> Phase(...)``.
def __rshift__(self, phase: Phase) -> Pipeline
Parameters
Name Type Description
phase
Returns
Pipeline
_CaptureProxy 8
Proxy that routes writes to a per-phase buffer when capture is active.

When the _phase_buffer contextvar holds a list, writes are buffered there. Otherwise writes pass through to the original stream. This is thread-safe because each thread sees its own value of a context variable.
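The routing rule can be sketched in a few lines. This is an illustration under assumptions rather than the module's code: `phase_buffer` and `CaptureProxySketch` are hypothetical stand-ins, and the buffered entry shape `(stream_name, text)` is chosen only for the example.

```python
import contextvars
import io

# Hypothetical stand-in for the module's _phase_buffer context variable.
phase_buffer = contextvars.ContextVar("phase_buffer", default=None)


class CaptureProxySketch:
    """Buffers writes while a capture list is active, else passes through."""

    def __init__(self, original, stream_name="stdout"):
        self._original = original
        self._stream_name = stream_name

    def write(self, s):
        buf = phase_buffer.get()
        if buf is not None:
            # Capture is active in this context: buffer instead of printing.
            buf.append((self._stream_name, s))
            return len(s)
        return self._original.write(s)

    def flush(self):
        self._original.flush()


real = io.StringIO()  # stands in for the real sys.stdout
proxy = CaptureProxySketch(real)

proxy.write("before\n")
captured = []
token = phase_buffer.set(captured)
try:
    proxy.write("inside\n")
finally:
    phase_buffer.reset(token)
proxy.write("after\n")
```

Only the write made while the context variable holds a list lands in `captured`; the other two reach the underlying stream.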

Methods

encoding 0 str
property
def encoding(self) -> str
Returns
str
write 1 int
def write(self, s: str) -> int
Parameters
Name Type Description
s
Returns
int
flush 0
def flush(self) -> None
fileno 0 int
def fileno(self) -> int
Returns
int
isatty 0 bool
def isatty(self) -> bool
Returns
bool
readable 0 bool
def readable(self) -> bool
Returns
bool
writable 0 bool
def writable(self) -> bool
Returns
bool
Internal Methods 1
__init__ 2
def __init__(self, original: Any, stream_name: str = 'stdout') -> None
Parameters
Name Type Description
original
stream_name Default:'stdout'

Functions

set_active_pipeline 1 None
def set_active_pipeline(state: PipelineState | None) -> None

Publish a PipelineState for the milo://pipeline/timeline resource.

Call this from a Store listener to keep the resource up to date::

store.subscribe(lambda: set_active_pipeline(store.state))
Parameters
Name Type Description
state PipelineState | None
get_active_pipeline 0 PipelineState | None
Return the most recently published PipelineState, or None.
def get_active_pipeline() -> PipelineState | None
Returns
PipelineState | None
pipeline_to_timeline 1 dict[str, Any]
def pipeline_to_timeline(state: PipelineState) -> dict[str, Any]

Serialize a PipelineState to a timeline dict for MCP.

Returns structured JSON matching the milo://pipeline/timeline schema::

{
    "pipeline": "build",
    "status": "completed",
    "elapsed": 2.34,
    "progress": 1.0,
    "phases": [
        {"name": "discover", "status": "completed", "elapsed": 0.52, "attempt": 1, "log_count": 42},
        ...
    ]
}
Parameters
Name Type Description
state PipelineState
Returns
dict[str, Any]
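A serializer producing the shape shown above might look like the following. It is a sketch only: `PhaseStatusSketch` and `PipelineStateSketch` are hypothetical stand-ins that copy a subset of the documented attribute names, and deriving `log_count` from `len(logs)` is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PhaseStatusSketch:  # hypothetical stand-in for PhaseStatus
    name: str
    status: str
    elapsed: float
    attempt: int
    logs: tuple


@dataclass(frozen=True)
class PipelineStateSketch:  # hypothetical stand-in for PipelineState
    name: str
    phases: tuple
    elapsed: float
    progress: float
    status: str


def to_timeline(state) -> dict[str, Any]:
    """Flatten the state into a JSON-serializable timeline dict."""
    return {
        "pipeline": state.name,
        "status": state.status,
        "elapsed": state.elapsed,
        "progress": state.progress,
        "phases": [
            {
                "name": p.name,
                "status": p.status,
                "elapsed": p.elapsed,
                "attempt": p.attempt,
                # Assumption: log_count is the number of captured log lines.
                "log_count": len(p.logs),
            }
            for p in state.phases
        ],
    }


state = PipelineStateSketch(
    name="build",
    phases=(PhaseStatusSketch("discover", "completed", 0.52, 1, ("a", "b")),),
    elapsed=2.34,
    progress=1.0,
    status="completed",
)
timeline = to_timeline(state)
```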
make_detail_reducer 1 Callable[[PipelineViewSt…
def make_detail_reducer(pipeline_reducer: Callable) -> Callable[[PipelineViewState, Action], PipelineViewState | Quit]

Create a wrapping reducer for the interactive pipeline detail view.

Handles @@KEY actions for cursor navigation, expansion/collapse, log scrolling, and auto-follow. Delegates all other actions to the inner pipeline reducer.

Usage::

reducer = make_detail_reducer(pipeline.build_reducer())
Parameters
Name Type Description
pipeline_reducer Callable
Returns
Callable[[PipelineViewState, Action], PipelineViewState | Quit]
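The wrapping pattern can be sketched as follows. Everything here is illustrative: `ActionSketch`, `ViewSketch`, the key names "up", "down", and "enter", and the explicit `phase_count` argument are assumptions for the example, not the library's actual action shape or key bindings.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class ActionSketch:  # hypothetical action shape
    type: str
    payload: Any = None


@dataclass(frozen=True)
class ViewSketch:  # hypothetical subset of PipelineViewState
    pipeline: Any
    selected_phase: int = 0
    expanded: bool = False


def make_wrapping_reducer(inner, phase_count):
    """Handle @@KEY actions in the view layer; delegate the rest to inner."""

    def reducer(state, action):
        if action.type == "@@KEY":
            if action.payload == "down":  # assumed key name
                nxt = min(state.selected_phase + 1, phase_count - 1)
                return replace(state, selected_phase=nxt)
            if action.payload == "up":  # assumed key name
                return replace(state, selected_phase=max(state.selected_phase - 1, 0))
            if action.payload == "enter":  # assumed key name
                return replace(state, expanded=not state.expanded)
            return state
        # Anything else belongs to the pure inner reducer.
        return replace(state, pipeline=inner(state.pipeline, action))

    return reducer


# Toy inner reducer: counts the actions it receives.
reducer = make_wrapping_reducer(lambda pipeline, action: pipeline + 1, phase_count=2)
state = ViewSketch(pipeline=0)
state = reducer(state, ActionSketch("@@KEY", "down"))
state = reducer(state, ActionSketch("@@KEY", "down"))  # clamped at the last phase
state = reducer(state, ActionSketch("@@KEY", "enter"))
state = reducer(state, ActionSketch("@@PHASE_LOG"))
```

Keeping navigation state in the wrapper means the inner reducer never sees key actions, which is what lets it stay pure and independently testable.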
_validate_dependencies 1 None
Validate the dependency graph upfront. Raises CycleError if a cycle exists.
def _validate_dependencies(phases: list[Phase]) -> None
Parameters
Name Type Description
phases list[Phase]
_acquire_proxy 0 None
Install capture proxies on sys.stdout/stderr (ref-counted).
def _acquire_proxy() -> None
_release_proxy 0 None
Remove capture proxies when the last consumer is done (ref-counted).
def _release_proxy() -> None
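Reference-counted install and removal can be sketched like this. To keep the example free of side effects it swaps attributes on a stand-in `streams` namespace rather than on `sys`, and it uses placeholder tuples instead of real proxy objects; the lock and the function names are assumptions.

```python
import threading
from types import SimpleNamespace

# Stand-in for the sys module so the sketch does not touch real streams.
streams = SimpleNamespace(stdout="real-stdout", stderr="real-stderr")

_lock = threading.Lock()
_refcount = 0
_originals = None


def acquire_proxy():
    """Install proxies on first acquire; later acquires only bump the count."""
    global _refcount, _originals
    with _lock:
        if _refcount == 0:
            _originals = (streams.stdout, streams.stderr)
            streams.stdout = ("proxy", streams.stdout)
            streams.stderr = ("proxy", streams.stderr)
        _refcount += 1


def release_proxy():
    """Restore the originals only when the last consumer releases."""
    global _refcount, _originals
    with _lock:
        if _refcount == 0:
            return  # nothing installed; ignore an unbalanced release
        _refcount -= 1
        if _refcount == 0:
            streams.stdout, streams.stderr = _originals
            _originals = None
```

With this scheme, overlapping parallel phases share one installed proxy, and the original streams come back only after every phase has released it.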
_call_handler_captured 2 tuple[Any, list[tuple[st…
def _call_handler_captured(handler: Callable, context: dict[str, Any]) -> tuple[Any, list[tuple[str, str, float]]]

Call a handler with stdout/stderr capture. Returns (result, log_entries).

On exception, the captured logs are attached to the exception as __captured_logs__ so the caller can flush them before emitting PHASE_FAILED / PHASE_RETRY / PHASE_SKIPPED.

Parameters
Name Type Description
handler Callable
context dict[str, Any]
Returns
tuple[Any, list[tuple[str, str, float]]]
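The idea of attaching captured output to the raised exception can be sketched as below. Note the swapped-in technique: this example captures with `contextlib.redirect_stdout` rather than the module's proxy and context variable, handles stdout only, and assumes the entry order `(line, stream, timestamp)`. The function names are hypothetical.

```python
import contextlib
import io
import time


def _to_entries(buf):
    """Turn buffered text into (line, stream, timestamp) tuples (assumed order)."""
    now = time.time()
    return [(line, "stdout", now) for line in buf.getvalue().splitlines()]


def call_captured(handler):
    """Run handler, returning (result, log_entries).

    If the handler raises, the lines captured so far are attached to the
    exception as __captured_logs__ before it propagates.
    """
    out = io.StringIO()
    try:
        with contextlib.redirect_stdout(out):
            result = handler()
    except Exception as exc:
        exc.__captured_logs__ = _to_entries(out)
        raise
    return result, _to_entries(out)


def ok_handler():
    print("hello")
    return 7


def failing_handler():
    print("partial output")
    raise ValueError("boom")
```

A caller would catch the exception, read `exc.__captured_logs__`, and flush those lines before reporting the failure, so output printed just before a crash is not lost.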
_flush_logs 2 Any
Yield Put(PHASE_LOG) for each captured line.
def _flush_logs(name: str, logs: list[tuple[str, str, float]]) -> Any
Parameters
Name Type Description
name str
logs list[tuple[str, str, float]]
Returns
Any
_handler_wants_context 1 bool
Return True if the handler has a ``context`` parameter.
def _handler_wants_context(handler: Callable) -> bool
Parameters
Name Type Description
handler Callable
Returns
bool
_build_context 2 dict[str, Any]
Build the context dict for a phase from its dependency results.
def _build_context(phase: Phase, results: dict[str, Any]) -> dict[str, Any]
Parameters
Name Type Description
phase Phase
results dict[str, Any]
Returns
dict[str, Any]
_call_handler 2 Any
Call a handler, passing context if it accepts one.
def _call_handler(handler: Callable, context: dict[str, Any]) -> Any
Parameters
Name Type Description
handler Callable
context dict[str, Any]
Returns
Any
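The three context helpers above fit together roughly as follows. This is a sketch based only on their one-line descriptions: the use of `inspect.signature`, the keyword-argument call, and skipping dependencies without a result are assumptions.

```python
import inspect


def handler_wants_context(handler):
    """True if the handler declares a parameter named 'context'."""
    try:
        params = inspect.signature(handler).parameters
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature.
        return False
    return "context" in params


def build_context(depends_on, results):
    """Map each dependency name to its result, skipping missing ones (assumed)."""
    return {name: results[name] for name in depends_on if name in results}


def call_handler(handler, context):
    """Pass the context only to handlers that ask for it."""
    if handler_wants_context(handler):
        return handler(context=context)
    return handler()


def discover():
    return ["a.md", "b.md"]


def parse(context):
    return [path.upper() for path in context["discover"]]


results = {}
results["discover"] = call_handler(discover, build_context((), results))
results["parse"] = call_handler(parse, build_context(("discover",), results))
```

Handlers that take no arguments keep working unchanged, while handlers that need upstream results opt in by naming a `context` parameter.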
_retry_delay_for 2 float
Calculate retry delay for the given attempt number.
def _retry_delay_for(policy: PhasePolicy, attempt: int) -> float
Parameters
Name Type Description
policy PhasePolicy
attempt int
Returns
float
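A delay calculation of this kind could look like the sketch below. The reference does not list the accepted retry_backoff values or the formula, so the strategy names "fixed", "linear", and "exponential", the defaults, and the 1-based attempt numbering are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PolicySketch:  # hypothetical stand-in for PhasePolicy
    max_retries: int = 3
    retry_delay: float = 0.5
    retry_backoff: str = "exponential"  # assumed values: fixed, linear, exponential


def retry_delay_for(policy, attempt):
    """Return the delay in seconds before retrying after a 1-based attempt."""
    if policy.retry_backoff == "exponential":
        return policy.retry_delay * 2 ** (attempt - 1)
    if policy.retry_backoff == "linear":
        return policy.retry_delay * attempt
    return policy.retry_delay  # "fixed" and anything unrecognized


exponential = [retry_delay_for(PolicySketch(), n) for n in (1, 2, 3)]
linear = [retry_delay_for(PolicySketch(retry_backoff="linear"), n) for n in (1, 2, 3)]
fixed = [retry_delay_for(PolicySketch(retry_backoff="fixed"), n) for n in (1, 2, 3)]
```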
_run_phase_inline 5 Any
def _run_phase_inline(name: str, phase: Phase, context: dict[str, Any], results: dict[str, Any], capture: bool = False) -> Any

Run a sequential phase inline in the main saga. Returns True if the pipeline should stop.

Parameters
Name Type Description
name str
phase Phase
context dict[str, Any]
results dict[str, Any]
capture bool Default:False
Returns
Any
_make_phase_saga 7 Callable
def _make_phase_saga(name: str, handler: Callable, policy: PhasePolicy, context: dict[str, Any], results: dict[str, Any], capture: bool = False, *, fail_fast: bool = False) -> Callable

Create a saga for a single phase (used by All for parallel phases).

When fail_fast is True and the phase fails with on_fail="stop" (the default), the saga re-raises after dispatching PHASE_FAILED so that All can cancel sibling sagas.

Parameters
Name Type Description
name str
handler Callable
policy PhasePolicy
context dict[str, Any]
results dict[str, Any]
capture bool Default:False
fail_fast bool Default:False
Returns
Callable