Bengal's reactive dataflow pipeline provides a declarative approach to site building: content flows through transformation streams, and changes propagate automatically to affected outputs.
## Overview
Instead of manually tracking dependencies for incremental builds, the pipeline system uses a functional, stream-based architecture:
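A minimal sketch of the shape, built from the operators documented below (the `Pipeline` constructor, `source(...)` entry point, and `run()` method are assumptions):

```python
# Content flows through named, declarative transformations; the
# dependency graph is implied by the dataflow rather than tracked
# by hand. Operator names follow the table later in this section.
pipeline = (
    Pipeline("full-build")                        # assumed constructor
    .source(discover_files)                       # producer function (see SourceStream)
    .filter("md", lambda f: f.suffix == ".md")    # keep markdown files
    .map("parse", parse_markdown)                 # parse each file
    .parallel(workers=4)                          # fan out across workers
    .collect("all_pages")                         # gather into a list
)
result = pipeline.run()                           # assumed run() method
```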
## Key Components

### StreamItem and StreamKey

Every item flowing through the pipeline is wrapped in a `StreamItem` with a unique `StreamKey`:
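A minimal sketch of their shape, assuming dataclass-style fields (names chosen to match the `version` behavior described below):

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class StreamKey:
    id: str       # stable identity, e.g. "content/posts/hello.md"
    version: str  # content hash; changes whenever the content changes

@dataclass(frozen=True)
class StreamItem(Generic[T]):
    key: StreamKey
    value: T
```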
The `version` field enables automatic cache invalidation: when content changes, its hash changes, invalidating cached results.
### Stream Transformations
The pipeline provides several stream operators:
| Operator | Description | Example |
|---|---|---|
| `map` | Transform each item | `.map("parse", parse_fn)` |
| `filter` | Keep items matching a predicate | `.filter("md", lambda f: f.suffix == ".md")` |
| `flat_map` | Transform and flatten | `.flat_map("split", split_fn)` |
| `collect` | Gather all items into a list | `.collect("all_pages")` |
| `combine` | Merge two streams | `.combine(other_stream)` |
| `parallel` | Process items in parallel | `.parallel(workers=4)` |
| `cache` | In-memory memoization | `.cache()` |
| `disk_cache` | Persistent caching | `.disk_cache(cache)` |
### Pipeline Builder

The `Pipeline` class provides a fluent API for constructing dataflow graphs:
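For example (the `source(...)` step, `run()` method, and `PipelineResult` attributes are assumptions):

```python
# Each named step becomes a node in the graph; run() executes it
# and returns a PipelineResult with per-step metrics.
pipeline = (
    Pipeline("render")
    .source(discover_files)                     # producer function
    .filter("md", lambda f: f.suffix == ".md")
    .map("parse", parse_fn)
    .map("render", render_fn)
    .parallel(workers=4)
    .collect("rendered_pages")
)
result = pipeline.run()
```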
## Bengal-Specific Streams
Bengal provides pre-built streams for common operations:
### ContentDiscoveryStream
Discovers and parses content files with frontmatter:
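A hedged usage sketch (the constructor arguments, iteration protocol, and `ParsedContent` attributes are assumptions):

```python
from pathlib import Path

# Walk content/, parse frontmatter and body, and emit
# StreamItem[ParsedContent] values.
stream = ContentDiscoveryStream(content_dir=Path("content"))
for item in stream:
    parsed = item.value
    print(item.key.id, parsed.metadata.get("title", "untitled"))
```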
### FileChangeStream
Emits changed files for incremental builds:
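A sketch with assumed constructor arguments:

```python
from pathlib import Path

# Yield only the files that changed since the last build, so
# downstream steps rebuild just the affected pages.
changes = FileChangeStream(root=Path("content"), since=last_build_time)
for item in changes:
    print("changed:", item.key.id)
```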
## Build Pipeline Factories
Bengal provides factory functions for common build scenarios:
### Full Build Pipeline
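A hedged usage sketch (the factory name and signature are assumptions):

```python
# Discover, parse, render, and write every page.
pipeline = create_full_build_pipeline(site)
result = pipeline.run()
```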
### Incremental Build Pipeline
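Again a sketch, with assumed names:

```python
# Rebuild only the pages affected by the changed files.
pipeline = create_incremental_build_pipeline(site, changed_files)
result = pipeline.run()
```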
### Simple Render Pipeline
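A sketch with assumed names:

```python
# Render already-discovered pages, skipping discovery and parsing.
pipeline = create_render_pipeline(site, pages)
result = pipeline.run()
```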
## Caching

### In-Memory Caching

Use `.cache()` for within-build memoization:
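For example (the `source(...)` step is an assumption):

```python
# Within a single build, repeated pulls of an item with the same
# StreamKey reuse the memoized result instead of re-parsing.
pipeline = (
    Pipeline("parse")
    .source(discover_files)
    .map("parse", expensive_parse)
    .cache()   # in-memory; lives only for this build
)
```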
### Disk Caching

Use `.disk_cache()` for cross-build persistence:
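For example (the `DiskCache` constructor and cache path are assumptions):

```python
from pathlib import Path

# Results persist across builds, keyed by StreamKey.version, so
# unchanged items are loaded instead of recomputed.
cache = DiskCache(Path(".bengal-cache"))
pipeline = (
    Pipeline("parse")
    .source(discover_files)
    .map("parse", expensive_parse)
    .disk_cache(cache)
)
```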
On subsequent builds, items with a matching `StreamKey.version` are loaded from the cache without recomputation.
### Cache Statistics
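Hit rates can be inspected after a run; continuing the disk-cache sketch above (the attribute names are assumptions):

```python
# Attribute names are assumptions.
print(f"cache hits={cache.stats.hits}, misses={cache.stats.misses}")
```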
## Watch Mode
The pipeline integrates with file watching for development:
### FileWatcher
Watches directories with debouncing:
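For example (the constructor parameters and `WatchBatch` attributes are assumptions):

```python
from pathlib import Path

def on_batch(batch: WatchBatch) -> None:
    # Rapid successive saves are coalesced into one debounced batch.
    for event in batch.events:
        print(event.change_type, event.path)

watcher = FileWatcher(
    paths=[Path("content"), Path("templates")],
    debounce_ms=300,
    callback=on_batch,
)
watcher.start()
```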
### PipelineWatcher
Combines file watching with pipeline-based rebuilds:
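For example (constructor arguments and the `watch()` method are assumptions):

```python
from pathlib import Path

# Each debounced batch of changes triggers an incremental
# pipeline rebuild.
watcher = PipelineWatcher(site, paths=[Path("content")])
watcher.watch()
```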
### Change Classification

`WatchBatch` automatically classifies changes:
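For example (the enum members and `WatchBatch` attributes are assumptions; `ChangeType` itself is listed in the API Reference):

```python
# `batch` is a WatchBatch delivered to the watcher callback.
# Route each change to the cheapest rebuild that covers it.
for event in batch.events:
    if event.change_type is ChangeType.CONTENT:
        rebuild_pages([event.path])       # just the affected pages
    elif event.change_type is ChangeType.TEMPLATE:
        rebuild_all_pages()               # templates affect every page
```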
## Data Flow Diagram

```
┌─────────────────────────────────────────────────────────────────────────┐
│ Bengal Build Pipeline │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐ │
│ │ discover │───▶│ filter │───▶│ parse │───▶│ create page │ │
│ │ files │ │ .md │ │ markdown │ │ (parallel) │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ collect │ │
│ │ (all pages) │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌─────────────────────────────────┤ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ build navigation │ │ render pages │ │
│ │ │─────────────▶│ (parallel) │ │
│ └──────────────────┘ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ write output │ │
│ │ (for_each) │ │
│ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```

## Performance Characteristics
| Operation | Time | Notes |
|---|---|---|
| Stream overhead | ~1µs per item | Minimal wrapper cost |
| Cache lookup | ~10µs | Hash-based lookup |
| Disk cache read | ~1ms | JSON deserialization |
| Parallel speedup | ~Nx | N = worker threads |
### Benchmarks
For a 1,000-page site:
| Build Type | Time | Notes |
|---|---|---|
| Full build | ~15s | All pages rendered |
| Incremental (1 file) | ~0.3s | Only changed file |
| Incremental (cached) | ~0.1s | Cache hit |
## Migration from Orchestrators
The pipeline system complements (and can replace) the existing orchestrator-based build system.
### When to Use the Pipeline
- New builds: Prefer pipeline for cleaner architecture
- Custom workflows: Pipeline is more flexible
- Watch mode: Better incremental support
### When to Use Orchestrators
- Existing code: Orchestrators are stable and battle-tested
- Complex dependencies: Manual control over rebuild order
- Legacy integration: Works with existing hooks
### Migration Example
Before (Orchestrator):
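A hedged sketch of the orchestrator style (class and method names are assumptions):

```python
# Explicit phases, invoked in a fixed order.
orchestrator = BuildOrchestrator(site)
orchestrator.discover_content()
orchestrator.render_pages()
orchestrator.write_output()
```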
After (Pipeline):
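The same build expressed through the pipeline (factory name assumed as above):

```python
# One declarative graph replaces the manually ordered phases.
pipeline = create_full_build_pipeline(site)
result = pipeline.run()
```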
## API Reference

### Core Types

- `StreamKey` - Unique identifier for stream items
- `StreamItem[T]` - Item wrapper with key and value
- `Stream[T]` - Base class for all streams
- `Pipeline` - Builder for constructing pipelines
- `PipelineResult` - Execution result with metrics
### Stream Types

- `SourceStream` - Creates a stream from a producer function
- `MapStream` - Transforms items
- `FilterStream` - Filters items by predicate
- `FlatMapStream` - Transforms and flattens
- `CollectStream` - Gathers items into a list
- `CombineStream` - Merges two streams
- `ParallelStream` - Parallel processing
- `CachedStream` - In-memory memoization
- `DiskCachedStream` - Persistent caching
### Bengal Streams

- `ContentDiscoveryStream` - Discovers content files
- `FileChangeStream` - Emits changed files
- `ParsedContent` - Parsed file data
- `RenderedPage` - Rendered output
### Watcher Types

- `FileWatcher` - File system watcher
- `PipelineWatcher` - Watch + rebuild
- `WatchEvent` - Single change event
- `WatchBatch` - Batched changes
- `ChangeType` - Change type enum
## See also
- Build Pipeline Concepts - High-level build overview
- Orchestration - Legacy orchestrator docs
- Caching - Cache system details