Overview
Chirp's data module provides typed async database access. SQL in, frozen dataclasses out. It is not an ORM -- you write SQL, and chirp maps the results to typed Python objects.
Note
Requires optional extras: pip install bengal-chirp[data] for SQLite, or pip install bengal-chirp[data-pg] for PostgreSQL.
Setup
App Integration
The simplest setup -- pass a connection URL to App():
from chirp import App
app = App(db="sqlite:///app.db")
The database connects on startup and disconnects on shutdown automatically. Access it via app.db:
@app.route("/users")
async def list_users():
    users = await app.db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)
You can also pass a Database instance for more control:
from chirp.data import Database
db = Database("postgresql://user:pass@localhost/mydb", pool_size=10)
app = App(db=db)
Standalone Usage
Use Database directly without the app:
from chirp.data import Database
db = Database("sqlite:///app.db")
await db.connect()
# ... use db ...
await db.disconnect()
Or as an async context manager:
async with Database("sqlite:///app.db") as db:
    users = await db.fetch(User, "SELECT * FROM users")
get_db() Accessor
When using App(db=...), the database is available from any handler via get_db():
from chirp.data import get_db
@app.route("/users")
async def list_users():
    db = get_db()
    users = await db.fetch(User, "SELECT * FROM users")
    return Template("users.html", users=users)
Data Models
Define frozen dataclasses that match your query columns:
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class User:
    id: int
    name: str
    email: str
Query results are mapped to these dataclasses automatically. Extra columns are silently ignored -- SELECT * works even if the dataclass has fewer fields.
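For example, a model narrower than the table still maps cleanly. A small sketch, assuming the users table also has an email column and that UserName is a model defined just for this query:

from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class UserName:
    id: int
    name: str

# SELECT * also returns email (and any other columns), but only id and name
# are used to build each UserName; the rest is ignored.
names = await db.fetch(UserName, "SELECT * FROM users")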
Query Methods
fetch -- All Rows
users = await db.fetch(User, "SELECT * FROM users WHERE active = ?", True)
# Returns: list[User]
fetch_one -- Single Row
user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
# Returns: User | None
fetch_val -- Scalar Value
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Returns: Any
count = await db.fetch_val("SELECT COUNT(*) FROM users", as_type=int)
# Returns: int | None
execute -- INSERT / UPDATE / DELETE
rows_affected = await db.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
"Alice", "alice@example.com",
)
# Returns: int (rows affected)
execute_many -- Batch Operations
await db.execute_many(
"INSERT INTO users (name, email) VALUES (?, ?)",
[("Alice", "a@b.com"), ("Bob", "b@b.com"), ("Carol", "c@b.com")],
)
stream -- Cursor-Based Iteration
For large result sets, stream rows without loading everything into memory:
async for user in db.stream(User, "SELECT * FROM users", batch_size=100):
    process(user)
Reference
| Method | Returns | Description |
|---|---|---|
| fetch(cls, sql, *params) | list[T] | All matching rows as dataclasses |
| fetch_one(cls, sql, *params) | T \| None | First row or None |
| fetch_val(sql, *params) | Any | First column of first row |
| execute(sql, *params) | int | Rows affected |
| execute_many(sql, params_seq) | int | Total rows affected |
| stream(cls, sql, *params) | AsyncIterator[T] | Cursor-based row iteration |
Query Builder
For dynamic queries with optional filters, use Query -- an immutable query builder that follows the same chaining pattern as Response.with_*(). Each method returns a new Query, so the original is never mutated.
from dataclasses import dataclass

from chirp.data import Query

@dataclass(frozen=True, slots=True)
class Todo:
    id: int
    text: str
    done: bool

# Build a query with optional filters
todos = await (
    Query(Todo, "todos")
    .where("done = ?", False)
    .where_if(search, "text LIKE ?", f"%{search}%")
    .order_by("id DESC")
    .take(20)
    .fetch(db)
)
Why Not Just SQL?
Simple queries are fine as raw SQL. But when filters are conditional, string concatenation gets ugly fast:
# Without Query — fragile string building
sql = "SELECT * FROM todos WHERE 1=1"
params = []
if status:
    sql += " AND done = ?"
    params.append(status == "done")
if search:
    sql += " AND text LIKE ?"
    params.append(f"%{search}%")
sql += " ORDER BY id DESC LIMIT 20"
todos = await db.fetch(Todo, sql, *params)
# With Query — clean, chainable, immutable
todos = await (
    Query(Todo, "todos")
    .where_if(status, "done = ?", status == "done")
    .where_if(search, "text LIKE ?", f"%{search}%")
    .order_by("id DESC")
    .take(20)
    .fetch(db)
)
Building Queries
Every method returns a new frozen Query. Chain them in any order:
q = (
    Query(Todo, "todos")
    .select("id, text, done")                        # columns (default: *)
    .where("done = ?", False)                        # WHERE clause (multiple are ANDed)
    .where_if(search, "text LIKE ?", f"%{search}%")  # conditional WHERE
    .order_by("id DESC")                             # ORDER BY
    .take(20)                                        # LIMIT
    .skip(40)                                        # OFFSET
)
Transparency
Inspect exactly what SQL will run — no hidden queries:
print(q.sql)
# SELECT id, text, done FROM todos WHERE done = ? AND text LIKE ? ORDER BY id DESC LIMIT 20 OFFSET 40
print(q.params)
# (False, '%milk%')
Executing Queries
Query delegates to the same Database methods you already know:
# All matching rows
todos = await q.fetch(db) # list[Todo]
# First match
todo = await q.fetch_one(db) # Todo | None
# Count matching rows (ignores LIMIT/OFFSET)
n = await q.count(db) # int
# Check existence
found = await q.exists(db) # bool
# Stream large results
async for todo in q.stream(db): # AsyncIterator[Todo]
    process(todo)
Reusable Queries
Since queries are immutable, define a base query once and branch from it:
# Module-level — safe because it's frozen
ALL_TODOS = Query(Todo, "todos").order_by("id")
# In handlers — branch from the base
@app.route("/todos")
async def list_todos(request):
    search = request.query.get("q")
    todos = await (
        ALL_TODOS
        .where_if(search, "text LIKE ?", f"%{search}%")
        .fetch(app.db)
    )
    return Template("todos.html", todos=todos)
Reference
| Method | Returns | Description |
|---|---|---|
| where(clause, *params) | Query[T] | Add a WHERE clause (multiple are ANDed) |
| where_if(cond, clause, *params) | Query[T] | Add a WHERE clause only if cond is truthy |
| order_by(clause) | Query[T] | Set ORDER BY |
| take(n) | Query[T] | Set LIMIT |
| skip(n) | Query[T] | Set OFFSET |
| select(columns) | Query[T] | Set columns to SELECT (default: *) |
| fetch(db) | list[T] | Execute and return all rows |
| fetch_one(db) | T \| None | Execute and return first row |
| count(db) | int | COUNT(*) with the same WHERE clauses |
| exists(db) | bool | Check if any row matches |
| stream(db) | AsyncIterator[T] | Yield rows incrementally |
| .sql | str | The exact SQL that will execute |
| .params | tuple | The bound parameters |
Transactions
Wrap multiple statements in an atomic transaction:
async with db.transaction():
    await db.execute(
        "INSERT INTO orders (user_id, total) VALUES (?, ?)",
        user_id, total,
    )
    await db.execute(
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
        quantity, product_id,
    )
Auto-commits on clean exit, auto-rolls back on exception:
async with db.transaction():
await db.execute("INSERT INTO users ...", name, email)
raise ValueError("something went wrong")
# everything rolled back -- nothing committed
Reads inside a transaction see uncommitted writes:
async with db.transaction():
await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", "Alice", "a@b.com")
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# count includes the uncommitted row
Nesting is transparent -- an inner transaction() joins the outer one:
async with db.transaction():
await db.execute("INSERT INTO users ...", name, email)
async with db.transaction(): # no-op, joins outer
await db.execute("INSERT INTO profiles ...", user_id)
# both committed together
Migrations
Forward-only SQL migrations. Create numbered .sql files in a directory:
migrations/
    001_create_users.sql
    002_add_email_index.sql
    003_create_orders.sql
With the App
app = App(db="sqlite:///app.db", migrations="migrations/")
Pending migrations run automatically at startup. Each migration runs in its own transaction -- if one fails, it rolls back without affecting previously applied migrations.
Standalone
from chirp.data import Database, migrate
db = Database("sqlite:///app.db")
await db.connect()
result = await migrate(db, "migrations/")
print(result.summary)
# "Applied 2 migration(s): 001_create_users, 002_add_email_index"
Tracking
Applied migrations are tracked in a _chirp_migrations table. Running migrate() again only applies new migrations -- it is safe to call on every startup.
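Because applied versions are skipped, repeated calls are harmless; a small sketch:

# The second call is a no-op: every file is already recorded in _chirp_migrations.
await migrate(db, "migrations/")
await migrate(db, "migrations/")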
Migration Files
Each file is plain SQL executed as a single statement:
-- migrations/001_create_users.sql
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE
)
Naming convention: NNN_description.sql, where NNN is a zero-padded version number.
Echo / Query Logging
Enable SQL logging to see every query with timing:
db = Database("sqlite:///app.db", echo=True)
Output goes to stderr:
[chirp.data] 0.3ms SELECT * FROM users WHERE active = ? params=(True,)
[chirp.data] 0.1ms INSERT INTO users (name, email) VALUES (?, ?) params=('Alice', 'alice@example.com')
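Echo also works with the app integration by passing a pre-configured Database instance, as shown in Setup. A small sketch:

from chirp import App
from chirp.data import Database

# Every query the app runs is now logged to stderr with timing.
db = Database("sqlite:///app.db", echo=True)
app = App(db=db)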
LISTEN / NOTIFY (PostgreSQL)
Listen for real-time database notifications:
async for notification in db.listen("new_orders"):
print(f"Channel: {notification.channel}")
print(f"Payload: {notification.payload}")
Pair with chirp's EventStream for real-time HTML updates:
from chirp import EventStream, Fragment
@app.route("/orders/live")
async def live_orders(request):
    async def generate():
        async for note in app.db.listen("new_orders"):
            order = await app.db.fetch_one(
                Order, "SELECT * FROM orders WHERE id = $1",
                int(note.payload),
            )
            if order:
                yield Fragment("orders.html", "order-row", order=order)
    return EventStream(generate())
On the PostgreSQL side, trigger notifications from a function or manually:
-- Trigger on INSERT
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('new_orders', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER order_created
AFTER INSERT ON orders
FOR EACH ROW EXECUTE FUNCTION notify_new_order();
Note
LISTEN/NOTIFY is a PostgreSQL feature. SQLite does not support real-time notifications -- calling db.listen() on a SQLite database raises DataError.
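If your code might run against either backend, you can guard the listener with the DataError base class from the Error Handling section below. A sketch:

from chirp.data import DataError

try:
    async for note in db.listen("new_orders"):
        print(note.channel, note.payload)
except DataError:
    # SQLite backend: real-time notifications are not available here.
    pass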
PostgreSQL
Swap the connection string for PostgreSQL:
app = App(db="postgresql://user:pass@localhost/mydb")
PostgreSQL uses asyncpg with connection pooling. Configure the pool size:
from chirp.data import Database
db = Database("postgresql://user:pass@localhost/mydb", pool_size=10)
app = App(db=db)
Note
PostgreSQL requires the data-pg extra: pip install bengal-chirp[data-pg]. This installs asyncpg.
Parameter Style
SQLite uses ? placeholders. PostgreSQL uses $1, $2, etc.:
# SQLite
await db.fetch(User, "SELECT * FROM users WHERE id = ?", 42)
# PostgreSQL
await db.fetch(User, "SELECT * FROM users WHERE id = $1", 42)
Error Handling
All data layer errors inherit from DataError:
| Error | When |
|---|---|
| DataError | Base class for all data errors |
| QueryError | SQL execution fails |
| ConnectionError | Cannot connect to database |
| DriverNotInstalledError | Missing aiosqlite or asyncpg |
| MigrationError | Migration file invalid or execution fails |
from chirp.data import DataError
from chirp.data.errors import QueryError
try:
    await db.execute("INSERT INTO users ...")
except QueryError as e:
    print(f"Query failed: {e}")
Next Steps
- Forms & Validation -- Parse and validate form data
- Server-Sent Events -- Real-time updates with SSE
- Built-in Middleware -- Session middleware for user state