Module

data.query

Immutable query builder for chirp.data.

Accumulates SQL clauses through chained methods, compiles them to a SQL string plus a parameters tuple, and executes via the existing Database methods.

Each method returns a new frozen Query; the original is never mutated. This is the same pattern as Response.with_*(), but for SELECT queries.

Usage::

from dataclasses import dataclass

from chirp.data import Database, Query

@dataclass(frozen=True, slots=True)
class Todo:
    id: int
    text: str
    done: bool

todos = await (
    Query(Todo, "todos")
    .where("done = ?", False)
    .where_if(search, "text LIKE ?", f"%{search}%")
    .order_by("id DESC")
    .take(20)
    .fetch(db)
)

Transparency: .sql and .params show exactly what will run. No hidden queries, no magic.

Free-threading safety:

  • Frozen dataclass — immutable after creation
  • Tuple accumulators — no shared mutable state
  • No locks needed

Classes

Query 20

Immutable SELECT query builder.

Construct with a target dataclass and table name, chain methods to add clauses, then execute via fetch(), fetch_one(), etc.

Every method returns a new Query; the original is unchanged.

Attributes

Name Type Description
_cls type[T]
_table str
_wheres tuple[tuple[str, tuple[object, ...]], ...]
_order str | None
_limit int | None
_offset int | None
_columns str

Methods

sql 0 str
The exact SQL that will run. No surprises.
property
def sql(self) -> str
Returns
str
params 0 tuple[object, ...]
The bound parameters, in order.
property
def params(self) -> tuple[object, ...]
Returns
tuple[object, ...]
where 2 Query[T]
def where(self, clause: str, /, *params: object) -> Query[T]

Add a WHERE clause. Multiple calls are ANDed.

::

Query(Todo, "todos").where("done = ?", False).where("id > ?", 10)
# WHERE done = ? AND id > ?
Parameters
Name Type Description
clause
*params
Returns
Query[T]
where_if 3 Query[T]
def where_if(self, condition: object, clause: str, /, *params: object) -> Query[T]

Add a WHERE clause only if condition is truthy.

The killer method for dynamic queries: no more string concatenation with if blocks::

(
    Query(Todo, "todos")
    .where_if(status, "done = ?", status == "done")
    .where_if(search, "text LIKE ?", f"%{search}%")
)
Parameters
Name Type Description
condition
clause
*params
Returns
Query[T]
order_by 1 Query[T]
def order_by(self, clause: str) -> Query[T]

Set ORDER BY. Replaces any previous ordering.

::

Query(Todo, "todos").order_by("id DESC")
Parameters
Name Type Description
clause
Returns
Query[T]
take 1 Query[T]
def take(self, n: int) -> Query[T]

Set LIMIT (max rows to return).

::

Query(Todo, "todos").take(20)
Parameters
Name Type Description
n
Returns
Query[T]
skip 1 Query[T]
def skip(self, n: int) -> Query[T]

Set OFFSET (rows to skip).

::

Query(Todo, "todos").take(20).skip(40)  # page 3
Parameters
Name Type Description
n
Returns
Query[T]
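The "page 3" comment in the example follows from the usual pagination arithmetic, offset = (page - 1) * page_size. A small helper makes this explicit; `page_window` is a hypothetical name and assumes 1-based page numbers.

```python
def page_window(page: int, page_size: int = 20) -> tuple[int, int]:
    """Return (limit, offset) for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return page_size, (page - 1) * page_size


limit, offset = page_window(3, page_size=20)
```

The two values would then be passed along as `.take(limit).skip(offset)`.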
select 1 Query[T]
def select(self, columns: str) -> Query[T]

Set which columns to SELECT. Default is *.

::

Query(Todo, "todos").select("id, text")
Parameters
Name Type Description
columns
Returns
Query[T]
fetch 1 list[T]
Execute and return all matching rows as typed dataclasses.
async
async def fetch(self, db: Database) -> list[T]
Parameters
Name Type Description
db
Returns
list[T]
fetch_one 1 T | None
Execute and return the first matching row, or ``None``.
async
async def fetch_one(self, db: Database) -> T | None
Parameters
Name Type Description
db
Returns
T | None
count 1 int
async
async def count(self, db: Database) -> int

Execute a COUNT(*) with the same WHERE clauses.

Ignores select(), order_by(), take(), and skip(), so it counts all matching rows.

Parameters
Name Type Description
db
Returns
int
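The practical effect is that a paginated query and its total share one set of filters. The sketch below shows the equivalent raw SQL with the standard-library `sqlite3` module; the table contents and the exact SQL text are illustrative assumptions, not output captured from chirp.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE todos (id INTEGER PRIMARY KEY, text TEXT, done INTEGER)")
conn.executemany(
    "INSERT INTO todos (text, done) VALUES (?, ?)",
    [("a", 0), ("b", 0), ("c", 1), ("d", 0), ("e", 1)],
)

# The same WHERE clauses feed both statements.
wheres = (("done = ?", (False,)),)
where_sql = " AND ".join(clause for clause, _ in wheres)
params = tuple(p for _, clause_params in wheres for p in clause_params)

# Page query: filters plus ordering and a limit.
page = conn.execute(
    f"SELECT id, text FROM todos WHERE {where_sql} ORDER BY id DESC LIMIT 2",
    params,
).fetchall()

# Count query: same filters; column list, ordering and limit are dropped.
(total,) = conn.execute(
    f"SELECT COUNT(*) FROM todos WHERE {where_sql}",
    params,
).fetchone()
```

Here the page holds two rows while the total is three, which is the pair of numbers a "showing 2 of 3" pagination footer needs.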
stream 2 AsyncIterator[T]
Execute and yield rows incrementally as typed dataclasses.
async
async def stream(self, db: Database, *, batch_size: int = 100) -> AsyncIterator[T]
Parameters
Name Type Description
db
batch_size Default:100
Returns
AsyncIterator[T]
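One plausible reading of `batch_size` is that rows are pulled from the cursor in chunks and yielded one at a time. The sketch below shows that shape with an async generator over a standard-library `sqlite3` cursor; `stream_rows` is a hypothetical function, and `sqlite3` calls block the event loop, so a real async driver would await the fetch instead.

```python
import asyncio
import sqlite3
from collections.abc import AsyncIterator


async def stream_rows(
    cursor: sqlite3.Cursor, *, batch_size: int = 100
) -> AsyncIterator[tuple]:
    while True:
        # Fetch at most `batch_size` rows so memory use stays bounded.
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield row
        # Give other tasks a chance to run between batches.
        await asyncio.sleep(0)


async def main() -> list[int]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE todos (id INTEGER PRIMARY KEY, text TEXT)")
    conn.executemany(
        "INSERT INTO todos (text) VALUES (?)",
        [(f"item {i}",) for i in range(250)],
    )
    cursor = conn.execute("SELECT id, text FROM todos ORDER BY id")
    seen: list[int] = []
    async for row_id, _text in stream_rows(cursor, batch_size=100):
        seen.append(row_id)
    return seen


ids = asyncio.run(main())
```

Compared with fetch(), this style avoids building the full result list up front, which matters mainly for large result sets.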
exists 1 bool
async
async def exists(self, db: Database) -> bool

Check if at least one matching row exists.

Uses SELECT 1 ... LIMIT 1 for efficiency.

Parameters
Name Type Description
db
Returns
bool
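The `SELECT 1 ... LIMIT 1` approach lets the database stop at the first match instead of counting or materializing rows. A minimal sketch with the standard-library `sqlite3` module follows; `row_exists` is a hypothetical helper and the exact SQL chirp emits may differ.

```python
import sqlite3


def row_exists(
    conn: sqlite3.Connection, table: str, where_sql: str, params: tuple = ()
) -> bool:
    # Only a constant is selected, and LIMIT 1 stops after the first match.
    sql = f"SELECT 1 FROM {table} WHERE {where_sql} LIMIT 1"
    return conn.execute(sql, params).fetchone() is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE todos (id INTEGER PRIMARY KEY, text TEXT, done INTEGER)")
conn.execute("INSERT INTO todos (text, done) VALUES (?, ?)", ("buy milk", 0))

has_open = row_exists(conn, "todos", "done = ?", (False,))
has_done = row_exists(conn, "todos", "done = ?", (True,))
```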