Module

pages.reactive.bus

Reactive event bus for change event broadcasting.

Classes

ReactiveBus 11

Broadcast channel for data change events.

Thread-safe. Each call to subscribe(scope) returns an async iterator that yields ChangeEvent objects for that scope. When emit() is called, the event is placed into every matching subscriber's queue.

Modeled on chirp's ToolEventBus but scoped per-key.
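The per-scope fan-out described above can be sketched with plain asyncio queues. This is an illustration only, not the ReactiveBus implementation: `MiniBus`, `Event`, and `add_subscriber` are hypothetical names, and the real `ChangeEvent` may carry different fields.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for ChangeEvent; only a scope and payload are assumed."""
    scope: str
    payload: str


class MiniBus:
    """Illustrative per-scope fan-out; not the actual ReactiveBus code."""

    def __init__(self, maxsize: int = 256) -> None:
        self._maxsize = maxsize
        self._queues: dict[str, list[asyncio.Queue[Event]]] = defaultdict(list)

    def add_subscriber(self, scope: str) -> asyncio.Queue[Event]:
        # Each subscriber gets its own bounded queue, registered under its scope.
        queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=self._maxsize)
        self._queues[scope].append(queue)
        return queue

    def emit(self, event: Event) -> None:
        # Place the event into every queue registered for the event's scope.
        for queue in self._queues.get(event.scope, []):
            queue.put_nowait(event)


async def demo() -> tuple[list[str], list[str], int]:
    bus = MiniBus()
    first = bus.add_subscriber("doc:1")
    second = bus.add_subscriber("doc:1")
    other = bus.add_subscriber("doc:2")
    bus.emit(Event("doc:1", "saved"))
    # Both "doc:1" subscribers see the event; the "doc:2" queue stays empty.
    return [first.get_nowait().payload], [second.get_nowait().payload], other.qsize()
```

Running `asyncio.run(demo())` shows that subscribers on the same scope each receive their own copy, while other scopes are untouched.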

Methods

emitted_count 0 int
Total number of events emitted (including dropped).
property
def emitted_count(self) -> int
Returns
int
dropped_count 0 int
Total number of events dropped due to full subscriber queues.
property
def dropped_count(self) -> int
Returns
int
subscriber_count 0 int
Total number of active subscribers across all scopes.
property
def subscriber_count(self) -> int
Returns
int
emit_sync 1
def emit_sync(self, event: ChangeEvent) -> None

Broadcast a change event synchronously (from any thread).

Uses put_nowait so it never blocks. Drops the event for a subscriber if its queue is full (back-pressure). Dropped events are logged at WARNING level (throttled per scope).
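The drop-on-full behaviour can be illustrated with `asyncio.Queue.put_nowait`, which raises `asyncio.QueueFull` instead of waiting. The helper `offer` and the counters below are hypothetical; they only mirror the idea that emitted events are counted whether or not they are dropped.

```python
import asyncio


def offer(queue: asyncio.Queue, item: object) -> bool:
    """Try to enqueue without blocking; return False if the queue is full."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False


async def demo() -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    emitted = dropped = 0
    for n in range(5):
        emitted += 1  # counted even when the event ends up dropped
        if not offer(queue, n):
            dropped += 1
    return emitted, dropped
```

With a queue of size 2 and five events, the first two are accepted and the remaining three are dropped.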

If event.audience is set, only delivers to subscribers whose ConnectionInfo.user_id is in the audience set. Subscribers without ConnectionInfo are skipped when audience filtering is active.
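The audience rule can be written as a small predicate. This is a sketch under assumptions: `Conn` is a hypothetical stand-in for ConnectionInfo with only a `user_id` field, and an unset audience is assumed to be `None`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Conn:
    """Hypothetical stand-in for ConnectionInfo; only user_id is assumed."""
    user_id: str


def should_deliver(audience: frozenset[str] | None, connection: Conn | None) -> bool:
    if audience is None:
        # Assumption: no audience means no filtering, so everyone receives it.
        return True
    if connection is None:
        # Anonymous subscribers are skipped while filtering is active.
        return False
    return connection.user_id in audience
```

For example, `should_deliver(frozenset({"ana"}), Conn("bo"))` is false, while `should_deliver(None, None)` is true.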

Parameters
Name Type Description
event
emit 1
Broadcast a change event (async version).
async
async def emit(self, event: ChangeEvent) -> None
Parameters
Name Type Description
event
subscribe 3 AsyncIterator[ChangeEven…
async
async def subscribe(self, scope: str, *, connection: ConnectionInfo | None = None, on_disconnect: Callable[[str, ConnectionInfo | None], None] | None = None) -> AsyncIterator[ChangeEvent]

Subscribe to change events for a specific scope.

Yields ChangeEvent objects as they are emitted. The subscription is automatically cleaned up when the iterator exits (client disconnects).

Parameters
Name Type Description
scope

Scope key to subscribe to.

connection

Optional identity for this subscriber. Enables audience filtering and presence tracking.

Default: None
on_disconnect

Optional callback invoked when this subscriber exits (normal or exception). Receives (scope, connection).

Default: None
Returns
AsyncIterator[ChangeEvent]
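The automatic cleanup and the on_disconnect callback can be sketched as an async generator with a `finally` block. The code below is not the library's implementation: the `registry` dict, the `None` stop sentinel, and the standalone `subscribe` function are assumptions made for illustration.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Callable


async def subscribe(
    registry: dict[str, list[asyncio.Queue]],
    scope: str,
    on_disconnect: Callable[[str, object], None] | None = None,
) -> AsyncIterator[object]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=256)
    registry.setdefault(scope, []).append(queue)
    try:
        while True:
            item = await queue.get()
            if item is None:  # assumed stop sentinel
                return
            yield item
    finally:
        # Runs on normal exit, on exceptions, and when the iterator is closed.
        registry[scope].remove(queue)
        if on_disconnect is not None:
            on_disconnect(scope, None)  # no connection identity in this sketch


async def demo() -> tuple[list, list, list]:
    registry: dict[str, list[asyncio.Queue]] = {}
    gone: list = []
    received: list = []

    async def consumer() -> None:
        stream = subscribe(registry, "doc:1", lambda s, c: gone.append((s, c)))
        try:
            async for item in stream:
                received.append(item)
                break
        finally:
            await stream.aclose()  # close explicitly so cleanup is deterministic

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer start and register its queue
    registry["doc:1"][0].put_nowait("saved")
    await task
    return received, gone, registry["doc:1"]
```

After the consumer exits, its queue is gone from the registry and the callback has been invoked once with the scope.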
close 1
def close(self, scope: str | None = None) -> None

Signal subscribers to stop.

If scope is given, only close that scope's subscribers. Otherwise close all.

Parameters
Name Type Description
scope Default: None
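One common way to signal subscribers to stop is to enqueue a sentinel value that the consuming loop treats as end-of-stream. The sketch below assumes a `None` sentinel, which the `asyncio.Queue[ChangeEvent | None]` type used by `_schedule_event` suggests but does not confirm; the `registry` dict and this standalone `close` function are hypothetical.

```python
from __future__ import annotations

import asyncio


def close(registry: dict[str, list[asyncio.Queue]], scope: str | None = None) -> None:
    # With a scope, only that scope's queues are signalled; otherwise all of them.
    scopes = [scope] if scope is not None else list(registry)
    for key in scopes:
        for queue in registry.get(key, []):
            queue.put_nowait(None)  # assumed stop sentinel; queues here are unbounded


async def demo() -> tuple[list[int], list[int]]:
    registry = {"doc:1": [asyncio.Queue()], "doc:2": [asyncio.Queue()]}
    close(registry, "doc:1")
    after_scoped = [q.qsize() for queues in registry.values() for q in queues]
    close(registry)
    after_all = [q.qsize() for queues in registry.values() for q in queues]
    return after_scoped, after_all
```

The scoped call only touches the "doc:1" queue; the unscoped call signals every queue.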
presence 1 frozenset[ConnectionInfo]
def presence(self, scope: str) -> frozenset[ConnectionInfo]

Return all active connections for a scope.

Only includes subscribers that provided a ConnectionInfo at subscribe time.

Parameters
Name Type Description
scope
Returns
frozenset[ConnectionInfo]
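As a rough sketch of the presence lookup, anonymous subscribers can be filtered out while building the frozenset. `Conn` is a hypothetical stand-in for ConnectionInfo, and the `subscribers` mapping is an assumed internal shape, not the library's data structure.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Conn:
    """Hypothetical stand-in for ConnectionInfo; frozen so it is hashable."""
    user_id: str


def presence(subscribers: dict[str, list[Conn | None]], scope: str) -> frozenset[Conn]:
    # Subscribers that did not supply a connection are stored as None and skipped.
    return frozenset(c for c in subscribers.get(scope, []) if c is not None)


subscribers = {"doc:1": [Conn("ana"), None, Conn("bo")]}
online = presence(subscribers, "doc:1")
```

An unknown scope yields an empty frozenset rather than an error in this sketch.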
Internal Methods 3
__init__ 2
def __init__(self, *, maxsize: int = 256, on_drop: OnDropCallback | None = None) -> None
Parameters
Name Type Description
maxsize Default: 256
on_drop Default: None
_schedule_event 3
def _schedule_event(self, loop: asyncio.AbstractEventLoop, queue: asyncio.Queue[ChangeEvent | None], event: ChangeEvent) -> None

Enqueue event on the queue's owning loop.

asyncio.Queue is not safe to mutate from arbitrary threads. The bus is explicitly thread-safe, so cross-thread emitters hand delivery to the subscriber's event loop.
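The standard asyncio tool for handing work to another thread's loop is `loop.call_soon_threadsafe`. The sketch below shows that hand-off in isolation; the `schedule` helper is hypothetical and may differ from what `_schedule_event` actually does (for instance, it does not handle a full queue).

```python
import asyncio
import threading


def schedule(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, item: object) -> None:
    """Ask the queue's owning loop to perform the enqueue on its own thread."""
    loop.call_soon_threadsafe(queue.put_nowait, item)


async def demo() -> object:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    # Emit from a plain worker thread, never touching the queue directly there.
    worker = threading.Thread(target=schedule, args=(loop, queue, "saved"))
    worker.start()
    item = await asyncio.wait_for(queue.get(), timeout=5)
    worker.join()
    return item
```

The worker thread only schedules a callback; the actual `put_nowait` runs on the loop thread, which is what keeps the queue safe.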

Parameters
Name Type Description
loop
queue
event
_log_drop 1
Log a dropped event, throttled to once per scope per interval.
def _log_drop(self, event: ChangeEvent) -> None
Parameters
Name Type Description
event
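Throttling a log message to once per scope per interval can be sketched by remembering the last log time for each scope. The class name `DropLogThrottle`, the 5-second interval, and the injectable clock are all assumptions for illustration; the source does not state the actual interval.

```python
from __future__ import annotations

import logging
import time
from typing import Callable

logger = logging.getLogger("bus.sketch")  # hypothetical logger name


class DropLogThrottle:
    def __init__(self, interval: float = 5.0, clock: Callable[[], float] = time.monotonic) -> None:
        self._interval = interval
        self._clock = clock
        self._last: dict[str, float] = {}

    def should_log(self, scope: str) -> bool:
        now = self._clock()
        last = self._last.get(scope)
        if last is not None and now - last < self._interval:
            return False  # already logged for this scope within the interval
        self._last[scope] = now
        return True

    def log_drop(self, scope: str) -> None:
        if self.should_log(scope):
            logger.warning("dropped event for scope %s", scope)


# Deterministic demonstration with a fake clock instead of real time.
ticks = iter([0.0, 1.0, 6.0, 6.5])
throttle = DropLogThrottle(interval=5.0, clock=lambda: next(ticks))
decisions = [throttle.should_log(s) for s in ("a", "a", "a", "b")]
```

Each scope is throttled independently, so a burst of drops on one scope does not suppress the first warning for another.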