Reactive System

ReactiveBus, DependencyIndex, and reactive_stream — automatic SSE updates from data changes

5 min read 1001 words

Overview

Chirp's reactive system connects data mutations to SSE-powered UI updates. When data changes, the system figures out which template blocks are affected and pushes re-rendered fragments to connected browsers — automatically.

Three components work together:

Component Role
ReactiveBus Thread-safe pub/sub event bus scoped by key
DependencyIndex Maps context paths to the template blocks that depend on them
reactive_stream() Glues the bus and index into anEventStreamreturn value

ReactiveBus

The bus broadcastsChangeEvents from any thread to async subscribers.

from chirp.pages.reactive import ReactiveBus, ChangeEvent

bus = ReactiveBus()

# Emit from any thread (e.g., a background worker or POST handler)
bus.emit_sync(ChangeEvent(
    scope="doc-42",
    changed_paths=frozenset({"doc.content", "doc.version"}),
    origin="user-7",   # optional: skip notifying the author
))

Subscribers are async iterators scoped by key:

async for event in bus.subscribe("doc-42"):
    print(event.changed_paths)

Calling bus.close("doc-42") signals all subscribers on that scope to stop. bus.close()(no args) closes everything.

Back-Pressure

Each subscriber gets its ownasyncio.Queue. When a subscriber's queue is full, events are silently dropped — the bus never blocks the emitter.

bus = ReactiveBus(maxsize=64)   # default: 256

Monitor back-pressure with the observability counters:

bus.emitted_count      # total events emitted (including dropped)
bus.dropped_count      # events lost to full queues
bus.subscriber_count   # active subscribers across all scopes

All counters are ints maintained by the bus for observability.

ConnectionInfo

Subscriber identity is optional, but it unlocks audience filtering and presence:

from chirp.pages.reactive import ConnectionInfo

connection = ConnectionInfo(session_id=session_id, user_id=current_user.id)

session_id is required. user_id can be Nonefor anonymous viewers. connected_at is captured with time.monotonic()when the dataclass is created.

ChangeEvent

A frozen dataclass emitted after a data mutation:

@dataclass(frozen=True, slots=True)
class ChangeEvent:
    scope: str                       # e.g., a document ID
    changed_paths: frozenset[str]    # e.g., {"doc.content", "doc.version"}
    origin: str | None = None        # who caused this change
    audience: frozenset[str] | None = None
  • scope scopes delivery — subscribers only receive events for their scope.
  • changed_paths tells theDependencyIndexwhich blocks need re-rendering.
  • origin enables self-suppression:reactive_stream()skips events whose origin matches the current connection, so the client that caused the change isn't notified of it.
  • audience narrows delivery to subscribers whoseConnectionInfo.user_id is present in the set.Nonebroadcasts to every subscriber in the scope.

DependencyIndex

Built at app startup from kida's static block analysis. Maps context paths (like"doc.content") to the template blocks that display them.

Registration

Two approaches:

Manual — register specific blocks:

from chirp.pages.reactive.index import DependencyIndex

index = DependencyIndex()
index.register_template(env, "doc/{doc_id}/_layout.html",
    block_names=["title", "content", "word_count"],
    dom_id_map={"title": "doc-title", "content": "doc-body"},
)

Auto from SSE swaps — scan a template for sse-swapelements and register only those blocks:

source = env.loader.get_source(env, "page.html")[0]
index.register_from_sse_swaps(env, "page.html", source,
    exclude_blocks={"editor_content"},  # client-managed, don't re-render
)

Derived Paths

Declare computed relationships between context paths. When a source path changes, derived paths are automatically included in the affected set:

index.derive("doc.word_count", from_paths={"doc.content"})
index.derive("doc.summary", from_paths={"doc.content", "doc.title"})

Derivations are transitive: if A derives from B and B derives from C, changing C invalidates A, B, and C.

The store emits only what actually mutated. Display blocks that depend on computed values update without extra wiring.

Querying

blocks = index.affected_blocks(frozenset({"doc.content"}))
# Returns: [BlockRef(template_name="page.html", block_name="content"), ...]

deps = index.block_dependencies("page.html", "content")
# Returns the context paths that can cause that block to re-render.

Prefix matching is built in — changing "doc" affects blocks that depend on "doc.version", and vice versa.

Debugging

index.explain_affected(frozenset({"doc.content"}))
# {
#   "original_paths": {"doc.content"},
#   "expanded_paths": {"doc.content", "doc.word_count", "doc.summary"},
#   "derived_paths": {"doc.word_count", "doc.summary"},
#   "affected_blocks": [{"template": "page.html", "block": "content", "target": "doc-body"}, ...]
# }

reactive_stream()

The one-liner that ties everything together:

from chirp.pages.reactive import ConnectionInfo, reactive_stream

@app.route("/doc/{doc_id}/live")
def live(doc_id: str) -> EventStream:
    return reactive_stream(
        bus,
        scope=doc_id,
        index=dep_index,
        context_builder=lambda paths: build_doc_context(doc_id, paths),
        origin=session_id,
        connection=ConnectionInfo(session_id=session_id, user_id=current_user.id),
        on_disconnect=lambda scope, connection: audit_disconnect(scope, connection),
    )

What happens on each ChangeEvent:

  1. Skip iforiginmatches (self-suppression)
  2. Deliver only to matchingconnection.user_id when audienceis set
  3. Look up affected blocks viaDependencyIndex
  4. Callcontext_builder(changed_paths)for fresh data when the builder accepts one argument; zero-argument builders still work for older apps
  5. Yield aFragmentper affected block

Error boundary: ifcontext_builder()raises, the event is skipped and the stream continues. The next change event retries with fresh data.

Presence

Connection-aware streams are visible through the bus:

viewers = bus.presence("doc-42")
viewer_count = len(viewers)

Presence only includes subscribers that passed ConnectionInfo. Use on_disconnectfor cleanup or to emit a presence-only change event after a tab closes.

Audience Filtering

Useaudiencewhen a change is relevant to only some connected users:

bus.emit_sync(ChangeEvent(
    scope="doc-42",
    changed_paths=frozenset({"notifications"}),
    audience=frozenset({"alice", "bob"}),
))

Subscribers without ConnectionInfo, or with a user_idoutside the audience, do not receive that event. Broadcast events keepaudience=None.

Contract Validation

chirp checkvalidates the reactive system at startup:

Check Severity What it catches
reactive_block ERROR BlockRefreferences a non-existent template block (typo or renamed block)
reactive_cycle WARNING Derivation graph contains a cycle
reactive_paths WARNING Declared emitted paths are not registered in the dependency index
reactive_audience WARNING Audience-filtered scopes have no connection-aware streams

These checks are only active when the app usesDependencyIndexor declares reactive metadata:

app.set_contract_check_data("reactive_emitted_paths", {"tasks", "presence"})
app.set_contract_check_data("reactive_audience_scopes", {"thread:42"})
app.set_contract_check_data("reactive_connection_scopes", {"thread:42"})

Thread Safety

ReactiveBus is fully thread-safe — emit_sync() is designed to be called from any thread (background workers, sync POST handlers, etc.). The bus uses a single threading.Lockprotecting the subscriber registry and counters.

DependencyIndexis thread-safe after construction (read-only at runtime). Build it during app startup, then share it across all request handlers.

All Lock-protected paths have dedicated concurrency stress tests. See Thread Safety for the full story.

Next Steps