Module

data.database

Typed async database access.

Supports SQLite (via stdlib sqlite3 + anyio) and PostgreSQL (via asyncpg). SQL in, frozen dataclasses out.

Connection URL format::

sqlite:///path/to/db.sqlite    # SQLite file
sqlite:///:memory:             # In-memory SQLite
postgresql://user:pass@host/db # PostgreSQL
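
As a rough illustration of how URLs in this format could be told apart, the sketch below uses only the standard library. The helper names `detect_driver` and `parse_sqlite_path` are hypothetical stand-ins; the module's own private helpers may behave differently.

```python
from urllib.parse import urlsplit


def detect_driver(url: str) -> str:
    """Return "sqlite" or "postgresql" based on the URL scheme (hypothetical helper)."""
    scheme = urlsplit(url).scheme
    if scheme in ("sqlite", "postgresql"):
        return scheme
    raise ValueError(f"Unsupported database URL scheme: {scheme!r}")


def parse_sqlite_path(url: str) -> str:
    """Strip the "sqlite:///" prefix, leaving a file path or ":memory:" (assumed rule)."""
    prefix = "sqlite:///"
    if not url.startswith(prefix):
        raise ValueError(f"Not a SQLite URL: {url!r}")
    return url[len(prefix):]


driver = detect_driver("postgresql://user:pass@host/db")
file_path = parse_sqlite_path("sqlite:///path/to/db.sqlite")
memory_path = parse_sqlite_path("sqlite:///:memory:")
```

Under these assumptions, `:memory:` passes through unchanged, so it can be handed directly to `sqlite3.connect`.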

Free-threading safety:

  • Connection pool uses threading.Lock for thread-safe access
  • Connections are per-task (ContextVar), never shared between tasks
  • All public methods are async — no sync I/O on the calling thread

Classes

DatabaseConfig 3

Database connection configuration.

Parsed from a URL string or constructed directly.

Attributes

Name Type Description
url str
pool_size int
echo bool
Notification 2

A notification received from PostgreSQL LISTEN/NOTIFY.

Attributes

Name Type Description
channel str

The notification channel name.

payload str

The notification payload string (may be empty).

Database 18

Typed async database access.

SQL queries return frozen dataclasses. Streaming queries return async iterators. Both modes use the same SQL — the difference is whether you want all results at once or incrementally.

Usage::

from dataclasses import dataclass

db = Database("sqlite:///app.db")

@dataclass(frozen=True, slots=True)
class User:
    id: int
    name: str
    email: str

# Fetch all
users = await db.fetch(User, "SELECT * FROM users")

# Fetch one
user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)

# Stream (cursor-based)
async for user in db.stream(User, "SELECT * FROM users"):
    process(user)

# Execute (INSERT/UPDATE/DELETE)
await db.execute("INSERT INTO users (name, email) VALUES (?, ?)",
                 "Alice", "alice@example.com")

# Raw scalar
count = await db.fetch_val("SELECT COUNT(*) FROM users")

# Transaction (atomic multi-statement)
async with db.transaction():
    await db.execute("INSERT INTO users ...", name, email)
    await db.execute("INSERT INTO profiles ...", user_id)

Methods

transaction 0 AsyncIterator[None]
async
async def transaction(self) -> AsyncIterator[None]

Execute multiple statements atomically.

Auto-commits on clean exit, rolls back on exception. Calls to execute, fetch, etc. inside the block reuse the transaction's connection automatically via ContextVar.

Nesting is transparent: if already inside a transaction, the inner transaction() joins the outer one (no-op).

Usage::

async with db.transaction():
    await db.execute("INSERT INTO users ...", name, email)
    await db.execute("INSERT INTO profiles ...", user_id)
    # auto-commits here

async with db.transaction():
    await db.execute("INSERT INTO users ...", name, email)
    raise ValueError("oops")
    # auto-rollback on exception
Returns
AsyncIterator[None]
fetch 2 list[T]
async
async def fetch(self, cls: type[T], sql: str, /, *params: Any) -> list[T]

Execute a query and return all rows as typed dataclasses.

Usage::

users = await db.fetch(User, "SELECT * FROM users WHERE active = ?", True)
Parameters
Name Type Description
sql
*params
Returns
list[T]
fetch_one 2 T | None
async
async def fetch_one(self, cls: type[T], sql: str, /, *params: Any) -> T | None

Execute a query and return the first row, or None.

Usage::

user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
Parameters
Name Type Description
sql
*params
Returns
T | None
stream 3 AsyncIterator[T]
async
async def stream(self, cls: type[T], sql: str, /, *params: Any, batch_size: int = 100) -> AsyncIterator[T]

Execute a query and yield rows incrementally as typed dataclasses.

Uses a server-side cursor for memory-efficient iteration over large result sets. Rows are fetched in batches of batch_size.

Usage::

async for entry in db.stream(LogEntry, "SELECT * FROM logs"):
    process(entry)
Parameters
Name Type Description
sql
*params
batch_size Default:100
Returns
AsyncIterator[T]
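
The batching idea can be sketched as an async generator that pulls `batch_size` rows at a time. The example uses `sqlite3.Cursor.fetchmany` as a stand-in, since SQLite has no server-side cursor; `stream_rows` is a hypothetical helper that yields raw tuples rather than dataclasses.

```python
import asyncio
import sqlite3
from collections.abc import AsyncIterator
from typing import Any


async def stream_rows(
    conn: sqlite3.Connection, sql: str, *params: Any, batch_size: int = 100
) -> AsyncIterator[tuple[Any, ...]]:
    cursor = conn.execute(sql, params)
    try:
        while True:
            # Only one batch is held in memory at a time.
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            for row in batch:
                yield row
            await asyncio.sleep(0)  # give other tasks a turn between batches
    finally:
        cursor.close()


async def main() -> int:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE logs (id INTEGER PRIMARY KEY, msg TEXT)")
    conn.executemany(
        "INSERT INTO logs (msg) VALUES (?)", [(f"line {i}",) for i in range(250)]
    )
    total = 0
    async for _row in stream_rows(conn, "SELECT * FROM logs", batch_size=100):
        total += 1
    return total


total = asyncio.run(main())
```
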
execute 2 int
async
async def execute(self, sql: str, /, *params: Any) -> int

Execute a statement (INSERT/UPDATE/DELETE) and return rows affected.

Usage::

count = await db.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    "Alice", "alice@example.com",
)
Parameters
Name Type Description
sql
*params
Returns
int
execute_script 1
async
async def execute_script(self, sql: str, /) -> None

Execute multiple SQL statements at once (SQLite only).

Useful for migrations that contain multiple statements::

await db.execute_script('''
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE INDEX idx_users_name ON users(name);
''')

For PostgreSQL, use execute() with individual statements inside a transaction() block instead.

Parameters
Name Type Description
sql
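
For reference, the standard library's `sqlite3.Connection.executescript` shows the underlying multi-statement behaviour on SQLite. Whether `execute_script` delegates to it is an assumption; the snippet only illustrates what running such a migration produces.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Run several statements in one call, as a migration script would.
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE INDEX idx_users_name ON users(name);
    """
)

# Both schema objects now exist.
objects = sorted(row[0] for row in conn.execute("SELECT name FROM sqlite_master"))
```
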
execute_many 2 int
async
async def execute_many(self, sql: str, params_seq: Sequence[tuple[Any, ...]], /) -> int

Execute a statement for each parameter set (batch INSERT/UPDATE).

Returns the total number of rows affected.

Usage::

await db.execute_many(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [("Alice", "a@b.com"), ("Bob", "b@b.com")],
)
Parameters
Name Type Description
sql
params_seq
Returns
int
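
A minimal sketch of the "total rows affected" return value, using stdlib `sqlite3`, where `Cursor.rowcount` after `executemany` is the sum over all parameter sets. The free function `execute_many` below is a hypothetical synchronous stand-in for the method.

```python
import sqlite3
from collections.abc import Sequence
from typing import Any


def execute_many(
    conn: sqlite3.Connection, sql: str, params_seq: Sequence[tuple[Any, ...]]
) -> int:
    """Run sql once per parameter tuple and return the total rows affected."""
    cursor = conn.executemany(sql, params_seq)
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
affected = execute_many(
    conn,
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [("Alice", "a@b.com"), ("Bob", "b@b.com")],
)
```
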
fetch_val 2 Any
async
async def fetch_val(self, sql: str, /, *params: Any) -> Any
Parameters
Name Type Description
sql
*params
Returns
Any
fetch_val 3 T | None
async
async def fetch_val(self, sql: str, /, *params: Any, as_type: type[T]) -> T | None
Parameters
Name Type Description
sql
*params
as_type
Returns
T | None
fetch_val 3 Any
async
async def fetch_val(self, sql: str, /, *params: Any, as_type: type | None = None) -> Any

Execute a query and return the first column of the first row.

Useful for COUNT, SUM, MAX, etc.

Usage::

count = await db.fetch_val("SELECT COUNT(*) FROM users")
Parameters
Name Type Description
sql
*params
as_type Default:None
Returns
Any
listen 1 AsyncIterator[Notificati…
async
async def listen(self, *channels: str) -> AsyncIterator[Notification]

Listen for PostgreSQL NOTIFY events on one or more channels.

Opens a dedicated connection (not from the pool) that stays open for the lifetime of the iterator. Yields Notification objects as they arrive.

Pair with chirp's EventStream for real-time HTML updates::

@app.route("/orders/live")
async def live_orders(request):
    async def generate():
        async for note in app.db.listen("new_orders"):
            order = await app.db.fetch_one(
                Order, "SELECT * FROM orders WHERE id = $1",
                int(note.payload),
            )
            if order:
                yield Fragment("orders.html", "row", order=order)
    return EventStream(generate())

SQLite does not support LISTEN/NOTIFY; calling listen() on a SQLite database raises DataError.

Parameters
Name Type Description
*channels
Returns
AsyncIterator[Notification]
connect 0
async
async def connect(self) -> None

Initialize the connection pool.

Called automatically on first query. Call explicitly if you want to fail fast at startup.

disconnect 0
Close all connections in the pool.
async
async def disconnect(self) -> None
Internal Methods 5
__init__ 3
def __init__(self, url: str, /, *, pool_size: int = 5, echo: bool = False) -> None
Parameters
Name Type Description
url
pool_size Default:5
echo Default:False
_connection 0 AsyncIterator[Any]
async
async def _connection(self) -> AsyncIterator[Any]

Acquire a connection, release when done.

If inside a transaction() block, reuses the transaction's connection (no acquire/release, since the transaction owns it). Otherwise acquires from the pool and releases on exit.

SQLite connections are serialized via an async lock to prevent concurrent thread-pool dispatches on the same connection, matching the serialization guarantee that aiosqlite provided via its dedicated thread.

Returns
AsyncIterator[Any]
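
The serialization described above can be approximated as follows. The sketch swaps in stdlib `asyncio.to_thread` and `asyncio.Lock` where the module is documented as using anyio, and `SerializedSQLite` is a hypothetical wrapper, not a class from this module.

```python
import asyncio
import sqlite3
from typing import Any


class SerializedSQLite:
    """Hypothetical wrapper: one connection guarded by one async lock."""

    def __init__(self, path: str) -> None:
        # check_same_thread=False lets worker threads use the connection;
        # the lock below ensures only one of them does so at a time.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = asyncio.Lock()

    async def run(self, sql: str, *params: Any) -> list[tuple[Any, ...]]:
        async with self._lock:
            # Blocking sqlite3 work happens off the event-loop thread.
            return await asyncio.to_thread(self._query, sql, params)

    def _query(self, sql: str, params: tuple[Any, ...]) -> list[tuple[Any, ...]]:
        cursor = self._conn.execute(sql, params)
        rows = cursor.fetchall()
        self._conn.commit()
        return rows


async def main() -> int:
    db = SerializedSQLite(":memory:")
    await db.run("CREATE TABLE hits (n INTEGER)")
    # Twenty concurrent callers, but the lock admits one dispatch at a time.
    await asyncio.gather(
        *(db.run("INSERT INTO hits (n) VALUES (?)", i) for i in range(20))
    )
    rows = await db.run("SELECT COUNT(*) FROM hits")
    return rows[0][0]


count = asyncio.run(main())
```
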
_log_query 3
Log a query to stderr when echo is enabled.
def _log_query(self, sql: str, params: tuple[Any, ...] | Sequence[Any], elapsed: float) -> None
Parameters
Name Type Description
sql
params
elapsed
__aenter__ 0 Database
async
async def __aenter__(self) -> Database
Returns
Database
__aexit__ 1
async
async def __aexit__(self, *_: Any) -> None
Parameters
Name Type Description
*_

Functions

get_db 0 Database
def get_db() -> Database

Return the app-level database instance.

Available when a Database is configured on the App::

app = App(db="sqlite:///app.db")

@app.route("/users")
async def users():
    db = get_db()
    return await db.fetch(User, "SELECT * FROM users")

Raises LookupError if no database is configured or the app has not started yet.

Returns
Database
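
One plausible way to get the LookupError behaviour is a `ContextVar` with no default, since `ContextVar.get()` raises `LookupError` when nothing has been set. The sketch below assumes that design; `_db_var` is hypothetical and a plain string stands in for a Database instance.

```python
from contextvars import ContextVar

# Hypothetical: set by the app at startup, unset before that.
_db_var: ContextVar[object] = ContextVar("db")


def get_db() -> object:
    try:
        return _db_var.get()
    except LookupError:
        raise LookupError(
            "No database configured, or the app has not started yet"
        ) from None


# Before the app "starts", the lookup fails.
try:
    get_db()
    configured_before = True
except LookupError:
    configured_before = False

# After startup sets the variable, the same call succeeds.
_db_var.set("fake-db")
configured_after = get_db()
```
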
_in_transaction 0 bool
Check if the current task is inside a managed transaction.
def _in_transaction() -> bool
Returns
bool
_detect_driver 1 str
Detect the database driver from the URL scheme.
def _detect_driver(url: str) -> str
Parameters
Name Type Description
url str
Returns
str
_parse_sqlite_path 1 str
Extract the file path from a sqlite:// URL.
def _parse_sqlite_path(url: str) -> str
Parameters
Name Type Description
url str
Returns
str