# Server-Sent Events

URL: /chirp/docs/build-apps/streaming-updates/server-sent-events/
Section: streaming-updates
Tags: sse, real-time, events, htmx

--------------------------------------------------------------------------------

## What Are SSE?

Server-Sent Events (SSE) are a standard browser API for receiving a stream of events from the server over a persistent HTTP connection. Unlike WebSockets, SSE is:

- One-directional -- server pushes to client
- Plain HTTP -- no protocol upgrade, no special infrastructure
- Auto-reconnecting -- the browser reconnects automatically
- Text-based -- simple `text/event-stream` format

## EventStream

Return an `EventStream` from a route handler to start pushing events:

```python
from chirp import EventStream

@app.route("/events")
async def events():
    async def stream():
        while True:
            data = await get_next_update()
            yield data
    return EventStream(stream())
```

The generator yields values. Chirp formats them as SSE wire protocol and sends them to the client.

## Yield Types

The generator can yield different types:

```python
async def stream():
    # String -- sent as SSE data field
    yield "Hello, World!"

    # Dict -- JSON-serialized as SSE data
    yield {"count": 42, "status": "ok"}

    # Fragment -- rendered via kida, sent as named SSE event
    # target= becomes the SSE event name (default: "fragment")
    yield Fragment("components/notification.html", "alert",
        target="notification", message="New alert")

    # SSEEvent -- full control over event type, id, retry
    yield SSEEvent(data="custom", event="ping", id="1")
```

## SSEEvent

For fine-grained control, yield `SSEEvent` objects:

```python
from chirp import SSEEvent

async def stream():
    yield SSEEvent(
        data="User joined",
        event="user-join",   # Event type (client filters on this)
        id="evt-42",         # Last-Event-ID for reconnection
        retry=5000,          # Reconnection interval in ms
    )
```

`event` and `id` must be single-line values, and `retry` must be non-negative. Chirp rejects CR/LF/NUL characters in SSE metadata fields so event payloads cannot inject extra wire-protocol lines.
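To make the validation rules above concrete, here is a minimal sketch of how one event could be serialized to `text/event-stream` wire text. It is not Chirp's implementation: the function name `format_sse_event` and its signature are hypothetical, and only the field names (`event`, `id`, `retry`, `data`) and the blank-line terminator come from the SSE format itself.

```python
from typing import Optional


def format_sse_event(
    data: str,
    event: Optional[str] = None,
    id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    """Serialize one event as SSE wire text (illustrative helper, not Chirp's API)."""
    # Metadata fields must stay on one line; otherwise a value such as
    # "ping\ndata: injected" would smuggle an extra field into the stream.
    for name, value in (("event", event), ("id", id)):
        if value is not None and any(ch in value for ch in "\r\n\0"):
            raise ValueError(f"{name} must not contain CR, LF, or NUL")
    if retry is not None and retry < 0:
        raise ValueError("retry must be non-negative")

    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry is not None:
        lines.append(f"retry: {retry}")

    # A multi-line payload is legal: each line gets its own "data:" prefix,
    # and the client joins them back together with newlines.
    payload = data.replace("\r\n", "\n").replace("\r", "\n")
    for part in payload.split("\n"):
        lines.append(f"data: {part}")

    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Under these assumptions, a multi-line HTML fragment is sent as several `data:` lines within one event, while a newline in `event` or `id` is rejected before anything is written.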
## Reconnect And Replay

Browsers automatically reconnect SSE streams and send the last received event id in the `Last-Event-ID` request header when your stream yields events with `id:`. Chirp preserves `SSEEvent(id=...)` on the wire, but it does not store or replay missed events for you.

Production-critical streams need a product-owned durable cursor: a database sequence, notification id, post id, queue offset, or another value that can be queried after reconnect.

```python
from chirp import EventStream, SSEEvent

@app.route("/notifications/stream")
async def notifications(request):
    last_id = request.headers.get("last-event-id")

    async def stream():
        # First replay anything the client missed while disconnected...
        async for item in missed_notifications_after(last_id):
            yield SSEEvent(event="notification", id=str(item.id), data=item.html)
        # ...then switch to live events.
        async for item in live_notifications():
            yield SSEEvent(event="notification", id=str(item.id), data=item.html)

    return EventStream(stream())
```

If the product cannot replay missed events, make that degradation explicit: either send a refresh event for the affected fragment, or document that reconnecting clients may need to reload the page.

## Real-Time HTML with htmx

The killer pattern: combine SSE with htmx to push rendered HTML fragments in real time.

Server:

```python
from chirp import EventStream, Fragment

@app.route("/notifications")
async def notifications():
    async def stream():
        async for event in notification_bus.subscribe():
            yield Fragment("components/notification.html", "alert",
                target="notification",
                message=event.message,
                time=event.timestamp,
            )
    return EventStream(stream())
```

Client (using htmx SSE extension):

```html
<div hx-ext="sse" sse-connect="/notifications">
  <div sse-swap="notification" hx-swap="beforeend">
    <!-- Fragments are swapped in here -->
  </div>
</div>
```

Important: `sse-swap` must be on a child of the `sse-connect` element, not the same element. htmx uses `querySelectorAll` internally, which does not include the root element itself.

The server renders HTML, the browser swaps it in. Zero client-side JavaScript for the rendering logic.
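The server example above calls `notification_bus.subscribe()` without showing the bus. As a rough sketch only, an in-memory fan-out bus built on `asyncio.Queue` could look like the following. The `NotificationBus` class and its `publish` method are hypothetical names for illustration and are not part of Chirp.

```python
import asyncio
from typing import Any, AsyncIterator, Set


class NotificationBus:
    """Hypothetical in-memory pub-sub bus; one queue per connected subscriber."""

    def __init__(self) -> None:
        self._subscribers: Set[asyncio.Queue] = set()

    def publish(self, event: Any) -> None:
        # Fan the event out to every currently registered subscriber.
        # Unbounded queues mean put_nowait never raises QueueFull here.
        for queue in self._subscribers:
            queue.put_nowait(event)

    async def subscribe(self) -> AsyncIterator[Any]:
        # Registration happens on the first iteration step, not at call time.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the generator is closed or its task is cancelled,
            # so abandoned streams do not keep accumulating events.
            self._subscribers.discard(queue)


notification_bus = NotificationBus()
```

A POST handler would then call `notification_bus.publish(...)` while each open stream iterates `subscribe()`. This only works when publishers and subscribers share one process and one event loop, and events published while a client is disconnected are lost, so it does not replace the durable cursor described under Reconnect And Replay.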
## Live Dashboard Example

A more complete example -- a dashboard that streams stats updates:

```python
import asyncio

from chirp import EventStream, Fragment

@app.route("/dashboard/live")
async def live_stats():
    async def stream():
        while True:
            stats = await get_current_stats()
            # Fragment.target becomes the SSE event name.
            # No need to wrap in SSEEvent -- chirp handles it.
            yield Fragment("dashboard.html", "stats_panel",
                target="stats-update", stats=stats)
            await asyncio.sleep(5)
    return EventStream(stream())
```

```html
<section hx-ext="sse" sse-connect="/dashboard/live" hx-disinherit="hx-target hx-swap">
  <div id="stats" sse-swap="stats-update">
    {# Initial stats rendered server-side #}
    {% block stats_panel %}
      ...
    {% endblock %}
  </div>
</section>
```

Tip: Prevent layout-level `hx-target` from bleeding into SSE swaps. Use either `hx-disinherit="hx-target hx-swap"` on the `sse-connect` element, or add `hx-target="this"` on the `sse-connect` element (`safe_target` middleware can auto-inject this).

## Error Boundaries

Chirp isolates rendering failures per-event so one bad block doesn't crash the entire stream. If a Fragment fails to render:

- Production (`debug=False`): the event is silently skipped, the stream continues
- Debug (`debug=True`): an error event targets the specific block, replacing it with inline error HTML

```html
<!-- In debug mode, a failed "presence" block becomes: -->
<div class="chirp-block-error" data-block="presence_list">
  <strong>UndefinedError</strong>: &#x27;users&#x27; is undefined
</div>
```

All other blocks on the page keep updating normally. The next change event that touches the broken block will attempt to re-render it -- natural recovery without retries.

For reactive streams, if the `context_builder()` function itself raises (e.g., a deleted record), the entire event is skipped and the stream waits for the next change.

See Error Reference for the full error hierarchy.

## Worker Mode

SSE connections are long-lived -- the server holds the HTTP connection open and streams events as they arrive. This has implications for worker configuration.
Use `worker_mode="async"` for any app that uses SSE or streaming responses:

```python
config = AppConfig(worker_mode="async")
```

The default `worker_mode="auto"` selects sync workers on Python 3.14t (free-threading). Sync workers block one thread per SSE connection, preventing that worker from handling other requests.

With async workers, SSE streams and request handlers run as concurrent tasks in the same event loop. This is especially important for bidirectional patterns (SSE + POST) where in-memory pub-sub uses `asyncio.Queue` -- the subscriber and emitter must share the same event loop.

| Mode | SSE support | When to use |
|------|-------------|-------------|
| `"async"` | Full | Apps with SSE, streaming, or long-lived connections |
| `"auto"` | Falls back to ASGI | Simple request-response apps (no SSE) |
| `"sync"` | Falls back to ASGI | CPU-bound sync handlers only |

## Connection Lifecycle

Chirp manages the SSE connection lifecycle automatically:

1. Event producer -- Consumes the generator, formats events, sends as ASGI body chunks.
2. Disconnect monitor -- Watches for `http.disconnect`, cancels the producer when the client disconnects.
3. Heartbeat -- Sends `: heartbeat` comments on idle to keep the connection alive.

## Testing SSE

Use the `TestClient.sse()` method:

```python
from chirp.testing import TestClient

async def test_notifications():
    async with TestClient(app) as client:
        result = await client.sse("/notifications", max_events=3)
        assert len(result.events) == 3
        assert "notification" in result.events[0].data
```

See Testing Assertions for SSE testing details.

## Next Steps

- Streaming HTML -- Progressive page rendering
- Fragments -- How fragments are rendered
- Assertions -- Testing SSE endpoints