Module

pipeline

Pipeline orchestration with observable state through the Store/saga system.

Classes

Phase
A named pipeline phase.

Attributes

Name Type Description
name str
handler Callable[..., Any]
description str
depends_on tuple[str, ...]
parallel bool
weight int

PhaseStatus
Runtime status of a single phase.

Attributes

Name Type Description
name str
status str
started_at float
elapsed float
error str

PipelineState
Observable state for a running pipeline.

Attributes

Name Type Description
name str
phases tuple[PhaseStatus, ...]
current_phase str
started_at float
elapsed float
progress float
status str

Pipeline

Declarative build pipeline that executes through the Store.

Usage::

pipeline = Pipeline(
    "build",
    Phase("discover", handler=discover),
    Phase("parse", handler=parse, depends_on=("discover",)),
    Phase("render", handler=render, depends_on=("parse",), parallel=True),
    Phase("assets", handler=assets, depends_on=("parse",), parallel=True),
    Phase("write", handler=write, depends_on=("render", "assets")),
)

# Extend with >>
pipeline = pipeline >> Phase("health", handler=check)

The pipeline generates a saga and reducer that work with the Store for observable, testable execution.

Methods

build_reducer
Generate a reducer that handles pipeline state transitions.
def build_reducer(self) -> Callable
Returns
Callable
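
To illustrate what "a reducer that handles pipeline state transitions" could look like, here is a minimal sketch. It is not the reducer this module generates: the action dictionaries and their `PHASE_STARTED`, `PHASE_COMPLETED`, and `PHASE_FAILED` types are hypothetical, the two dataclasses are stand-ins that mirror only a subset of the documented attributes, and the assumption that `progress` is the fraction of completed phases is mine.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PhaseStatus:
    # Stand-in mirroring a subset of the documented attributes.
    name: str
    status: str = "pending"
    error: str = ""


@dataclass(frozen=True)
class PipelineState:
    # Stand-in mirroring a subset of the documented attributes.
    name: str
    phases: tuple = ()
    current_phase: str = ""
    progress: float = 0.0
    status: str = "pending"


def _update_phase(phases, name, **changes):
    # Return a new tuple in which only the named phase is replaced.
    return tuple(replace(p, **changes) if p.name == name else p for p in phases)


def reducer(state: PipelineState, action: dict) -> PipelineState:
    """Map (state, action) to a new state without mutating the old one."""
    kind = action.get("type")  # hypothetical action shape
    if kind == "PHASE_STARTED":
        phases = _update_phase(state.phases, action["phase"], status="running")
        return replace(state, phases=phases,
                       current_phase=action["phase"], status="running")
    if kind == "PHASE_COMPLETED":
        phases = _update_phase(state.phases, action["phase"], status="completed")
        done = sum(1 for p in phases if p.status == "completed")
        # Assumption: progress is the share of phases that have completed.
        return replace(state, phases=phases, progress=done / len(phases))
    if kind == "PHASE_FAILED":
        phases = _update_phase(state.phases, action["phase"],
                               status="failed", error=action.get("error", ""))
        return replace(state, phases=phases, status="failed")
    # Unknown actions leave the state unchanged.
    return state
```

Because the reducer is a pure function over immutable values, each transition can be tested by calling it directly with a state and an action, without running any handlers.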
build_saga
Generate a saga that executes all phases in dependency order.
def build_saga(self) -> Callable
Returns
Callable
execution_order
Return the topological execution order of phases.
def execution_order(self) -> list[str]
Returns
list[str]
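
As a rough illustration of topological ordering, the sketch below sorts the phases from the usage example with the standard library's `graphlib.TopologicalSorter`. How `Pipeline.execution_order` is actually implemented is not documented here; the free function `execution_order` and the `deps` mapping are stand-ins for illustration only.

```python
from graphlib import TopologicalSorter


def execution_order(depends_on: dict) -> list:
    """Return phase names so that every phase follows all of its dependencies.

    `depends_on` maps each phase name to the names it depends on.
    A dependency cycle raises graphlib.CycleError.
    """
    return list(TopologicalSorter(depends_on).static_order())


# Dependency graph taken from the usage example for Pipeline.
deps = {
    "discover": (),
    "parse": ("discover",),
    "render": ("parse",),
    "assets": ("parse",),
    "write": ("render", "assets"),
}

order = execution_order(deps)
print(order)
```

Here `discover` and `parse` come first and `write` comes last; `render` and `assets` have no dependency on each other, so either may precede the other.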
Internal Methods
__init__
def __init__(self, name: str, *phases: Phase) -> None
Parameters
Name Type Description
name str
*phases Phase
__rshift__
Extend the pipeline: ``pipeline >> Phase(...)``.
def __rshift__(self, phase: Phase) -> Pipeline
Parameters
Name Type Description
phase Phase
Returns
Pipeline
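
The `>>` operator works because Python calls `__rshift__` on the left operand. The sketch below shows one way this could be wired up with simplified stand-in classes. Returning a new `Pipeline` and leaving the original untouched is an assumption; the documentation only states that a `Pipeline` is returned.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Phase:
    # Stand-in with a subset of the documented attributes.
    name: str
    handler: Callable[..., Any]
    depends_on: tuple = ()


class Pipeline:
    # Stand-in that only stores its phases; no saga or reducer logic.
    def __init__(self, name: str, *phases: Phase) -> None:
        self.name = name
        self.phases = tuple(phases)

    def __rshift__(self, phase: Phase) -> "Pipeline":
        # Assumption: build a new pipeline rather than mutating this one.
        return Pipeline(self.name, *self.phases, phase)


base = Pipeline("build", Phase("discover", handler=lambda: None))
extended = base >> Phase("health", handler=lambda: None)
print([p.name for p in extended.phases])
```

Since each `>>` yields a `Pipeline`, calls can be chained: `pipeline >> a >> b` appends `a`, then `b`.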

Functions

_make_phase_saga
Create a saga for a single phase (used by Fork for parallel phases).
def _make_phase_saga(name: str, handler: Callable) -> Callable
Parameters
Name Type Description
name str
handler Callable
Returns
Callable
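
The Store and saga APIs are not described on this page, so the following is only a generic sketch of the factory pattern the signature suggests: a function that closes over `name` and `handler` and returns a generator-based saga. The emitted action dictionaries, the zero-argument handler call, and the public name `make_phase_saga` are all hypothetical and do not reflect the real effect types or `Fork` integration.

```python
import time
from typing import Any, Callable, Iterator


def make_phase_saga(name: str, handler: Callable[[], Any]) -> Callable[[], Iterator[dict]]:
    """Return a saga (generator function) that runs one phase and reports on it."""

    def saga() -> Iterator[dict]:
        started = time.monotonic()
        # Hypothetical action announcing that the phase has begun.
        yield {"type": "PHASE_STARTED", "phase": name}
        try:
            handler()  # Assumption: handlers take no arguments in this sketch.
        except Exception as exc:
            yield {
                "type": "PHASE_FAILED",
                "phase": name,
                "error": str(exc),
                "elapsed": time.monotonic() - started,
            }
            return
        yield {
            "type": "PHASE_COMPLETED",
            "phase": name,
            "elapsed": time.monotonic() - started,
        }

    return saga


def failing() -> None:
    raise RuntimeError("boom")


ok_events = list(make_phase_saga("parse", lambda: None)())
bad_events = list(make_phase_saga("render", failing)())
```

Producing one independent saga per phase is what makes it possible for a scheduler to start several of them side by side, which is presumably why the real helper exists for parallel phases.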