Module ai.streaming

AI streaming helpers for SSE + Fragment pattern.

Provides ergonomic wrappers around the core pattern of streaming LLM tokens as re-rendered HTML fragments via Server-Sent Events.

The fundamental pattern::

    @app.route("/chat", methods=["POST"])
    async def chat(request: Request):
        prompt = (await request.form())["prompt"]

        async def generate():
            text = ""
            async for token in llm.stream(prompt):
                text += token
                yield Fragment("chat.html", "response", text=text)

        return EventStream(generate())

This module wraps that pattern into reusable helpers so common cases are one-liners while keeping the underlying primitives accessible.
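The accumulation step can be sketched in plain Python. Here `Fragment` is a stand-in dataclass and `fake_token_stream` a stand-in for `llm.stream(prompt)`, for illustration only — chirp's actual classes differ:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Fragment:
    """Stand-in for chirp's Fragment, for illustration only."""
    template_name: str
    block_name: str
    context: dict = field(default_factory=dict)

async def fake_token_stream():
    # Simulates llm.stream(prompt) yielding tokens one at a time.
    for token in ["Hello", ", ", "world"]:
        yield token

async def generate():
    # The core pattern: accumulate tokens, yield a full re-render each time.
    text = ""
    async for token in fake_token_stream():
        text += token
        yield Fragment("chat.html", "response", {"text": text})

async def main():
    return [f.context["text"] async for f in generate()]

states = asyncio.run(main())
# states == ["Hello", "Hello, ", "Hello, world"]
```

Each yielded Fragment carries the full accumulated text, not a delta — the client simply swaps in the latest render, so dropped events are harmless.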

Functions

stream_to_fragments

def stream_to_fragments(tokens: AsyncIterator[str], template_name: str, block_name: str, /, *, context_key: str = 'text', extra_context: dict[str, Any] | None = None) -> AsyncIterator[Any]

Wrap an LLM token stream as a Fragment-yielding async generator.

Accumulates tokens and yields a Fragment with the accumulated text after each token. The Fragment re-renders the named block with the current text, which htmx swaps into the DOM.

Usage::

    from chirp import EventStream
    from chirp.ai import LLM
    from chirp.ai.streaming import stream_to_fragments

    llm = LLM("anthropic:claude-sonnet-4-20250514")

    @app.route("/chat", methods=["POST"])
    async def chat(request: Request):
        prompt = (await request.form())["prompt"]
        fragments = stream_to_fragments(
            llm.stream(prompt),
            "chat.html", "response",
        )
        return EventStream(fragments)

Parameters

tokens : AsyncIterator[str]
    Async iterator of string tokens (from llm.stream()).

template_name : str
    Kida template containing the target block.

block_name : str
    Name of the block to re-render with each update.

context_key : str
    Template variable name for the accumulated text. Default: 'text'.

extra_context : dict[str, Any] | None
    Additional template variables passed to every Fragment render
    (e.g., user info, metadata). Default: None.

Returns
AsyncIterator[Any]
_stream_fragments

Internal generator that accumulates tokens and yields Fragments.

async def _stream_fragments(tokens: AsyncIterator[str], template_name: str, block_name: str, *, context_key: str, extra_context: dict[str, Any], fragment_cls: type) -> AsyncIterator[Any]
Parameters

tokens : AsyncIterator[str]
template_name : str
block_name : str
context_key : str
extra_context : dict[str, Any]
fragment_cls : type
Returns
AsyncIterator[Any]
stream_with_sources

def stream_with_sources(tokens: AsyncIterator[str], template_name: str, *, response_block: str = 'response', sources_block: str | None = None, sources: Any = None, context_key: str = 'text', extra_context: dict[str, Any] | None = None) -> AsyncIterator[Any]

Stream LLM tokens as fragments, optionally prefixed with a sources block.

Common RAG pattern: send retrieved sources first (immediate), then stream the AI response progressively.

Usage::

    @app.route("/ask", methods=["POST"])
    async def ask(request: Request):
        question = (await request.form())["question"]
        docs = await db.fetch(Document, "SELECT ... WHERE match(?)", question)
        return EventStream(stream_with_sources(
            llm.stream(f"Context: {docs}\nQ: {question}"),
            "ask.html",
            sources_block="sources",
            sources=docs,
            response_block="answer",
        ))

Parameters

tokens : AsyncIterator[str]
    Async iterator of string tokens (from llm.stream()).

template_name : str
    Kida template containing the target blocks.

response_block : str
    Block name for the streaming response. Default: 'response'.

sources_block : str | None
    Block name for the sources (rendered once, first). Default: None.

sources : Any
    Context value passed to the sources block. Default: None.

context_key : str
    Template variable name for accumulated text. Default: 'text'.

extra_context : dict[str, Any] | None
    Additional context for all Fragment renders. Default: None.
Returns
AsyncIterator[Any]
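The event ordering this produces can be sketched as follows (stand-in names, tuples in place of Fragments — an illustration of the ordering, not chirp's implementation):

```python
import asyncio
from typing import Any, AsyncIterator

async def _event_order(
    tokens: AsyncIterator[str],
    *,
    sources: Any = None,
):
    # Sources go out once, immediately; the response then streams
    # progressively as tokens accumulate.
    if sources is not None:
        yield ("sources", sources)
    text = ""
    async for token in tokens:
        text += token
        yield ("response", text)

async def demo():
    async def toks():
        for t in ["The answer ", "is 42."]:
            yield t
    return [e async for e in _event_order(toks(), sources=["doc-1", "doc-2"])]

events = asyncio.run(demo())
# events[0] == ("sources", ["doc-1", "doc-2"])
# events[-1] == ("response", "The answer is 42.")
```

Emitting the sources block before the first token means the user sees the retrieved documents while the model is still generating, which is the main UX benefit of this RAG pattern.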
_stream_with_sources_impl

Internal: yield sources fragment, then stream response fragments.

async def _stream_with_sources_impl(tokens: AsyncIterator[str], template_name: str, *, response_block: str, sources_block: str | None, sources: Any, context_key: str, extra_context: dict[str, Any], fragment_cls: type) -> AsyncIterator[Any]
Parameters

tokens : AsyncIterator[str]
template_name : str
response_block : str
sources_block : str | None
sources : Any
context_key : str
extra_context : dict[str, Any]
fragment_cls : type
Returns
AsyncIterator[Any]