Database

Async SQL access that maps rows to frozen dataclasses — SQLite built in, PostgreSQL via one extra.

Page actions AI-ready formats and sharing
Open LLM text
Share with AI
Ask Claude Ask ChatGPT Ask Gemini Ask Copilot

Overview

Chirp's data layer is a thin async query interface, not an ORM: you write SQL, and Chirp maps each row to a frozen dataclass. Reach for it when you want typed reads and writes against SQLite (built in, zero extra dependencies) or PostgreSQL (pip install bengal-chirp[data-pg]) without an object-relational mapper in the way.

Pass a connection URL toApp(db=...) and every handler can run queries via app.db or get_db(). SQLite is the right default for development and single-writer apps; switch the connection string to postgresql://...when you need write concurrency.

When to reach for it

You want… Reach for
Typed reads/writes from raw SQL, no ORM db.fetch / db.execute(this page)
The same query mapped to a model-bound class with named placeholders Shapes
Real-time HTML pushed when a Postgres row changes db.listen()+ Server-Sent Events

Setup

The shortest path: pass a connection URL toApp(). SQLite needs no extra dependency.

from chirp import App, Template

app = App(db="sqlite:///app.db")

The database connects on startup and disconnects on shutdown. Access it via app.dbinside any handler:

@app.route("/users")
async def list_users():
    users = await app.db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)

Pass a Databaseinstance instead of a URL when you need to set the pool size:

from chirp.data import Database

db = Database("postgresql://user:pass@localhost/mydb", pool_size=10)
app = App(db=db)

Standalone usage

UseDatabase directly without an App— for scripts, jobs, or tests:

from chirp.data import Database

async with Database("sqlite:///app.db") as db:
    users = await db.fetch(User, "SELECT * FROM users")

Or manage the lifecycle yourself with connect() / disconnect().

get_db()accessor

When usingApp(db=...), reach the database from any handler without threading it through arguments:

from chirp.data import get_db

@app.route("/users")
async def list_users():
    db = get_db()
    users = await db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)

Data models

Define frozen dataclasses whose fields match your query columns:

from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class User:
    id: int
    name: str
    email: str

Query results map to these dataclasses automatically. Extra columns are ignored, so SELECT *works even when the dataclass has fewer fields.

Query methods

Five methods cover almost everything. Each takes the SQL placeholder for your driver —? for SQLite, $1for PostgreSQL (see Parameter style below).

fetch— all rows

users = await db.fetch(User, "SELECT * FROM users WHERE active = ?", True)
# Returns: list[User]

fetch_one— single row

user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
# Returns: User | None

fetch_val— scalar value

count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Returns: Any — pass as_type to coerce
count = await db.fetch_val("SELECT COUNT(*) FROM users", as_type=int)
# Returns: int | None

execute— INSERT / UPDATE / DELETE

rows_affected = await db.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    "Alice", "alice@example.com",
)
# Returns: int (rows affected)

stream— cursor-based iteration

For large result sets, stream rows without loading everything into memory:

async for user in db.stream(User, "SELECT * FROM users", batch_size=100):
    process(user)

Transactions

Wrap multiple statements in an atomic transaction. It auto-commits on a clean exit and auto-rolls back on any exception:

async with db.transaction():
    await db.execute(
        "INSERT INTO orders (user_id, total) VALUES (?, ?)",
        user_id, total,
    )
    await db.execute(
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
        quantity, product_id,
    )

Two behaviors worth knowing:

  • Reads see uncommitted writes. Afetch_valinside the block counts rows you just inserted but have not committed.
  • Nesting is transparent. An innertransaction()joins the outer one — there is no nested savepoint, so both commit together.
async with db.transaction():
    await db.execute("INSERT INTO users ...", name, email)
    async with db.transaction():  # joins the outer transaction
        await db.execute("INSERT INTO profiles ...", user_id)
    # both committed together

Common patterns

Build dynamic queries withQuery

Simple queries are fine as raw SQL. But when filters are conditional, string concatenation gets fragile.Query is an immutable builder that follows the same chaining pattern as Response.with_*(): each method returns a new Query, so the original is never mutated.

from chirp.data import Query

@dataclass(frozen=True, slots=True)
class Todo:
    id: int
    text: str
    done: bool

todos = await (
    Query(Todo, "todos")
    .where("done = ?", False)
    .where_if(search, "text LIKE ?", f"%{search}%")  # only added if search is truthy
    .order_by("id DESC")
    .take(20)
    .fetch(db)
)

Because a Queryis frozen, you can define a base at module level and branch from it per request without any shared-state risk:

from chirp import Request, Template

ALL_TODOS = Query(Todo, "todos").order_by("id")  # safe at module scope — frozen

@app.route("/todos")
async def list_todos(request: Request) -> Template:
    search = request.query.get("q")
    todos = await (
        ALL_TODOS
        .where_if(search, "text LIKE ?", f"%{search}%")
        .fetch(app.db)
    )
    return Template("todos.html", todos=todos)

Inspect the exact SQL before it runs — there are no hidden queries:

print(q.sql)     # SELECT * FROM todos WHERE done = ? ORDER BY id DESC LIMIT 20
print(q.params)  # (False,)

Batch inserts

await db.execute_many(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [("Alice", "a@b.com"), ("Bob", "b@b.com"), ("Carol", "c@b.com")],
)

Log every query

Passecho=Trueto print each statement with its timing to stderr — useful while developing:

db = Database("sqlite:///app.db", echo=True)
# [chirp.data]  0.3ms  SELECT * FROM users WHERE active = ?  params=(True,)

Migrations

Forward-only SQL migrations live as numbered.sqlfiles. Pending ones run at startup.

  1. 1

    Create numbered `.sql` files

    Name filesNNN_description.sql, where NNNis a zero-padded version number. Each file is plain SQL run as a single statement.

    -- migrations/001_create_users.sql
    CREATE TABLE users (
        id    INTEGER PRIMARY KEY AUTOINCREMENT,
        name  TEXT NOT NULL,
        email TEXT NOT NULL UNIQUE
    )
    
  2. 2

    Point the app at the directory

    Pending migrations run automatically on startup, oldest first.

    app = App(db="sqlite:///app.db", migrations="migrations/")
    

    Or run them yourself from a script with migrate():

    from chirp.data import Database, migrate
    
    db = Database("sqlite:///app.db")
    await db.connect()
    result = await migrate(db, "migrations/")
    print(result.summary)
    # "Applied 2 migration(s): 001_create_users, 002_add_email_index"
    
  3. 3

    Let Chirp track what ran

    Applied migrations are recorded in a_chirp_migrations table. Each migration runs in its own transaction — if one fails, it rolls back without affecting the migrations before it. Running migrate()again only applies new files, so it is safe to call on every startup.

Applied migrations are immutable

Chirp records a SHA-256 checksum of each migration's SQL when it applies it (in thechecksum column of _chirp_migrations). On every run it re-hashes the on-disk file and compares. Editing an already-applied NNN_*.sql file fails loud with MigrationError on the next migrate()— naming the file, before applying anything — instead of being silently ignored forever (a real data-corruption footgun when a team "just fixes a typo").

The correct workflow when an applied migration is wrong is to write a new forward migration that corrects it:

-- migrations/003_add_email_index.sql
CREATE INDEX idx_users_email ON users (email);

SQLite vs PostgreSQL

The only thing that changes between drivers is the connection string and the SQL placeholder style.

app = App(db="sqlite:///app.db")
app = App(db="postgresql://user:pass@localhost/mydb", pool_size=10)

Parameter style

SQLite uses? placeholders; PostgreSQL uses $1, $2, and so on. The mapped class and method are otherwise identical:

await db.fetch(User, "SELECT * FROM users WHERE id = ?", 42)
await db.fetch(User, "SELECT * FROM users WHERE id = $1", 42)

JSON columns —json_path

Extracting a key out of a JSON column is the one place the dialects genuinely diverge: SQLite wantsjson_extract(col, '$.key'), PostgreSQL wants col->>'key'. Hand-branching on the driver at every call site is exactly the leak json_path exists to stop. It returns a raw SQL expression fragment for the active dialect — drop it straight into a Query.where()or raw-SQL string:

from chirp.data import Query, json_path

# Free function — supply the dialect explicitly (usable without a Database handle):
clause = json_path("oauth", "sub", dialect="sqlite") + " = ?"
# clause == "json_extract(oauth, '$.sub') = ?"

# Bound method — db.json_path() reads the dialect from the connection:
clause = db.json_path("oauth", "sub") + " = ?"
# SQLite:     "json_extract(oauth, '$.sub') = ?"
# PostgreSQL: "oauth->>'sub' = ?"

user = await Query(Account, "accounts").where(clause, "user-42").fetch(db)

Nested keys chain: json_path("data", "a", "b", dialect="postgresql")data->'a'->>'b' (SQLite: json_extract(data, '$.a.b')).

Real-time updates with LISTEN / NOTIFY

PostgreSQL can push notifications when a row changes. Pairdb.listen()with Server-Sent Events to stream HTML the moment data changes:

from chirp import EventStream, Fragment, Request

@app.route("/orders/live")
async def live_orders(request: Request) -> EventStream:
    async def generate():
        async for note in app.db.listen("new_orders"):
            order = await app.db.fetch_one(
                Order, "SELECT * FROM orders WHERE id = $1",
                int(note.payload),
            )
            if order:
                yield Fragment("orders.html", "order-row", order=order)
    return EventStream(generate())

On the database side, fire the notification from a trigger:

CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('new_orders', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER order_created
    AFTER INSERT ON orders
    FOR EACH ROW EXECUTE FUNCTION notify_new_order();

Error handling

All data layer errors inherit fromDataError:

from chirp.data import DataError
from chirp.data.errors import QueryError

try:
    await db.execute("INSERT INTO users ...")
except QueryError as e:
    print(f"Query failed: {e}")
Error When
DataError Base class for all data errors
QueryError SQL execution fails
DatabaseConnectionError Cannot connect to the database
DriverNotInstalledError asyncpg is missing (install bengal-chirp[data-pg])
MigrationError A migration file is invalid or fails

Next Steps