Overview
The reactive system pushes fresh HTML to connected browsers the moment your data changes — no client polling, no manual fan-out. You emit a change event after a mutation. Chirp figures out which template blocks display the data that changed and re-renders only those, streaming them over Server-Sent Events to every viewer of that resource.
Reach for it when many users watch the same live thing — a shared document, a kanban board, a price ticker — and a server-side change should appear for all of them at once.
Three pieces work together. You will touch all three in the example below.
| Piece | What it does |
|---|---|
ReactiveBus |
A thread-safe pub/sub channel. You emitChangeEvents on it from any thread; subscribers receive them per scope. |
DependencyIndex |
Maps context paths (like"tasks") to the template blocks that display them. Built once at startup. |
reactive_stream() |
Subscribes a route to the bus, runs each change through the index, and returns anEventStreamof re-rendered fragments. |
Minimal working example
The shortest end-to-end loop: register blocks against context paths, return areactive_stream() from an SSE route, and emit a ChangeEventafter each mutation. Open the page in two tabs — change a task in one, watch the other update within a second.
The full runnable app lives at examples/standalone/reactive_tasks.
Emitting changes
AChangeEventis a frozen dataclass you emit after a mutation. Four fields:
@dataclass(frozen=True, slots=True)
class ChangeEvent:
scope: str # e.g. a document or board ID
changed_paths: frozenset[str] # e.g. {"doc.content", "doc.version"}
origin: str | None = None # who caused this change
audience: frozenset[str] | None = None
scoperoutes delivery. Subscribers receive only events for their scope.changed_pathstells theDependencyIndexwhich blocks need re-rendering.originenables self-suppression:reactive_stream()skips events whoseoriginmatches the current connection, so the client that caused a change is not re-notified of it.audiencenarrows delivery to subscribers whoseConnectionInfo.user_idis in the set.Nonebroadcasts to everyone in the scope.
Emit from any thread — a background worker, a sync POST handler, anywhere:
from chirp.pages.reactive import ChangeEvent
bus.emit_sync(ChangeEvent(
scope="doc-42",
changed_paths=frozenset({"doc.content", "doc.version"}),
origin="user-7", # skip notifying the author of their own edit
))
Emit only what actually mutated. Blocks that display computed values stay in sync automatically — see Derived paths below.
Mapping paths to blocks
TheDependencyIndexanswers one question: given these changed paths, which template blocks re-render? The common case needs nothing manual — register a template and kida's static block analysis extracts each block's dependencies for you:
from chirp.pages.reactive import DependencyIndex
index = DependencyIndex()
index.register_template(env, "board.html")
For finer control, register a single block against a path by hand:
from chirp.pages.reactive import BlockRef
index.register("tasks", BlockRef("board.html", "task_list"))
index.register("tasks", BlockRef("board.html", "task_count", dom_id="count"))
Prefix matching is built in: changing "doc" affects blocks that depend on "doc.version", and changing "doc.version" affects blocks that depend on "doc".
Derived paths
Declare computed relationships so a single source change fans out to everything downstream. When a source path changes, derived paths join the affected set automatically — your mutation code only emits what it actually touched:
index.derive("doc.word_count", from_paths={"doc.content"})
index.derive("doc.summary", from_paths={"doc.content", "doc.title"})
Derivations are transitive: if A derives from B and B derives from C, changing C expands to {C, B, A}. A word_count block re-renders when doc.contentchanges, with no extra wiring at the emit site.
Tracking who's connected
Pass aConnectionInfowhen you build the stream to unlock presence and audience filtering. Without it, a connection is anonymous — it still receives broadcast events, but it is invisible to presence and skipped by audience-filtered events.
from chirp.middleware.auth import get_user
from chirp.pages.reactive import ConnectionInfo
user = get_user() # AnonymousUser when not signed in
connection = ConnectionInfo(
session_id=session_id,
user_id=user.id or None, # "" for anonymous → None
)
session_id is required; user_id is None for anonymous viewers; connected_at is captured with time.monotonic()at construction.
Read presence off the bus by scope:
viewers = bus.presence("doc-42")
viewer_count = len(viewers)
presence() returns only the connections that supplied a ConnectionInfo. Use on_disconnect(below) to emit a presence-only change event when a tab closes.
Howreactive_stream()runs
reactive_stream() is the one call that ties the bus, the index, and the connection together into an EventStreamyou return from a route:
from chirp import EventStream
from chirp.middleware.auth import get_user
from chirp.middleware.sessions import get_session
from chirp.pages.reactive import ConnectionInfo, reactive_stream
@app.route("/doc/{doc_id}/live", referenced=True)
def live(doc_id: str) -> EventStream:
session_id = get_session()["sid"]
user = get_user() # AnonymousUser when not signed in
return reactive_stream(
bus,
scope=doc_id,
index=dep_index,
context_builder=lambda paths: build_doc_context(doc_id, paths),
origin=session_id,
connection=ConnectionInfo(session_id=session_id, user_id=user.id or None),
on_disconnect=lambda scope, conn: notify_left(scope),
)
For each ChangeEventon the scope, the stream:
- 1
Skips self-caused events
If
originis set and matches the event'sorigin, the event is skipped. - 2
Applies audience filtering
When the event has an
audience, only connections whoseuser_idis in it receive it. - 3
Looks up affected blocks
DependencyIndex.affected_blocks(changed_paths)expands derived/prefix paths and returns the blocks to re-render. - 4
Builds fresh context
Calls
context_builder(changed_paths)when the builder takes one argument, orcontext_builder()for the zero-argument form. - 5
Yields a fragment per block
Each affected block streams as a
Fragmenttargeting its DOM id.
Selective context. The one-argumentcontext_builder receives the exact frozenset[str]of changed paths (after index expansion), so an expensive page can load only what changed:
def build_doc_context(changed_paths: frozenset[str]) -> dict:
if "doc.comments" in changed_paths:
return {"comments": store.load_comments()}
return {"doc": store.load_doc()}
Error boundary. If context_builder()raises, that one event is skipped and the stream stays alive; the next change retries with fresh data.
Disconnect hook. Passon_disconnectwhen presence or cleanup must run when
a client drops the SSE connection. The callback receives(scope, connection)—
the sameConnectionInfo passed to reactive_stream(..., connection=...). Typical
use: emit a presence-onlyChangeEventso remaining tabs refresh their viewer
count.
Audience-filtered notifications
Setaudiencewhen a change matters to only some viewers:
bus.emit_sync(ChangeEvent(
scope="doc-42",
changed_paths=frozenset({"notifications"}),
audience=frozenset({"alice", "bob"}),
))
Subscribers without a ConnectionInfo, or with a user_id outside the audience, do not receive it. Broadcast events keep audience=None.
Gotchas
Callingbus.close("doc-42") signals every subscriber on that scope to stop; bus.close()with no argument closes all scopes.
Advanced
Building and inspecting the dependency index by hand
Most apps get the index fromregister_template()at startup. The lower-level surface below is for the rare case where you register, query, or debug the mapping directly.
Register only the blocks behindsse-swapelements. Scans the raw template source and registers just those blocks, with their DOM ids — handy when most of a page is static:
source = env.loader.get_source(env, "page.html")[0]
index.register_from_sse_swaps(env, "page.html", source,
exclude_blocks={"editor_content"}, # client-managed, don't re-render
)
Query the mapping:
blocks = index.affected_blocks(frozenset({"doc.content"}))
# -> [BlockRef(template_name="page.html", block_name="content"), ...]
paths = index.block_dependencies("page.html", "content")
# -> the context paths that can cause that block to re-render
Trace an expansion with explain_affected()— it shows the derived-path chain and the blocks each change resolves to:
index.explain_affected(frozenset({"doc.content"}))
# {
# "original_paths": {"doc.content"},
# "expanded_paths": {"doc.content", "doc.word_count", "doc.summary"},
# "derived_paths": {"doc.word_count", "doc.summary"},
# "affected_blocks": [{"template": "page.html", "block": "content", "target": "doc-body"}, ...]
# }
Tuning queue depth and watching throughput
The bus exposes per-subscriber back-pressure tuning and three observability counters:
bus = ReactiveBus(maxsize=64) # per-subscriber queue depth; default 256
bus.emitted_count # total events emitted (including dropped)
bus.dropped_count # events lost to full subscriber queues
bus.subscriber_count # active subscribers across all scopes
Pass on_drop=callback to ReactiveBus(...) for a custom hook on each drop ((scope, event) -> None). Keep it fast — it runs on the emit path.
Validating the index with app.check()
app.check() (and the chirp checkCLI) validates the reactive system at startup. The checks run only when you register contract data:
app.set_contract_check_data("reactive_index", index)
app.set_contract_check_data("reactive_emitted_paths", {"tasks", "presence"})
app.set_contract_check_data("reactive_connection_scopes", {"board"})
# Add only when you emit ChangeEvent(..., audience=...).
app.set_contract_check_data("reactive_audience_scopes", {"board"})
Contract metadata keys:
| Key | Required when | What it validates |
|---|---|---|
reactive_index |
Always (for reactive apps) | EveryBlockRefpoints at a real template block; derivation graph has no cycles |
reactive_emitted_paths |
You emitChangeEvent(changed_paths=...) |
Every emitted path is registered in the index |
reactive_connection_scopes |
You passconnection=ConnectionInfo(...) to reactive_stream() |
Connection-aware streams exist for declared scopes |
reactive_audience_scopes |
You emitChangeEvent(audience=...) |
Audience-filtered scopes have matching connection-aware streams |
Four categories fire:
| Category | Severity | Catches |
|---|---|---|
reactive_block |
ERROR | ABlockRefpoints at a template block that does not exist (typo or renamed block). |
reactive_cycle |
WARNING | The derivation graph contains a cycle. |
reactive_paths |
WARNING | A declared emitted path is not registered in the index. |
reactive_audience |
WARNING | An audience-filtered scope has no connection-aware stream to match against. |
Keepreactive_emitted_paths in sync with the changed_pathsyour code emits. See contract categories for every category and how severities behave per environment.
ReactiveBus is fully thread-safe and DependencyIndexis read-only after construction, so you build the index once at startup and share both across every handler. See Thread Safety for the guarantees behind that.