Classes
PhasePolicy
5
▼
Failure policy for a pipeline phase.
Controls what happens when a phase's handler raises an except…
PhasePolicy
5
▼
Failure policy for a pipeline phase.
Controls what happens when a phase's handler raises an exception.
Default behavior (on_fail="stop") matches the original fail-fast semantics.
Attributes
| Name | Type | Description |
|---|---|---|
on_fail |
str
|
— |
max_retries |
int
|
— |
retry_delay |
float
|
— |
retry_backoff |
str
|
— |
Methods
Internal Methods 1 ▼
__post_init__
0
▼
__post_init__
0
▼
def __post_init__(self) -> None
PhaseLog
3
▼
A single captured output line from a phase handler.
PhaseLog
3
▼
A single captured output line from a phase handler.
Attributes
| Name | Type | Description |
|---|---|---|
line |
str
|
— |
stream |
str
|
— |
timestamp |
float
|
— |
Phase
8
▼
A named pipeline phase.
Phase
8
▼
A named pipeline phase.
Attributes
| Name | Type | Description |
|---|---|---|
name |
str
|
— |
handler |
Callable[..., Any]
|
— |
description |
str
|
— |
depends_on |
tuple[str, ...]
|
— |
parallel |
bool
|
— |
weight |
int
|
— |
policy |
PhasePolicy
|
— |
max_logs |
int
|
— |
PhaseStatus
7
▼
Runtime status of a single phase.
PhaseStatus
7
▼
Runtime status of a single phase.
Attributes
| Name | Type | Description |
|---|---|---|
name |
str
|
— |
status |
str
|
— |
started_at |
float
|
— |
elapsed |
float
|
— |
error |
str
|
— |
attempt |
int
|
— |
logs |
tuple[PhaseLog, ...]
|
— |
PipelineState
7
▼
Observable state for a running pipeline.
PipelineState
7
▼
Observable state for a running pipeline.
Attributes
| Name | Type | Description |
|---|---|---|
name |
str
|
— |
phases |
tuple[PhaseStatus, ...]
|
— |
current_phase |
str
|
— |
started_at |
float
|
— |
elapsed |
float
|
— |
progress |
float
|
— |
status |
str
|
— |
PipelineViewState
6
▼
UI state for the pipeline detail TUI (wraps PipelineState).
This is a view-layer wrapper — the pip…
PipelineViewState
6
▼
UI state for the pipeline detail TUI (wraps PipelineState).
This is a view-layer wrapper — the pipeline reducer remains pure. The TUI reducer wraps it and handles @@KEY actions for navigation.
Attributes
| Name | Type | Description |
|---|---|---|
pipeline |
PipelineState
|
— |
selected_phase |
int
|
— |
expanded |
bool
|
— |
log_scroll |
int
|
— |
auto_follow |
bool
|
— |
log_height |
int
|
— |
CycleError
0
▼
Raised when a pipeline dependency graph contains a cycle.
CycleError
0
▼
Raised when a pipeline dependency graph contains a cycle.
Pipeline
5
▼
Declarative build pipeline that executes through the Store.
Usage::
pipeline = Pipeline(
…
Pipeline
5
▼
Declarative build pipeline that executes through the Store.
Usage::
pipeline = Pipeline(
"build",
Phase("discover", handler=discover),
Phase("parse", handler=parse, depends_on=("discover",)),
Phase("render", handler=render, depends_on=("parse",), parallel=True),
Phase("assets", handler=assets, depends_on=("parse",), parallel=True),
Phase("write", handler=write, depends_on=("render", "assets")),
)
# Extend with >>
pipeline = pipeline >> Phase("health", handler=check)
The pipeline generates a saga and reducer that work with the Store for observable, testable execution.
RaisesCycleErrorat construction if the dependency graph
contains a cycle.
Methods
build_reducer
0
Callable
▼
Generate a reducer that handles pipeline state transitions.
build_reducer
0
Callable
▼
def build_reducer(self) -> Callable
Returns
Callable
build_saga
0
Callable
▼
Generate a saga that executes all phases in dependency order.
Phase handlers t…
build_saga
0
Callable
▼
def build_saga(self) -> Callable
Generate a saga that executes all phases in dependency order.
Phase handlers that accept acontextparameter receive a dict
mapping dependency names to their results.
Whencapture_output=True, handler stdout/stderr is captured and
dispatched as@@PHASE_LOGactions.
Returns
Callable
execution_order
0
list[str]
▼
Return the topological execution order of phases.
execution_order
0
list[str]
▼
def execution_order(self) -> list[str]
Returns
list[str]
Internal Methods 2 ▼
__init__
4
▼
__init__
4
▼
def __init__(self, name: str, *phases: Phase, capture_output: bool = False, fail_fast: bool = False) -> None
Parameters
| Name | Type | Description |
|---|---|---|
name |
— |
|
*phases |
— |
|
capture_output |
— |
Default:False
|
fail_fast |
— |
Default:False
|
__rshift__
1
Pipeline
▼
Extend the pipeline: ``pipeline >> Phase(...)``.
__rshift__
1
Pipeline
▼
def __rshift__(self, phase: Phase) -> Pipeline
Parameters
| Name | Type | Description |
|---|---|---|
phase |
— |
Returns
Pipeline
_CaptureProxy
8
▼
Proxy that routes writes to a per-phase buffer when capture is active.
When ``_phase_buffer`` cont…
_CaptureProxy
8
▼
Proxy that routes writes to a per-phase buffer when capture is active.
When_phase_buffercontextvar holds a list, writes are buffered there.
Otherwise writes pass through to the original stream. Thread-safe because
contextvars are per-thread.
Methods
encoding
0
str
▼
property
encoding
0
str
▼
def encoding(self) -> str
Returns
str
write
1
int
▼
write
1
int
▼
def write(self, s: str) -> int
Parameters
| Name | Type | Description |
|---|---|---|
s |
— |
Returns
int
flush
0
▼
flush
0
▼
def flush(self) -> None
fileno
0
int
▼
fileno
0
int
▼
def fileno(self) -> int
Returns
int
isatty
0
bool
▼
isatty
0
bool
▼
def isatty(self) -> bool
Returns
bool
readable
0
bool
▼
readable
0
bool
▼
def readable(self) -> bool
Returns
bool
writable
0
bool
▼
writable
0
bool
▼
def writable(self) -> bool
Returns
bool
Internal Methods 1 ▼
__init__
2
▼
__init__
2
▼
def __init__(self, original: Any, stream_name: str = 'stdout') -> None
Parameters
| Name | Type | Description |
|---|---|---|
original |
— |
|
stream_name |
— |
Default:'stdout'
|
Functions
set_active_pipeline
1
None
▼
Publish a PipelineState for the milo://pipeline/timeline resource.
Call this f…
set_active_pipeline
1
None
▼
def set_active_pipeline(state: PipelineState | None) -> None
Publish a PipelineState for the milo://pipeline/timeline resource.
Call this from a Store listener to keep the resource up to date::
store.subscribe(lambda: set_active_pipeline(store.state))
Parameters
| Name | Type | Description |
|---|---|---|
state |
PipelineState | None |
get_active_pipeline
0
PipelineState | None
▼
Return the most recently published PipelineState, or None.
get_active_pipeline
0
PipelineState | None
▼
def get_active_pipeline() -> PipelineState | None
Returns
PipelineState | None
pipeline_to_timeline
1
dict[str, Any]
▼
Serialize a PipelineState to a timeline dict for MCP.
Returns structured JSON …
pipeline_to_timeline
1
dict[str, Any]
▼
def pipeline_to_timeline(state: PipelineState) -> dict[str, Any]
Serialize a PipelineState to a timeline dict for MCP.
Returns structured JSON matching the milo://pipeline/timeline schema::
{
"pipeline": "build",
"status": "completed",
"elapsed": 2.34,
"progress": 1.0,
"phases": [
{"name": "discover", "status": "completed", "elapsed": 0.52, "attempt": 1, "log_count": 42},
...
]
}
Parameters
| Name | Type | Description |
|---|---|---|
state |
PipelineState |
Returns
dict[str, Any]
make_detail_reducer
1
Callable[[PipelineViewSt…
▼
Create a wrapping reducer for the interactive pipeline detail view.
Handles @@…
make_detail_reducer
1
Callable[[PipelineViewSt…
▼
def make_detail_reducer(pipeline_reducer: Callable) -> Callable[[PipelineViewState, Action], PipelineViewState | Quit]
Create a wrapping reducer for the interactive pipeline detail view.
Handles @@KEY actions for cursor navigation, expansion/collapse, log scrolling, and auto-follow. Delegates all other actions to the inner pipeline reducer.
Usage::
reducer = make_detail_reducer(pipeline.build_reducer())
Parameters
| Name | Type | Description |
|---|---|---|
pipeline_reducer |
Callable |
Returns
Callable[[PipelineViewState, Action], PipelineViewState | Quit]
_validate_dependencies
1
None
▼
Validate the dependency graph upfront. Raises CycleError if a cycle exists.
_validate_dependencies
1
None
▼
def _validate_dependencies(phases: list[Phase]) -> None
Parameters
| Name | Type | Description |
|---|---|---|
phases |
list[Phase] |
_acquire_proxy
0
None
▼
Install capture proxies on sys.stdout/stderr (ref-counted).
_acquire_proxy
0
None
▼
def _acquire_proxy() -> None
_release_proxy
0
None
▼
Remove capture proxies when last consumer is done (ref-counted).
_release_proxy
0
None
▼
def _release_proxy() -> None
_call_handler_captured
2
tuple[Any, list[tuple[st…
▼
Call a handler with stdout/stderr capture. Returns (result, log_entries).
On e…
_call_handler_captured
2
tuple[Any, list[tuple[st…
▼
def _call_handler_captured(handler: Callable, context: dict[str, Any]) -> tuple[Any, list[tuple[str, str, float]]]
Call a handler with stdout/stderr capture. Returns (result, log_entries).
On exception, the captured logs are attached to the exception as
__captured_logs__so the caller can flush them before emitting
PHASE_FAILED / PHASE_RETRY / PHASE_SKIPPED.
Parameters
| Name | Type | Description |
|---|---|---|
handler |
Callable |
|
context |
dict[str, Any] |
Returns
tuple[Any, list[tuple[str, str, float]]]
_flush_logs
2
Any
▼
Yield Put(PHASE_LOG) for each captured line.
_flush_logs
2
Any
▼
def _flush_logs(name: str, logs: list[tuple[str, str, float]]) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
name |
str |
|
logs |
list[tuple[str, str, float]] |
Returns
Any
_handler_wants_context
1
bool
▼
Return True if the handler has a ``context`` parameter.
_handler_wants_context
1
bool
▼
def _handler_wants_context(handler: Callable) -> bool
Parameters
| Name | Type | Description |
|---|---|---|
handler |
Callable |
Returns
bool
_build_context
2
dict[str, Any]
▼
Build the context dict for a phase from its dependency results.
_build_context
2
dict[str, Any]
▼
def _build_context(phase: Phase, results: dict[str, Any]) -> dict[str, Any]
Parameters
| Name | Type | Description |
|---|---|---|
phase |
Phase |
|
results |
dict[str, Any] |
Returns
dict[str, Any]
_call_handler
2
Any
▼
Call a handler, passing context if it accepts one.
_call_handler
2
Any
▼
def _call_handler(handler: Callable, context: dict[str, Any]) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
handler |
Callable |
|
context |
dict[str, Any] |
Returns
Any
_retry_delay_for
2
float
▼
Calculate retry delay for the given attempt number.
_retry_delay_for
2
float
▼
def _retry_delay_for(policy: PhasePolicy, attempt: int) -> float
Parameters
| Name | Type | Description |
|---|---|---|
policy |
PhasePolicy |
|
attempt |
int |
Returns
float
_run_phase_inline
5
Any
▼
Run a sequential phase inline in the main saga. Returns True if pipeline should…
_run_phase_inline
5
Any
▼
def _run_phase_inline(name: str, phase: Phase, context: dict[str, Any], results: dict[str, Any], capture: bool = False) -> Any
Run a sequential phase inline in the main saga. Returns True if pipeline should stop.
Parameters
| Name | Type | Description |
|---|---|---|
name |
str |
|
phase |
Phase |
|
context |
dict[str, Any] |
|
results |
dict[str, Any] |
|
capture |
bool |
Default:False
|
Returns
Any
_make_phase_saga
7
Callable
▼
Create a saga for a single phase (used by All for parallel phases).
When *fail…
_make_phase_saga
7
Callable
▼
def _make_phase_saga(name: str, handler: Callable, policy: PhasePolicy, context: dict[str, Any], results: dict[str, Any], capture: bool = False, *, fail_fast: bool = False) -> Callable
Create a saga for a single phase (used by All for parallel phases).
When fail_fast is True and the phase fails withon_fail="stop"
(the default), the saga re-raises after dispatchingPHASE_FAILED
so thatAllcan cancel sibling sagas.
Parameters
| Name | Type | Description |
|---|---|---|
name |
str |
|
handler |
Callable |
|
policy |
PhasePolicy |
|
context |
dict[str, Any] |
|
results |
dict[str, Any] |
|
capture |
bool |
Default:False
|
fail_fast |
bool |
Default:False
|
Returns
Callable