Reactive System

Push fresh HTML to every viewer the moment your data changes — no polling, no manual fan-out

Page actions AI-ready formats and sharing
Open LLM text
Share with AI
Ask Claude Ask ChatGPT Ask Gemini Ask Copilot

Overview

The reactive system pushes fresh HTML to connected browsers the moment your data changes — no client polling, no manual fan-out. You emit a change event after a mutation. Chirp figures out which template blocks display the data that changed and re-renders only those, streaming them over Server-Sent Events to every viewer of that resource.

Reach for it when many users watch the same live thing — a shared document, a kanban board, a price ticker — and a server-side change should appear for all of them at once.

Three pieces work together. You will touch all three in the example below.

Piece What it does
ReactiveBus A thread-safe pub/sub channel. You emitChangeEvents on it from any thread; subscribers receive them per scope.
DependencyIndex Maps context paths (like"tasks") to the template blocks that display them. Built once at startup.
reactive_stream() Subscribes a route to the bus, runs each change through the index, and returns anEventStreamof re-rendered fragments.

Minimal working example

The shortest end-to-end loop: register blocks against context paths, return areactive_stream() from an SSE route, and emit a ChangeEventafter each mutation. Open the page in two tabs — change a task in one, watch the other update within a second.

The full runnable app lives at examples/standalone/reactive_tasks.

Emitting changes

AChangeEventis a frozen dataclass you emit after a mutation. Four fields:

@dataclass(frozen=True, slots=True)
class ChangeEvent:
    scope: str                          # e.g. a document or board ID
    changed_paths: frozenset[str]       # e.g. {"doc.content", "doc.version"}
    origin: str | None = None           # who caused this change
    audience: frozenset[str] | None = None
  • scope routes delivery. Subscribers receive only events for their scope.
  • changed_paths tells the DependencyIndexwhich blocks need re-rendering.
  • origin enables self-suppression: reactive_stream() skips events whose originmatches the current connection, so the client that caused a change is not re-notified of it.
  • audience narrows delivery to subscribers whose ConnectionInfo.user_id is in the set. Nonebroadcasts to everyone in the scope.

Emit from any thread — a background worker, a sync POST handler, anywhere:

from chirp.pages.reactive import ChangeEvent

bus.emit_sync(ChangeEvent(
    scope="doc-42",
    changed_paths=frozenset({"doc.content", "doc.version"}),
    origin="user-7",   # skip notifying the author of their own edit
))

Emit only what actually mutated. Blocks that display computed values stay in sync automatically — see Derived paths below.

Mapping paths to blocks

TheDependencyIndexanswers one question: given these changed paths, which template blocks re-render? The common case needs nothing manual — register a template and kida's static block analysis extracts each block's dependencies for you:

from chirp.pages.reactive import DependencyIndex

index = DependencyIndex()
index.register_template(env, "board.html")

For finer control, register a single block against a path by hand:

from chirp.pages.reactive import BlockRef

index.register("tasks", BlockRef("board.html", "task_list"))
index.register("tasks", BlockRef("board.html", "task_count", dom_id="count"))

Prefix matching is built in: changing "doc" affects blocks that depend on "doc.version", and changing "doc.version" affects blocks that depend on "doc".

Derived paths

Declare computed relationships so a single source change fans out to everything downstream. When a source path changes, derived paths join the affected set automatically — your mutation code only emits what it actually touched:

index.derive("doc.word_count", from_paths={"doc.content"})
index.derive("doc.summary", from_paths={"doc.content", "doc.title"})

Derivations are transitive: if A derives from B and B derives from C, changing C expands to {C, B, A}. A word_count block re-renders when doc.contentchanges, with no extra wiring at the emit site.

Tracking who's connected

Pass aConnectionInfowhen you build the stream to unlock presence and audience filtering. Without it, a connection is anonymous — it still receives broadcast events, but it is invisible to presence and skipped by audience-filtered events.

from chirp.middleware.auth import get_user
from chirp.pages.reactive import ConnectionInfo

user = get_user()  # AnonymousUser when not signed in
connection = ConnectionInfo(
    session_id=session_id,
    user_id=user.id or None,  # "" for anonymous → None
)

session_id is required; user_id is None for anonymous viewers; connected_at is captured with time.monotonic()at construction.

Read presence off the bus by scope:

viewers = bus.presence("doc-42")
viewer_count = len(viewers)

presence() returns only the connections that supplied a ConnectionInfo. Use on_disconnect(below) to emit a presence-only change event when a tab closes.

Howreactive_stream()runs

reactive_stream() is the one call that ties the bus, the index, and the connection together into an EventStreamyou return from a route:

from chirp import EventStream
from chirp.middleware.auth import get_user
from chirp.middleware.sessions import get_session
from chirp.pages.reactive import ConnectionInfo, reactive_stream

@app.route("/doc/{doc_id}/live", referenced=True)
def live(doc_id: str) -> EventStream:
    session_id = get_session()["sid"]
    user = get_user()  # AnonymousUser when not signed in
    return reactive_stream(
        bus,
        scope=doc_id,
        index=dep_index,
        context_builder=lambda paths: build_doc_context(doc_id, paths),
        origin=session_id,
        connection=ConnectionInfo(session_id=session_id, user_id=user.id or None),
        on_disconnect=lambda scope, conn: notify_left(scope),
    )

For each ChangeEventon the scope, the stream:

  1. 1

    Skips self-caused events

    Iforigin is set and matches the event's origin, the event is skipped.

  2. 2

    Applies audience filtering

    When the event has anaudience, only connections whose user_idis in it receive it.

  3. 3

    Looks up affected blocks

    DependencyIndex.affected_blocks(changed_paths)expands derived/prefix paths and returns the blocks to re-render.

  4. 4

    Builds fresh context

    Callscontext_builder(changed_paths) when the builder takes one argument, or context_builder()for the zero-argument form.

  5. 5

    Yields a fragment per block

    Each affected block streams as aFragmenttargeting its DOM id.

Selective context. The one-argumentcontext_builder receives the exact frozenset[str]of changed paths (after index expansion), so an expensive page can load only what changed:

def build_doc_context(changed_paths: frozenset[str]) -> dict:
    if "doc.comments" in changed_paths:
        return {"comments": store.load_comments()}
    return {"doc": store.load_doc()}

Error boundary. If context_builder()raises, that one event is skipped and the stream stays alive; the next change retries with fresh data.

Disconnect hook. Passon_disconnectwhen presence or cleanup must run when a client drops the SSE connection. The callback receives(scope, connection)— the sameConnectionInfo passed to reactive_stream(..., connection=...). Typical use: emit a presence-onlyChangeEventso remaining tabs refresh their viewer count.

Changed in 0.8

Audience-filtered notifications

Setaudiencewhen a change matters to only some viewers:

bus.emit_sync(ChangeEvent(
    scope="doc-42",
    changed_paths=frozenset({"notifications"}),
    audience=frozenset({"alice", "bob"}),
))

Subscribers without a ConnectionInfo, or with a user_id outside the audience, do not receive it. Broadcast events keep audience=None.

Gotchas

Callingbus.close("doc-42") signals every subscriber on that scope to stop; bus.close()with no argument closes all scopes.

Advanced

ReactiveBus is fully thread-safe and DependencyIndexis read-only after construction, so you build the index once at startup and share both across every handler. See Thread Safety for the guarantees behind that.