Return an EventStream from a handler and Chirp holds the HTTP connection open,
formatting whatever your async generator yields as SSE wire events. Yield a
Fragment and the browser swaps rendered HTML into the page in real time, with
no client-side render code.
Reach for EventStream when the page needs updates after it loads:
notifications, a live ticker, a dashboard tail. For a one-shot slow page that
should paint progressively on first load, use Stream or Suspense instead.
EventStream
Return an EventStream from a route handler to start pushing events:
from chirp import EventStream

@app.route("/events", referenced=True)
async def events():
    async def stream():
        while True:
            data = await get_next_update()
            yield data
    return EventStream(stream())
The generator yields values. Chirp formats them as SSE wire protocol and sends
them to the client. Mark SSE routes with referenced=True: they are reached by
an sse-connect attribute, not a link the contract checker can see, so the flag
keeps them out of the orphan-route report.
New to SSE? How it compares to WebSockets
Server-Sent Events are a standard browser API for receiving a stream of events from the server over a persistent HTTP connection. Unlike WebSockets, SSE is:
- One-directional: the server pushes to the client
- Plain HTTP: no protocol upgrade, no special infrastructure
- Auto-reconnecting: the browser reconnects automatically
- Text-based: the simple text/event-stream format
Chirp leans into SSE over WebSockets on purpose; see Philosophy for the stance.
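To make the text/event-stream format concrete, here is a minimal stdlib-only sketch of how one event can be serialized. The helper name format_sse_event is hypothetical and is not Chirp's API; it only follows the field layout of the SSE standard (optional event, id, and retry lines, one data line per payload line, and a blank line to end the event).

```python
def format_sse_event(data, event=None, id=None, retry=None):
    """Serialize one event in text/event-stream form (illustrative helper, not Chirp's)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # A multi-line payload becomes one "data:" line per line of text.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


print(format_sse_event("Hello, World!", event="greeting"), end="")
```

Because the format is plain text over HTTP, any client that can read a response line by line can consume it.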
Yield Types
The generator can yield different types:
from chirp import Fragment, SSEEvent

async def stream():
    # String -- sent as the SSE data field
    yield "Hello, World!"

    # Dict -- JSON-serialized as the SSE data field
    yield {"count": 42, "status": "ok"}

    # Fragment -- rendered via kida, sent as a named SSE event.
    # target= becomes the SSE event name; without a target the event
    # has no name and htmx routes it to the default "message" channel.
    yield Fragment("components/notification.html", "alert",
                   target="notification", message="New alert")

    # SSEEvent -- full control over event type, id, retry
    yield SSEEvent(data="custom", event="ping", id="1")
Yielding a Fragment covers the common case: the rendered HTML is the data and
target becomes the event name an sse-swap attribute can match on. Most apps
never construct SSEEvent directly.
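As a rough illustration of what the string and dict cases put in the data field, the sketch below normalizes a yielded value into a payload string. The function to_sse_data is a hypothetical stand-in written for this page; Chirp's real serializer may differ in details such as JSON separators or supported types.

```python
import json


def to_sse_data(value):
    """Turn a yielded value into an SSE data payload (illustrative, not Chirp's code)."""
    if isinstance(value, str):
        return value  # strings are sent as-is
    if isinstance(value, dict):
        return json.dumps(value)  # dicts are JSON-serialized
    raise TypeError(f"unsupported yield type: {type(value).__name__}")


print(to_sse_data("Hello, World!"))
print(to_sse_data({"count": 42, "status": "ok"}))
```

On the client, a dict payload would arrive as a JSON string that the event listener parses itself.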
SSEEvent: full field reference
For fine-grained control over the wire event, such as an explicit id for
reconnection or a retry hint, yield SSEEvent objects:
from chirp import SSEEvent

async def stream():
    yield SSEEvent(
        data="User joined",
        event="user-join",  # Event type (client filters on this)
        id="evt-42",        # Last-Event-ID for reconnection
        retry=5000,         # Reconnection interval in ms
    )
| Field | Type | Notes |
|---|---|---|
| data | str | Required. The event payload. |
| event | str \| None | Event name; sse-swap and addEventListener filter on it. Single-line only. |
| id | str \| None | Echoed in the Last-Event-ID header on reconnect. Single-line only. |
| retry | int \| None | Reconnection interval in milliseconds. Must be non-negative. |
Chirp rejects CR/LF/NUL characters in the event and id metadata fields so
event payloads cannot inject extra wire-protocol lines.
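The reason for that rule is easier to see with a small sketch. The check below is an assumption about what such validation could look like, with a hypothetical helper name check_sse_field; it is not Chirp's implementation. Without it, a newline inside an event name would start a new line on the wire and let untrusted input forge a data or id field.

```python
FORBIDDEN_CHARS = ("\r", "\n", "\0")


def check_sse_field(name, value):
    """Reject CR, LF, and NUL in a single-line SSE field (illustrative helper)."""
    if value is not None and any(ch in value for ch in FORBIDDEN_CHARS):
        raise ValueError(f"{name} must not contain CR, LF, or NUL")
    return value


check_sse_field("event", "user-join")  # passes through unchanged

try:
    # The embedded newline would otherwise become a forged "data:" line.
    check_sse_field("event", "ping\ndata: injected")
except ValueError as exc:
    print(exc)
```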
Reconnect And Replay
Browsers automatically reconnect SSE streams and send the last received event
id in the Last-Event-ID request header when your stream yields events with an
id. Chirp preserves SSEEvent(id=...) on the wire, but it does not store or
replay missed events for you; replay is a product concern.
Replaying missed events with a durable cursor
Production-critical streams need a product-owned durable cursor: a database sequence, notification id, post id, queue offset, or another value that can be queried after reconnect.
from chirp import EventStream, Request, SSEEvent

@app.route("/notifications/stream", referenced=True)
async def notifications(request: Request):
    last_id = request.headers.get("last-event-id")

    async def stream():
        # First replay anything the client missed while disconnected...
        async for item in missed_notifications_after(last_id):
            yield SSEEvent(event="notification", id=str(item.id), data=item.html)
        # ...then switch to live events.
        async for item in live_notifications():
            yield SSEEvent(event="notification", id=str(item.id), data=item.html)

    return EventStream(stream())
If the product cannot replay missed events, make that degradation explicit: send a refresh event for the affected fragment or document that reconnecting clients may need to reload the page.
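The replay half of that handler can be sketched without a database. Everything below is an assumption made for illustration: the Notification dataclass, the in-memory STORE standing in for a durable table, and the choice to replay nothing when the header is missing or malformed. In a real product, missed_notifications_after would query whatever durable cursor the product owns.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Notification:
    id: int
    html: str


# Stand-in for a durable store ordered by a monotonically increasing id.
STORE = [
    Notification(1, "<p>one</p>"),
    Notification(2, "<p>two</p>"),
    Notification(3, "<p>three</p>"),
]


def parse_cursor(last_event_id):
    """Return the integer cursor, or None if the header is absent or malformed."""
    try:
        return int(last_event_id)
    except (TypeError, ValueError):
        return None


async def missed_notifications_after(last_event_id):
    cursor = parse_cursor(last_event_id)
    if cursor is None:
        return  # fresh connection: nothing to replay in this sketch
    for item in STORE:
        if item.id > cursor:
            yield item


async def collect_ids(last_event_id):
    return [item.id async for item in missed_notifications_after(last_event_id)]


print(asyncio.run(collect_ids("1")))
```

Whether a first-time connection should receive a backlog is a product decision; this sketch sends none and relies on the initial page render instead.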
Real-Time HTML with htmx
The killer pattern: combine SSE with htmx to push rendered HTML fragments in real time.
Server:
from chirp import EventStream, Fragment

@app.route("/notifications", referenced=True)
async def notifications():
    async def stream():
        async for event in notification_bus.subscribe():
            yield Fragment("components/notification.html", "alert",
                target="notification",
                message=event.message,
                time=event.timestamp,
            )
    return EventStream(stream())
Client (using htmx SSE extension):
<div hx-ext="sse" sse-connect="/notifications">
    <div sse-swap="notification" hx-swap="beforeend">
        <!-- Fragments are swapped in here -->
    </div>
</div>
The server renders HTML, the browser swaps it in. Zero client-side JavaScript for the rendering logic. See Fragments for how blocks are selected and rendered.
Live Dashboard Example
A more complete example -- a dashboard that streams stats updates:
import asyncio
from chirp import EventStream, Fragment

@app.route("/dashboard/live", referenced=True)
async def live_stats():
    async def stream():
        while True:
            stats = await get_current_stats()
            # Fragment.target becomes the SSE event name.
            # No need to wrap in SSEEvent -- chirp handles it.
            yield Fragment("dashboard.html", "stats_panel",
                           target="stats-update", stats=stats)
            await asyncio.sleep(5)
    return EventStream(stream())

<section hx-ext="sse"
         sse-connect="/dashboard/live"
         hx-disinherit="hx-target hx-swap">
    <div id="stats" sse-swap="stats-update">
        {# Initial stats rendered server-side #}
        {% block stats_panel %}
            ...
        {% endblock %}
    </div>
</section>
Error Boundaries
Chirp isolates rendering failures per-event so one bad block doesn't crash the entire stream.
If a Fragment fails to render:
- Production (debug=False): the event is silently skipped, the stream continues
- Debug (debug=True): an error event targets the specific block, replacing it with inline error HTML
<!-- In debug mode, a failed "presence" block becomes: -->
<div class="chirp-block-error" data-block="presence_list">
    <strong>UndefinedError</strong>: 'users' is undefined
</div>
All other blocks on the page keep updating normally. The next change event that touches the broken block will attempt to re-render it -- natural recovery without retries.
For reactive streams, if the context_builder() function itself raises (e.g., a
deleted record), the entire event is skipped and the stream waits for the next
change. See Error Reference for the full error hierarchy.
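The per-event isolation described in this section can be approximated with a try/except around each render. The sketch below is an assumption about the general shape, not Chirp's internals; render_isolated, render_users, and collect are hypothetical names, and the error markup only loosely imitates the debug output shown earlier.

```python
import asyncio
import html


async def render_isolated(contexts, render, debug=False):
    """Yield rendered HTML per context; one failed render never ends the stream."""
    for context in contexts:
        try:
            rendered = render(context)
        except Exception as exc:
            if not debug:
                continue  # production: skip this event silently
            rendered = (
                '<div class="chirp-block-error">'
                f"<strong>{type(exc).__name__}</strong>: {html.escape(str(exc))}"
                "</div>"
            )
        yield rendered


def render_users(context):
    # Raises KeyError when "users" is missing, simulating a broken block.
    return f"<p>{context['users']} online</p>"


async def collect(debug):
    contexts = [{"users": 3}, {}, {"users": 5}]
    return [chunk async for chunk in render_isolated(contexts, render_users, debug)]


print(asyncio.run(collect(debug=False)))
```

Rendering inside the try and yielding outside it keeps consumer-side exceptions from being mistaken for render failures.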
Worker Mode
SSE connections are long-lived: the server holds the HTTP connection open and
streams events as they arrive. The one rule to remember: set
worker_mode="async" for any app that uses SSE:
config = AppConfig(worker_mode="async")
Advanced: why worker mode matters for long-lived connections
The default worker_mode="auto" selects sync workers on Python 3.14t
(free-threading) and async workers on a GIL build. Sync workers block one thread
per SSE connection, preventing that worker from handling other requests. With
async workers, SSE streams and request handlers run as concurrent tasks in the
same event loop.
This is especially important for bidirectional patterns (SSE + POST) where
in-memory pub-sub uses asyncio.Queue: the subscriber and emitter must share
the same event loop.
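A minimal in-memory bus shows why the loop must be shared. The Bus class below is a hypothetical sketch, not a Chirp API: each subscriber owns an asyncio.Queue, and emit pushes into every queue with put_nowait. asyncio.Queue is not thread-safe, so this only works when the emitter (for example a POST handler) and the subscriber (the SSE generator) run as tasks on one event loop.

```python
import asyncio


class Bus:
    """Tiny in-memory pub-sub built on asyncio.Queue (illustrative only)."""

    def __init__(self):
        self._subscribers = set()

    async def subscribe(self):
        queue = asyncio.Queue()
        self._subscribers.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            self._subscribers.discard(queue)

    def emit(self, message):
        for queue in self._subscribers:
            queue.put_nowait(message)


async def demo():
    bus = Bus()
    received = []

    async def consumer():
        # Plays the role of the SSE generator.
        async for message in bus.subscribe():
            received.append(message)
            if len(received) == 2:
                break

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer register its queue first
    bus.emit("first")       # plays the role of a POST handler
    bus.emit("second")
    await task
    return received


print(asyncio.run(demo()))
```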
| Mode | SSE support | When to use |
|---|---|---|
| "async" | Full | Apps with SSE, streaming, or long-lived connections |
| "auto" | Falls back to ASGI | Simple request-response apps (no SSE) |
| "sync" | Falls back to ASGI | CPU-bound sync handlers only |
See Configuration for the full worker_mode and AppConfig reference.
Connection Lifecycle
Chirp manages the SSE connection lifecycle automatically:
Testing SSE
Use the TestClient.sse() method:
from chirp.testing import TestClient

async def test_notifications():
    async with TestClient(app) as client:
        result = await client.sse("/notifications", max_events=3)
        assert len(result.events) == 3
        assert "notification" in result.events[0].data
See Testing Assertions for SSE testing details.
Next Steps
- SSE patterns — pub/sub, broadcast, and presence recipes.
- Reactive system and signals — server-reactive values that fan out to SSE.
- Streaming HTML — progressive page rendering without a long-lived connection.
- Testing assertions — testing SSE endpoints.