Classes
ReactiveBus
11
▼
Broadcast channel for data change events.
Thread-safe. Each call to ``subscribe(scope)`` returns …
ReactiveBus
11
▼
Broadcast channel for data change events.
Thread-safe. Each call tosubscribe(scope)returns an async
iterator that yieldsChangeEvents for that scope. When
emit()is called, the event is placed into every matching
subscriber's queue.
Modeled on chirp'sToolEventBusbut scoped per-key.
Methods
emitted_count
0
int
▼
Total number of events emitted (including dropped).
property
emitted_count
0
int
▼
def emitted_count(self) -> int
Returns
int
dropped_count
0
int
▼
Total number of events dropped due to full subscriber queues.
property
dropped_count
0
int
▼
def dropped_count(self) -> int
Returns
int
subscriber_count
0
int
▼
Total number of active subscribers across all scopes.
property
subscriber_count
0
int
▼
def subscriber_count(self) -> int
Returns
int
emit_sync
1
▼
Broadcast a change event synchronously (from any thread).
Uses ``put_nowait`` …
emit_sync
1
▼
def emit_sync(self, event: ChangeEvent) -> None
Broadcast a change event synchronously (from any thread).
Usesput_nowaitso it never blocks. Drops the event for
a subscriber if its queue is full (back-pressure). Dropped
events are logged at WARNING level (throttled per scope).
Ifevent.audienceis set, only delivers to subscribers
whoseConnectionInfo.user_idis in the audience set.
Subscribers withoutConnectionInfoare skipped when
audience filtering is active.
Parameters
| Name | Type | Description |
|---|---|---|
event |
— |
emit
1
▼
Broadcast a change event (async version).
async
emit
1
▼
async def emit(self, event: ChangeEvent) -> None
Parameters
| Name | Type | Description |
|---|---|---|
event |
— |
subscribe
3
AsyncIterator[ChangeEven…
▼
Subscribe to change events for a specific scope.
Yields ``ChangeEvent`` object…
async
subscribe
3
AsyncIterator[ChangeEven…
▼
async def subscribe(self, scope: str, *, connection: ConnectionInfo | None = None, on_disconnect: Callable[[str, ConnectionInfo | None], None] | None = None) -> AsyncIterator[ChangeEvent]
Subscribe to change events for a specific scope.
YieldsChangeEventobjects as they are emitted. The
subscription is automatically cleaned up when the iterator
exits (client disconnects).
Parameters
| Name | Type | Description |
|---|---|---|
scope |
— |
Scope key to subscribe to. |
connection |
— |
Optional identity for this subscriber. Enables audience filtering and presence tracking. Default:None
|
on_disconnect |
— |
Optional callback invoked when this subscriber exits (normal or exception). Receives None
|
Returns
AsyncIterator[ChangeEvent]
close
1
▼
Signal subscribers to stop.
If *scope* is given, only close that scope's subsc…
close
1
▼
def close(self, scope: str | None = None) -> None
Signal subscribers to stop.
If scope is given, only close that scope's subscribers. Otherwise close all.
Parameters
| Name | Type | Description |
|---|---|---|
scope |
— |
Default:None
|
presence
1
frozenset[ConnectionInfo]
▼
Return all active connections for a scope.
Only includes subscribers that prov…
presence
1
frozenset[ConnectionInfo]
▼
def presence(self, scope: str) -> frozenset[ConnectionInfo]
Return all active connections for a scope.
Only includes subscribers that provided aConnectionInfo
at subscribe time.
Parameters
| Name | Type | Description |
|---|---|---|
scope |
— |
Returns
frozenset[ConnectionInfo]
Internal Methods 3 ▼
__init__
2
▼
__init__
2
▼
def __init__(self, *, maxsize: int = 256, on_drop: OnDropCallback | None = None) -> None
Parameters
| Name | Type | Description |
|---|---|---|
maxsize |
— |
Default:256
|
on_drop |
— |
Default:None
|
_schedule_event
3
▼
Enqueue *event* on the queue's owning loop.
``asyncio.Queue`` is not safe to m…
_schedule_event
3
▼
def _schedule_event(self, loop: asyncio.AbstractEventLoop, queue: asyncio.Queue[ChangeEvent | None], event: ChangeEvent) -> None
Enqueue event on the queue's owning loop.
asyncio.Queueis not safe to mutate from arbitrary threads. The
bus is explicitly thread-safe, so cross-thread emitters hand delivery
to the subscriber's event loop.
Parameters
| Name | Type | Description |
|---|---|---|
loop |
— |
|
queue |
— |
|
event |
— |
_log_drop
1
▼
Log a dropped event, throttled to once per scope per interval.
_log_drop
1
▼
def _log_drop(self, event: ChangeEvent) -> None
Parameters
| Name | Type | Description |
|---|---|---|
event |
— |