Classes
ToolCallEvent
A single tool invocation event.
Emitted by the ``ToolRegistry`` after each successful tool call.
Consumed by SSE routes for real-time agent dashboards.
Attributes
| Name | Type | Description |
|---|---|---|
| tool_name | str | - |
| arguments | dict[str, Any] | - |
| result | Any | - |
| timestamp | float | - |
| call_id | str | - |
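To make the attribute list concrete, the sketch below builds a stand-in class with the same five fields. It is not the library's own definition: the dataclass form, the default factories for `timestamp` and `call_id`, and the sample values are all assumptions made for illustration.

```python
from __future__ import annotations

import time
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class ToolCallEvent:
    """Stand-in carrying the five documented attributes (not the library's class)."""

    tool_name: str
    arguments: dict[str, Any]
    result: Any
    # Assumption: epoch seconds and a random hex id are filled in by default.
    timestamp: float = field(default_factory=time.time)
    call_id: str = field(default_factory=lambda: uuid.uuid4().hex)


# Hypothetical event for a tool named "search".
event = ToolCallEvent(
    tool_name="search",
    arguments={"query": "asyncio"},
    result=["hit"],
)
```

A frozen dataclass is used here because an event that is broadcast to several subscribers is safer to share when it cannot be mutated; the real class may make a different choice.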
ToolEventBus
Async broadcast channel for tool call events.
Each call to ``subscribe()`` returns an async iterator backed by its
own ``asyncio.Queue``. When ``emit()`` is called, the event is placed
into every active subscriber's queue.
Usage in SSE routes::

    async def stream():
        async for event in app.tool_events.subscribe():
            yield Fragment("dashboard.html", "row", event=event)
    return EventStream(stream())
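The per-subscriber queue design described above can be sketched roughly as follows. This is a minimal illustration, not the actual `ToolEventBus` source: the class name `SketchEventBus`, the `_queues` attribute, and the demo events are hypothetical, and the `None` sentinel follows the behaviour documented for `close()`.

```python
import asyncio
from typing import Any, AsyncIterator


class SketchEventBus:
    """Illustrative fan-out bus: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._queues: set[asyncio.Queue] = set()

    async def emit(self, event: Any) -> None:
        # Iterate over a copy so a subscriber leaving mid-broadcast is harmless.
        for queue in list(self._queues):
            await queue.put(event)

    async def subscribe(self) -> AsyncIterator[Any]:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        try:
            while True:
                event = await queue.get()
                if event is None:  # sentinel placed by close()
                    break
                yield event
        finally:
            # Drop the queue once the iterator finishes or is closed.
            self._queues.discard(queue)

    def close(self) -> None:
        for queue in list(self._queues):
            queue.put_nowait(None)


async def demo() -> list[list[str]]:
    bus = SketchEventBus()
    received: list[list[str]] = [[], []]

    async def consume(slot: int) -> None:
        async for event in bus.subscribe():
            received[slot].append(event)

    tasks = [asyncio.create_task(consume(i)) for i in range(2)]
    await asyncio.sleep(0)  # let both consumers register their queues
    await bus.emit("first")
    await bus.emit("second")
    bus.close()
    await asyncio.gather(*tasks)
    return received


received = asyncio.run(demo())
```

Because each subscriber owns its queue, a slow consumer only delays its own stream. Note that in this sketch a queue is registered when iteration starts, so events emitted before a consumer begins iterating are not delivered to it.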
Methods
emit
async def emit(self, event: ToolCallEvent) -> None
Broadcast an event to all active subscribers.
Parameters
| Name | Type | Description |
|---|---|---|
| event | ToolCallEvent | - |
subscribe
async def subscribe(self) -> AsyncIterator[ToolCallEvent]
Subscribe to tool call events.
Returns an async iterator that yields events as they are emitted. The subscription is automatically cleaned up when the iterator exits.
Returns
AsyncIterator[ToolCallEvent]
close
def close(self) -> None
Signal all subscribers to stop.
Puts ``None`` into every queue, which causes the async iterator
to break cleanly.
Internal Methods
__init__
def __init__(self) -> None