Classes
DatabaseConfig
3
▼
Database connection configuration.
Parsed from a URL string or constructed directly.
DatabaseConfig
3
▼
Database connection configuration.
Parsed from a URL string or constructed directly.
Attributes
| Name | Type | Description |
|---|---|---|
url |
str
|
— |
pool_size |
int
|
— |
echo |
bool
|
— |
Notification
2
▼
A notification received from PostgreSQL LISTEN/NOTIFY.
Notification
2
▼
A notification received from PostgreSQL LISTEN/NOTIFY.
Attributes
| Name | Type | Description |
|---|---|---|
channel |
str
|
The notification channel name. |
payload |
str
|
The notification payload string (may be empty). |
Database
18
▼
Typed async database access.
SQL queries return frozen dataclasses. Streaming queries return async…
Database
18
▼
Typed async database access.
SQL queries return frozen dataclasses. Streaming queries return async iterators. Both modes use the same SQL — the difference is whether you want all results at once or incrementally.
Usage::
db = Database("sqlite:///app.db")
@dataclass(frozen=True, slots=True)
class User:
id: int
name: str
email: str
# Fetch all
users = await db.fetch(User, "SELECT * FROM users")
# Fetch one
user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
# Stream (cursor-based)
async for user in db.stream(User, "SELECT * FROM users"):
process(user)
# Execute (INSERT/UPDATE/DELETE)
await db.execute("INSERT INTO users (name, email) VALUES (?, ?)",
"Alice", "alice@example.com")
# Raw scalar
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Transaction (atomic multi-statement)
async with db.transaction():
await db.execute("INSERT INTO users ...", name, email)
await db.execute("INSERT INTO profiles ...", user_id)
Methods
transaction
0
AsyncIterator[None]
▼
Execute multiple statements atomically.
Auto-commits on clean exit, rolls back…
async
transaction
0
AsyncIterator[None]
▼
async def transaction(self) -> AsyncIterator[None]
Execute multiple statements atomically.
Auto-commits on clean exit, rolls back on exception.
Calls toexecute, fetch, etc. inside the block reuse
the transaction's connection automatically via ContextVar.
Nesting is transparent — if already inside a transaction,
the innertransaction()joins the outer one (no-op).
Usage::
async with db.transaction():
await db.execute("INSERT INTO users ...", name, email)
await db.execute("INSERT INTO profiles ...", user_id)
# auto-commits here
async with db.transaction():
await db.execute("INSERT INTO users ...", name, email)
raise ValueError("oops")
# auto-rollback on exception
Returns
AsyncIterator[None]
fetch
2
list[T]
▼
Execute a query and return all rows as typed dataclasses.
Usage::
users =…
async
fetch
2
list[T]
▼
async def fetch(self, cls: type[T], sql: str, /, *params: Any) -> list[T]
Execute a query and return all rows as typed dataclasses.
Usage::
users = await db.fetch(User, "SELECT * FROM users WHERE active = ?", True)
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
Returns
list[T]
fetch_one
2
T | None
▼
Execute a query and return the first row, or ``None``.
Usage::
user = awa…
async
fetch_one
2
T | None
▼
async def fetch_one(self, cls: type[T], sql: str, /, *params: Any) -> T | None
Execute a query and return the first row, orNone.
Usage::
user = await db.fetch_one(User, "SELECT * FROM users WHERE id = ?", 42)
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
Returns
T | None
stream
3
AsyncIterator[T]
▼
Execute a query and yield rows incrementally as typed dataclasses.
Uses a serv…
async
stream
3
AsyncIterator[T]
▼
async def stream(self, cls: type[T], sql: str, /, *params: Any, batch_size: int = 100) -> AsyncIterator[T]
Execute a query and yield rows incrementally as typed dataclasses.
Uses a server-side cursor for memory-efficient iteration over large
result sets. Rows are fetched in batches ofbatch_size.
Usage::
async for entry in db.stream(LogEntry, "SELECT * FROM logs"):
process(entry)
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
|
batch_size |
— |
Default:100
|
Returns
AsyncIterator[T]
execute
2
int
▼
Execute a statement (INSERT/UPDATE/DELETE) and return rows affected.
Usage::
…
async
execute
2
int
▼
async def execute(self, sql: str, /, *params: Any) -> int
Execute a statement (INSERT/UPDATE/DELETE) and return rows affected.
Usage::
count = await db.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
"Alice", "alice@example.com",
)
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
Returns
int
execute_script
1
▼
Execute multiple SQL statements at once (SQLite only).
Useful for migrations t…
async
execute_script
1
▼
async def execute_script(self, sql: str, /) -> None
Execute multiple SQL statements at once (SQLite only).
Useful for migrations that contain multiple statements::
await db.execute_script('''
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
CREATE INDEX idx_users_name ON users(name);
''')
For PostgreSQL, useexecute()with individual statements
inside atransaction()block instead.
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
execute_many
2
int
▼
Execute a statement for each parameter set (batch INSERT/UPDATE).
Returns the …
async
execute_many
2
int
▼
async def execute_many(self, sql: str, params_seq: Sequence[tuple[Any, ...]], /) -> int
Execute a statement for each parameter set (batch INSERT/UPDATE).
Returns the total number of rows affected.
Usage::
await db.execute_many(
"INSERT INTO users (name, email) VALUES (?, ?)",
[("Alice", "a@b.com"), ("Bob", "b@b.com")],
)
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
params_seq |
— |
Returns
int
fetch_val
2
Any
▼
async
fetch_val
2
Any
▼
async def fetch_val(self, sql: str, /, *params: Any) -> Any
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
Returns
Any
fetch_val
3
T | None
▼
async
fetch_val
3
T | None
▼
async def fetch_val(self, sql: str, /, *params: Any, as_type: type[T]) -> T | None
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
|
as_type |
— |
Returns
T | None
fetch_val
3
Any
▼
Execute a query and return the first column of the first row.
Useful for COUNT…
async
fetch_val
3
Any
▼
async def fetch_val(self, sql: str, /, *params: Any, as_type: type | None = None) -> Any
Execute a query and return the first column of the first row.
Useful for COUNT, SUM, MAX, etc.
Usage::
count = await db.fetch_val("SELECT COUNT(*) FROM users")
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
*params |
— |
|
as_type |
— |
Default:None
|
Returns
Any
listen
1
AsyncIterator[Notificati…
▼
Listen for PostgreSQL NOTIFY events on one or more channels.
Opens a **dedicat…
async
listen
1
AsyncIterator[Notificati…
▼
async def listen(self, *channels: str) -> AsyncIterator[Notification]
Listen for PostgreSQL NOTIFY events on one or more channels.
Opens a dedicated connection (not from the pool) that stays
open for the lifetime of the iterator. YieldsNotification
objects as they arrive.
Pair with chirp'sEventStreamfor real-time HTML updates::
@app.route("/orders/live")
async def live_orders(request):
async def generate():
async for note in app.db.listen("new_orders"):
order = await app.db.fetch_one(
Order, "SELECT * FROM orders WHERE id = $1",
int(note.payload),
)
if order:
yield Fragment("orders.html", "row", order=order)
return EventStream(generate())
SQLite does not support LISTEN/NOTIFY — raisesDataError.
Parameters
| Name | Type | Description |
|---|---|---|
*channels |
— |
Returns
AsyncIterator[Notification]
connect
0
▼
Initialize the connection pool.
Called automatically on first query. Call expl…
async
connect
0
▼
async def connect(self) -> None
Initialize the connection pool.
Called automatically on first query. Call explicitly if you want to fail fast at startup.
disconnect
0
▼
Close all connections in the pool.
async
disconnect
0
▼
async def disconnect(self) -> None
Internal Methods 5 ▼
__init__
3
▼
__init__
3
▼
def __init__(self, url: str, /, *, pool_size: int = 5, echo: bool = False) -> None
Parameters
| Name | Type | Description |
|---|---|---|
url |
— |
|
pool_size |
— |
Default:5
|
echo |
— |
Default:False
|
_connection
0
AsyncIterator[Any]
▼
Acquire a connection, release when done.
If inside a ``transaction()`` block, …
async
_connection
0
AsyncIterator[Any]
▼
async def _connection(self) -> AsyncIterator[Any]
Acquire a connection, release when done.
If inside atransaction()block, reuses the transaction's
connection (no acquire/release — the transaction owns it).
Otherwise acquires from the pool and releases on exit.
SQLite connections are serialized via an async lock to prevent
concurrent thread-pool dispatches on the same connection — matching
the serialization guarantee thataiosqliteprovided via its
dedicated thread.
Returns
AsyncIterator[Any]
_log_query
3
▼
Log a query to stderr when echo is enabled.
_log_query
3
▼
def _log_query(self, sql: str, params: tuple[Any, ...] | Sequence[Any], elapsed: float) -> None
Parameters
| Name | Type | Description |
|---|---|---|
sql |
— |
|
params |
— |
|
elapsed |
— |
__aenter__
0
Database
▼
async
__aenter__
0
Database
▼
async def __aenter__(self) -> Database
Returns
Database
__aexit__
1
▼
async
__aexit__
1
▼
async def __aexit__(self, *_: Any) -> None
Parameters
| Name | Type | Description |
|---|---|---|
*_ |
— |
Functions
get_db
0
Database
▼
Return the app-level database instance.
Available when a ``Database`` is confi…
get_db
0
Database
▼
def get_db() -> Database
Return the app-level database instance.
Available when aDatabase is configured on the App::
app = App(db="sqlite:///app.db")
@app.route("/users")
async def users():
db = get_db()
return await db.fetch(User, "SELECT * FROM users")
RaisesLookupErrorif no database is configured or the app
has not started yet.
Returns
Database
_in_transaction
0
bool
▼
Check if the current task is inside a managed transaction.
_in_transaction
0
bool
▼
def _in_transaction() -> bool
Returns
bool
_detect_driver
1
str
▼
Detect the database driver from the URL scheme.
_detect_driver
1
str
▼
def _detect_driver(url: str) -> str
Parameters
| Name | Type | Description |
|---|---|---|
url |
str |
Returns
str
_parse_sqlite_path
1
str
▼
Extract the file path from a sqlite:// URL.
_parse_sqlite_path
1
str
▼
def _parse_sqlite_path(url: str) -> str
Parameters
| Name | Type | Description |
|---|---|---|
url |
str |
Returns
str