Overview
Chirp's data layer is a thin async query interface, not an ORM: you write SQL, and Chirp maps each row to a frozen dataclass. Reach for it when you want typed reads and writes against SQLite (built in, zero extra dependencies) or PostgreSQL (pip install bengal-chirp[data-pg]) without an object-relational mapper in the way.
Pass a connection URL to App(db=...) and every handler can run queries via app.db or get_db(). SQLite is the right default for development and single-writer apps; switch the connection string to postgresql://... when you need write concurrency.
When to reach for it
| You want… | Reach for |
|---|---|
| Typed reads/writes from raw SQL, no ORM | db.fetch / db.execute (this page) |
| The same query mapped to a model-bound class with named placeholders | Shapes |
| Real-time HTML pushed when a Postgres row changes | db.listen() + Server-Sent Events |
Setup
The shortest path: pass a connection URL to App(). SQLite needs no extra dependency.
from chirp import App, Template
app = App(db="sqlite:///app.db")
The database connects on startup and disconnects on shutdown. Access it via app.db inside any handler:
@app.route("/users")
async def list_users():
    users = await app.db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)
Pass a Database instance instead of a URL when you need to set the pool size:
from chirp.data import Database
db = Database("postgresql://user:pass@localhost/mydb", pool_size=10)
app = App(db=db)
Standalone usage
Use Database directly without an App, for scripts, jobs, or tests:
from chirp.data import Database
async with Database("sqlite:///app.db") as db:
    users = await db.fetch(User, "SELECT * FROM users")
Or manage the lifecycle yourself with connect() / disconnect().
get_db() accessor
When using App(db=...), reach the database from any handler without threading it through arguments:
from chirp.data import get_db
@app.route("/users")
async def list_users():
    db = get_db()
    users = await db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)
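One common way to build an accessor like this is a context variable that the application sets once and handlers read later. The sketch below illustrates that general pattern only; the source does not say how Chirp implements get_db(), and the names set_db and get_db_sketch are hypothetical.

```python
from contextvars import ContextVar

# Hypothetical storage slot; Chirp's real mechanism is not described in this page.
_current_db = ContextVar("current_db")


def set_db(db: object) -> None:
    """Register the database handle (an app would do this at startup)."""
    _current_db.set(db)


def get_db_sketch() -> object:
    """Return the registered handle, or fail clearly if none was set."""
    try:
        return _current_db.get()
    except LookupError:
        raise RuntimeError("no database configured") from None


# Before registration the accessor refuses to guess.
try:
    get_db_sketch()
    missing = False
except RuntimeError:
    missing = True

set_db("fake-db")          # any object stands in for a Database here
current = get_db_sketch()  # handlers call this instead of taking a db argument
```

The point of the pattern is that handler signatures stay free of plumbing arguments while the lookup still fails loudly when nothing was configured.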
Data models
Define frozen dataclasses whose fields match your query columns:
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class User:
    id: int
    name: str
    email: str
Query results map to these dataclasses automatically. Extra columns are ignored, so SELECT * works even when the dataclass has fewer fields.
Query methods
Five methods cover almost everything. Each takes the SQL placeholder for your driver: ? for SQLite, $1 for PostgreSQL (see Parameter style below).
fetch: all rows
users = await db.fetch(User, "SELECT * FROM users WHERE active = ?", True)
# Returns: list[User]
fetch_one: single row
user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
# Returns: User | None
fetch_val: scalar value
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Returns: Any (pass as_type to coerce)
count = await db.fetch_val("SELECT COUNT(*) FROM users", as_type=int)
# Returns: int | None
execute: INSERT / UPDATE / DELETE
rows_affected = await db.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    "Alice", "alice@example.com",
)
# Returns: int (rows affected)
stream: cursor-based iteration
For large result sets, stream rows without loading everything into memory:
async for user in db.stream(User, "SELECT * FROM users", batch_size=100):
    process(user)
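The idea behind batch_size can be sketched with a plain cursor: fetch a bounded batch, hand its rows to the caller, and repeat until the cursor is empty. This is a synchronous stand-in built on the standard sqlite3 module, not Chirp's stream implementation; stream_rows is a hypothetical name.

```python
import sqlite3
from typing import Iterator


def stream_rows(
    conn: sqlite3.Connection, sql: str, *params, batch_size: int = 100
) -> Iterator[tuple]:
    """Yield rows one at a time while holding at most batch_size in memory."""
    cursor = conn.execute(sql, params)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:  # cursor exhausted
            break
        yield from batch


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (name) VALUES (?)", [(f"user{i}",) for i in range(250)]
)

# 250 rows arrive as batches of 100, 100, and 50.
total = sum(1 for _ in stream_rows(conn, "SELECT * FROM users", batch_size=100))
print(total)
```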
Transactions
Wrap multiple statements in an atomic transaction. It auto-commits on a clean exit and auto-rolls back on any exception:
async with db.transaction():
    await db.execute(
        "INSERT INTO orders (user_id, total) VALUES (?, ?)",
        user_id, total,
    )
    await db.execute(
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
        quantity, product_id,
    )
Two behaviors worth knowing:
- Reads see uncommitted writes. A fetch_val inside the block counts rows you just inserted but have not committed.
- Nesting is transparent. An inner transaction() joins the outer one; there is no nested savepoint, so both commit together.
async with db.transaction():
    await db.execute("INSERT INTO users ...", name, email)
    async with db.transaction():  # joins the outer transaction
        await db.execute("INSERT INTO profiles ...", user_id)
# both committed together
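The commit, rollback, and join-the-outer-block behavior described above can be sketched with a depth counter around a single connection. This is an illustration under assumptions, not Chirp's implementation: Tx is a hypothetical class, and synchronous sqlite3 stands in for the async driver.

```python
import sqlite3
from contextlib import contextmanager


class Tx:
    """Hypothetical helper: one connection, a depth counter, no savepoints."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.depth = 0

    @contextmanager
    def transaction(self):
        outermost = self.depth == 0
        if outermost:
            self.conn.execute("BEGIN")
        self.depth += 1
        try:
            yield
        except BaseException:
            if outermost:  # only the outer block decides the outcome
                self.conn.execute("ROLLBACK")
            raise
        else:
            if outermost:
                self.conn.execute("COMMIT")
        finally:
            self.depth -= 1


# isolation_level=None turns off implicit transactions so BEGIN/COMMIT are ours.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (name TEXT)")
tx = Tx(conn)

with tx.transaction():
    conn.execute("INSERT INTO users VALUES (?)", ("Alice",))
    with tx.transaction():  # joins the outer transaction
        conn.execute("INSERT INTO users VALUES (?)", ("Bob",))

try:
    with tx.transaction():
        conn.execute("INSERT INTO users VALUES (?)", ("Carol",))
        raise RuntimeError("boom")  # triggers the rollback path
except RuntimeError:
    pass

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)
```

Alice and Bob are committed together by the outer block; Carol's insert is rolled back, so the count is 2.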
Common patterns
Build dynamic queries with Query
Simple queries are fine as raw SQL. But when filters are conditional, string concatenation gets fragile. Query is an immutable builder that follows the same chaining pattern as Response.with_*(): each method returns a new Query, so the original is never mutated.
from chirp.data import Query
@dataclass(frozen=True, slots=True)
class Todo:
    id: int
    text: str
    done: bool

todos = await (
    Query(Todo, "todos")
    .where("done = ?", False)
    .where_if(search, "text LIKE ?", f"%{search}%")  # only added if search is truthy
    .order_by("id DESC")
    .take(20)
    .fetch(db)
)
Because a Query is frozen, you can define a base at module level and branch from it per request without any shared-state risk:
from chirp import Request, Template
ALL_TODOS = Query(Todo, "todos").order_by("id") # safe at module scope — frozen
@app.route("/todos")
async def list_todos(request: Request) -> Template:
    search = request.query.get("q")
    todos = await (
        ALL_TODOS
        .where_if(search, "text LIKE ?", f"%{search}%")
        .fetch(app.db)
    )
    return Template("todos.html", todos=todos)
Inspect the exact SQL before it runs — there are no hidden queries:
print(q.sql) # SELECT * FROM todos WHERE done = ? ORDER BY id DESC LIMIT 20
print(q.params) # (False,)
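To show why an immutable builder is safe to share, here is a small stand-alone sketch of the pattern: a frozen dataclass whose methods return modified copies. MiniQuery is a hypothetical illustration and covers only a few methods; it is not Chirp's Query class and performs no execution.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class MiniQuery:
    table: str
    wheres: tuple = ()
    params: tuple = ()
    order: Optional[str] = None
    limit: Optional[int] = None

    def where(self, clause: str, *params) -> "MiniQuery":
        # replace() builds a new instance; self is left untouched.
        return replace(
            self, wheres=self.wheres + (clause,), params=self.params + params
        )

    def where_if(self, cond, clause: str, *params) -> "MiniQuery":
        return self.where(clause, *params) if cond else self

    def order_by(self, clause: str) -> "MiniQuery":
        return replace(self, order=clause)

    def take(self, n: int) -> "MiniQuery":
        return replace(self, limit=n)

    @property
    def sql(self) -> str:
        parts = [f"SELECT * FROM {self.table}"]
        if self.wheres:
            parts.append("WHERE " + " AND ".join(self.wheres))
        if self.order:
            parts.append(f"ORDER BY {self.order}")
        if self.limit is not None:
            parts.append(f"LIMIT {self.limit}")
        return " ".join(parts)


base = MiniQuery("todos").where("done = ?", False)
q = base.where_if("milk", "text LIKE ?", "%milk%").order_by("id DESC").take(20)
skipped = base.where_if("", "text LIKE ?", "%%")  # falsy condition: no change

print(q.sql)
print(q.params)
print(base.sql)  # the base is unchanged by the branching above
```

Because every method returns a copy, two requests branching from the same module-level base cannot see each other's filters.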
Full Query builder reference
Query delegates execution to the same Database methods you already know.
| Method | Returns | Description |
|---|---|---|
| select(columns) | Query[T] | Columns to SELECT (default *) |
| where(clause, *params) | Query[T] | Add a WHERE clause (multiple are ANDed) |
| where_if(cond, clause, *params) | Query[T] | Add a WHERE clause only if cond is truthy |
| order_by(clause) | Query[T] | Set ORDER BY |
| take(n) | Query[T] | Set LIMIT |
| skip(n) | Query[T] | Set OFFSET |
| fetch(db) | list[T] | Execute and return all rows |
| fetch_one(db) | T \| None | Execute and return the first row |
| count(db) | int | COUNT(*) with the same WHERE clauses (ignores LIMIT/OFFSET) |
| exists(db) | bool | Whether any row matches |
| stream(db) | AsyncIterator[T] | Yield rows incrementally |
| .sql | str | The exact SQL that will execute |
| .params | tuple | The bound parameters |
Batch inserts
await db.execute_many(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [("Alice", "a@b.com"), ("Bob", "b@b.com"), ("Carol", "c@b.com")],
)
Log every query
Pass echo=True to print each statement with its timing to stderr, which is useful while developing:
db = Database("sqlite:///app.db", echo=True)
# [chirp.data] 0.3ms SELECT * FROM users WHERE active = ? params=(True,)
Migrations
Forward-only SQL migrations live as numbered .sql files. Pending ones run at startup.
1. Create numbered `.sql` files
Name files NNN_description.sql, where NNN is a zero-padded version number. Each file is plain SQL run as a single statement.
-- migrations/001_create_users.sql
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE
)
2. Point the app at the directory
Pending migrations run automatically on startup, oldest first.
app = App(db="sqlite:///app.db", migrations="migrations/")
Or run them yourself from a script with migrate():
from chirp.data import Database, migrate
db = Database("sqlite:///app.db")
await db.connect()
result = await migrate(db, "migrations/")
print(result.summary)
# "Applied 2 migration(s): 001_create_users, 002_add_email_index"
3. Let Chirp track what ran
Applied migrations are recorded in a _chirp_migrations table. Each migration runs in its own transaction; if one fails, it rolls back without affecting the migrations before it. Running migrate() again only applies new files, so it is safe to call on every startup.
Applied migrations are immutable
Chirp records a SHA-256 checksum of each migration's SQL when it applies it (in the checksum column of _chirp_migrations). On every run it re-hashes the on-disk file and compares the two. Editing an already-applied NNN_*.sql file fails loudly with MigrationError on the next migrate(): the error names the file and is raised before anything is applied. The alternative, silently ignoring the edit forever, is a real data-corruption footgun when a team "just fixes a typo".
The correct workflow when an applied migration is wrong is to write a new forward migration that corrects it:
-- migrations/003_add_email_index.sql
CREATE INDEX idx_users_email ON users (email);
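The checksum comparison can be sketched in a few lines with hashlib. This is an illustration of the idea only: verify_applied and MigrationChanged are hypothetical names, the migrations are held in dictionaries rather than read from disk, and exactly which bytes Chirp hashes is not stated in this page.

```python
import hashlib


def checksum(sql: str) -> str:
    """SHA-256 hex digest of a migration's SQL text."""
    return hashlib.sha256(sql.encode("utf-8")).hexdigest()


class MigrationChanged(Exception):
    """Stand-in for the MigrationError the text describes."""


def verify_applied(recorded: dict, on_disk: dict) -> list:
    """Raise if an applied file changed; otherwise return pending names in order."""
    for name, digest in recorded.items():
        if name in on_disk and checksum(on_disk[name]) != digest:
            raise MigrationChanged(f"{name} was modified after it was applied")
    return sorted(name for name in on_disk if name not in recorded)


files = {
    "001_create_users": "CREATE TABLE users (id INTEGER PRIMARY KEY)",
    "002_add_email": "ALTER TABLE users ADD COLUMN email TEXT",
}
# Pretend 001 was applied earlier and its digest was stored.
recorded = {"001_create_users": checksum(files["001_create_users"])}

pending = verify_applied(recorded, files)
print(pending)

# Editing the applied file is detected before anything else runs.
files["001_create_users"] += " -- typo fix"
try:
    verify_applied(recorded, files)
    error = None
except MigrationChanged as exc:
    error = str(exc)
print(error)
```

Running the check before applying anything is what keeps a half-migrated database from appearing when an old file has been edited.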
SQLite vs PostgreSQL
Between drivers, only the connection string and the SQL placeholder style change for ordinary queries.
# SQLite
app = App(db="sqlite:///app.db")
# PostgreSQL
app = App(db="postgresql://user:pass@localhost/mydb", pool_size=10)
Parameter style
SQLite uses ? placeholders; PostgreSQL uses $1, $2, and so on. The mapped class and method are otherwise identical:
# SQLite
await db.fetch(User, "SELECT * FROM users WHERE id = ?", 42)
# PostgreSQL
await db.fetch(User, "SELECT * FROM users WHERE id = $1", 42)
JSON columns: json_path
Extracting a key from a JSON column is the one place the dialects genuinely diverge: SQLite wants json_extract(col, '$.key'), PostgreSQL wants col->>'key'. Hand-branching on the driver at every call site is exactly the leak json_path exists to stop. It returns a raw SQL expression fragment for the active dialect, which you can drop straight into a Query.where() or a raw SQL string:
from chirp.data import Query, json_path
# Free function — supply the dialect explicitly (usable without a Database handle):
clause = json_path("oauth", "sub", dialect="sqlite") + " = ?"
# clause == "json_extract(oauth, '$.sub') = ?"
# Bound method — db.json_path() reads the dialect from the connection:
clause = db.json_path("oauth", "sub") + " = ?"
# SQLite: "json_extract(oauth, '$.sub') = ?"
# PostgreSQL: "oauth->>'sub' = ?"
user = await Query(Account, "accounts").where(clause, "user-42").fetch_one(db)
Nested keys chain: json_path("data", "a", "b", dialect="postgresql") → data->'a'->>'b' (SQLite: json_extract(data, '$.a.b')).
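The dialect branching is small enough to sketch directly. The function below reproduces the fragments shown above for both dialects; it is a hypothetical re-implementation named json_path_sketch, not Chirp's json_path, and it does no quoting or validation of column or key names, so it must never receive user input.

```python
def json_path_sketch(column: str, *keys: str, dialect: str) -> str:
    """Return a SQL fragment that extracts a (possibly nested) JSON key as text."""
    if not keys:
        raise ValueError("at least one key is required")
    if dialect == "sqlite":
        path = ".".join(keys)
        return f"json_extract({column}, '$.{path}')"
    if dialect == "postgresql":
        # Intermediate keys use -> (JSON value); the last uses ->> (text).
        *parents, last = keys
        middle = "".join(f"->'{key}'" for key in parents)
        return f"{column}{middle}->>'{last}'"
    raise ValueError(f"unsupported dialect: {dialect}")


flat_sqlite = json_path_sketch("oauth", "sub", dialect="sqlite")
flat_pg = json_path_sketch("oauth", "sub", dialect="postgresql")
nested_sqlite = json_path_sketch("data", "a", "b", dialect="sqlite")
nested_pg = json_path_sketch("data", "a", "b", dialect="postgresql")

for fragment in (flat_sqlite, flat_pg, nested_sqlite, nested_pg):
    print(fragment)
```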
How SQLite and Postgres concurrency differ under the hood
For a file-backed SQLite database, Chirp opens a small bounded pool of WAL-mode connections sized by pool_size. Reads (fetch, fetch_one, fetch_val, stream) acquire any free pooled connection and run concurrently; they do not wait behind an app-wide lock. Writes (execute, execute_many, execute_script, and transaction()) serialize behind a single write lock to honor SQLite's single-writer model. An open write transaction does not stall reads.
In-memory SQLite (sqlite:///:memory:) is a development/test convenience and behaves differently. A private :memory: database is visible only to the connection that opened it, and shared-cache mode raises lock errors under concurrent reader/writer access. So in-memory databases use a single shared connection and serialize all access, reads included. For concurrent-reader throughput, use a file database (WAL) or PostgreSQL.
PostgreSQL has the strongest concurrency: asyncpg provides a real connection pool with per-transaction isolation, so reads and writes run concurrently up to pool_size.
For Chirp's broader free-threading posture, see Thread Safety.
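The file-backed SQLite arrangement (a bounded pool of WAL connections plus one write lock) can be sketched as follows. This is a simplified illustration, not Chirp's pool: SketchPool is a hypothetical class, and it calls sqlite3 synchronously inside coroutines, whereas a real async driver would keep blocking calls off the event loop.

```python
import asyncio
import os
import sqlite3
import tempfile


class SketchPool:
    """Hypothetical pool: N WAL-mode connections, one lock for writers."""

    def __init__(self, path: str, pool_size: int = 3) -> None:
        self._free: asyncio.Queue = asyncio.Queue()
        self._write_lock = asyncio.Lock()
        for _ in range(pool_size):
            conn = sqlite3.connect(path, isolation_level=None)  # autocommit
            conn.execute("PRAGMA journal_mode=WAL")
            self._free.put_nowait(conn)

    async def read(self, sql: str, *params) -> list:
        conn = await self._free.get()  # any free connection will do
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            self._free.put_nowait(conn)

    async def write(self, sql: str, *params) -> int:
        async with self._write_lock:  # one writer at a time
            conn = await self._free.get()
            try:
                return conn.execute(sql, params).rowcount
            finally:
                self._free.put_nowait(conn)

    def close(self) -> None:
        while not self._free.empty():
            self._free.get_nowait().close()


async def main() -> int:
    with tempfile.TemporaryDirectory() as tmp:
        pool = SketchPool(os.path.join(tmp, "app.db"))
        await pool.write("CREATE TABLE users (name TEXT)")
        await pool.write("INSERT INTO users VALUES (?)", "Alice")
        # Five reads share the three pooled connections without the write lock.
        results = await asyncio.gather(
            *(pool.read("SELECT COUNT(*) FROM users") for _ in range(5))
        )
        pool.close()
        return results[0][0][0]


count = asyncio.run(main())
print(count)
```

Readers never touch the write lock, which is the property that lets an open write leave reads unblocked in WAL mode.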
Advanced: asyncpg on free-threaded Python (3.14t)
Chirp's data-pg backend is asyncpg today, which is a Cython extension, not pure Python. Chirp talks to asyncpg directly through its own anyio-based pool shim, so Chirp does not route Postgres access through a third-party ORM or greenlet bridge. asyncpg's free-threading status remains a caveat for anyone running bengal-chirp[data-pg] on free-threaded builds.
Treat asyncpg-on-3.14t as best-effort: fine for development and typical OLTP workloads, but verify against the current asyncpg release notes before relying on it under heavy Postgres concurrency in production. See Thread Safety for Chirp's broader free-threading posture.
Real-time updates with LISTEN / NOTIFY
PostgreSQL can push notifications when a row changes. Pair db.listen() with Server-Sent Events to stream HTML the moment data changes:
from chirp import EventStream, Fragment, Request
@app.route("/orders/live")
async def live_orders(request: Request) -> EventStream:
    async def generate():
        async for note in app.db.listen("new_orders"):
            order = await app.db.fetch_one(
                Order, "SELECT * FROM orders WHERE id = $1",
                int(note.payload),
            )
            if order:
                yield Fragment("orders.html", "order-row", order=order)
    return EventStream(generate())
On the database side, fire the notification from a trigger:
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('new_orders', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER order_created
    AFTER INSERT ON orders
    FOR EACH ROW EXECUTE FUNCTION notify_new_order();
Error handling
All data layer errors inherit from DataError:
from chirp.data import DataError
from chirp.data.errors import QueryError
try:
    await db.execute("INSERT INTO users ...")
except QueryError as e:
    print(f"Query failed: {e}")
| Error | When |
|---|---|
| DataError | Base class for all data errors |
| QueryError | SQL execution fails |
| DatabaseConnectionError | Cannot connect to the database |
| DriverNotInstalledError | asyncpg is missing (install bengal-chirp[data-pg]) |
| MigrationError | A migration file is invalid or fails |
Next Steps
- Shapes: Bind a model class to its SQL with named placeholders and Shape.fetch.
- Forms & Validation: Parse and validate the form data you are about to write.
- Server-Sent Events: Push live HTML when a row changes.
- Built-in Middleware: Session middleware for per-user state.