QueryBuilder
Fluent API for constructing and executing database queries. Instances are created by SqliterDB.select() -- you do not instantiate QueryBuilder directly.
results = db.select(User).filter(age__gt=18).order("name").fetch_all()
Source: sqliter/query/query.py
See also: Guide -- Filtering, Guide -- Ordering, Guide -- Aggregates, Guide -- Field Control, Guide -- Caching
Type Parameters
T = TypeVar("T", bound=BaseDBModel)
class QueryBuilder(Generic[T]):
    ...
QueryBuilder is generic over T, the model class. All fetch methods return instances of T.
Type Aliases
FilterValue
The allowed types for filter values:
FilterValue = Union[
    str, int, float, bool, None,
    list[Union[str, int, float, bool]],
]
Projection Types
Projection queries use the following exported helpers from sqliter.query:
AggregateSpec
Represents one aggregate expression used by annotate().
@dataclass(frozen=True)
class AggregateSpec:
    func: Literal["COUNT", "SUM", "AVG", "MIN", "MAX"]
    field: str | None = None
    distinct: bool = False
Note
COUNT allows field=None (equivalent to COUNT(*)), but SUM/AVG/MIN/MAX require a concrete field name.
func
Namespace of aggregate helper constructors:
func.count(field=None, distinct=False)
func.sum(field, distinct=False)
func.avg(field, distinct=False)
func.min(field, distinct=False)
func.max(field, distinct=False)
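The rule in the note above (COUNT may omit its field, the other aggregates may not) can be sketched with a standalone dataclass. This is an illustration only, not SQLiter's code: the class name AggregateSketch, the to_sql() method, the choice of ValueError, and the quoting style are all assumptions made for the example.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class AggregateSketch:
    """Hypothetical stand-in for AggregateSpec, used only for illustration."""

    func: Literal["COUNT", "SUM", "AVG", "MIN", "MAX"]
    field: str | None = None
    distinct: bool = False

    def __post_init__(self) -> None:
        # Only COUNT may omit the field; the error type here is an assumption.
        if self.field is None and self.func != "COUNT":
            raise ValueError(f"{self.func} requires a field name")

    def to_sql(self) -> str:
        # COUNT with no field is COUNT(*); distinct is ignored in that case.
        if self.field is None:
            return "COUNT(*)"
        prefix = "DISTINCT " if self.distinct else ""
        return f'{self.func}({prefix}"{self.field}")'


count_all = AggregateSketch("COUNT").to_sql()
sum_amount = AggregateSketch("SUM", "amount").to_sql()
distinct_users = AggregateSketch("COUNT", "user_id", distinct=True).to_sql()
```

The real helpers in the func namespace may validate at a different point or raise a different exception; the sketch only shows the shape of the constraint.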
Constructor
def __init__(
    self,
    db: SqliterDB,
    model_class: type[T],
    fields: list[str] | None = None,
) -> None:
Note
You should not call this directly. Use db.select(Model) instead.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| db | SqliterDB | required | Database connection |
| model_class | type[T] | required | The model class to query |
| fields | list[str] \| None | None | Fields to select (all if None) |
Filter Methods
filter()
Apply filter conditions to the query. Supports operator suffixes on field names and relationship traversal.
def filter(
    self,
    **conditions: FilterValue,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| **conditions | FilterValue | Field-operator pairs as keyword arguments |
Returns: Self for method chaining.
Raises:
InvalidFilterError -- If a field does not exist on the model.
InvalidRelationshipError -- If a relationship traversal path is invalid.
TypeError -- If a list is passed for a scalar operator, or a non-string for a string operator.
Example:
# Simple equality (default __eq)
db.select(User).filter(name="Alice")
# Comparison operators
db.select(User).filter(age__gt=18, age__lt=65)
# Multiple chained calls (AND logic)
db.select(User).filter(active=True).filter(age__gte=21)
# Relationship traversal (ORM mode)
db.select(Book).filter(author__name="Alice")
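One way to picture how a keyword such as age__gt or author__name is read is to split off the last double-underscore segment and treat it as an operator only when it matches a known suffix. The helper below is a hypothetical sketch of that idea; split_condition and the OPERATORS set are names invented for this example, and SQLiter's actual parsing may differ.

```python
# Operator suffixes taken from the Filter Operators table in this reference.
OPERATORS = {
    "eq", "ne", "gt", "lt", "gte", "lte", "in", "not_in",
    "isnull", "notnull", "like",
    "startswith", "endswith", "contains",
    "istartswith", "iendswith", "icontains",
}


def split_condition(key: str) -> tuple[str, str]:
    """Split a filter keyword into (field path, operator name)."""
    path, sep, suffix = key.rpartition("__")
    if sep and suffix in OPERATORS:
        return path, suffix
    # No recognized suffix: the whole key is a field path with default equality.
    return key, "eq"


plain = split_condition("name")
comparison = split_condition("age__gt")
traversal = split_condition("author__name")
traversal_with_op = split_condition("author__name__icontains")
```

A related field whose name collides with an operator suffix would be ambiguous under this scheme, which is one reason the real implementation may validate against the model instead.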
Filter Operators
| Operator | SQL | Value Type | Description |
|---|---|---|---|
| (none) | = | scalar | Equality (default) |
| __eq | = | scalar | Explicit equality |
| __ne | != | scalar | Not equal |
| __gt | > | scalar | Greater than |
| __lt | < | scalar | Less than |
| __gte | >= | scalar | Greater than or equal |
| __lte | <= | scalar | Less than or equal |
| __in | IN | list | Value in list |
| __not_in | NOT IN | list | Value not in list |
| __isnull | IS NULL | bool | Field is NULL (pass True) |
| __notnull | IS NOT NULL | bool | Field is not NULL (pass True) |
| __like | LIKE | str | Raw SQL LIKE pattern (user provides % wildcards) |
| __startswith | GLOB | str | Case-sensitive starts with |
| __endswith | GLOB | str | Case-sensitive ends with |
| __contains | GLOB | str | Case-sensitive contains |
| __istartswith | LIKE | str | Case-insensitive starts with |
| __iendswith | LIKE | str | Case-insensitive ends with |
| __icontains | LIKE | str | Case-insensitive contains |
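The split between GLOB and LIKE in the table follows from SQLite's own behavior: GLOB is case-sensitive and uses * as its wildcard, while LIKE is case-insensitive for ASCII by default and uses %. The snippet below shows this with the standard sqlite3 module; the table, data, and the exact patterns are assumptions for illustration and are not necessarily the SQL that SQLiter generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany(
    "INSERT INTO users (name) VALUES (?)",
    [("Alice",), ("alice",), ("Bob",)],
)


def names(sql: str, pattern: str) -> list[str]:
    """Run a one-parameter query and return the matching names, sorted."""
    rows = conn.execute(sql, (pattern,)).fetchall()
    return sorted(row[0] for row in rows)


# Assumed shape of a case-sensitive "starts with" match.
case_sensitive = names("SELECT name FROM users WHERE name GLOB ?", "Al*")

# Assumed shape of a case-insensitive "starts with" match.
case_insensitive = names("SELECT name FROM users WHERE name LIKE ?", "al%")
```

Here case_sensitive contains only "Alice", while case_insensitive contains both "Alice" and "alice".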
Field Selection
fields()
Specify which fields to include in the query results.
def fields(
    self,
    fields: list[str] | None = None,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fields | list[str] \| None | None | Fields to select; pk is always included |
Returns: Self for method chaining.
Example:
db.select(User).fields(["name", "email"]).fetch_all()
exclude()
Specify which fields to exclude from the query results.
def exclude(
    self,
    fields: list[str] | None = None,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fields | list[str] \| None | None | Fields to exclude |
Returns: Self for method chaining.
Raises:
ValueError -- If pk is excluded, if invalid fields are specified, or if exclusion leaves no fields.
Example:
db.select(User).exclude(["password_hash"]).fetch_all()
only()
Select a single field (plus pk).
def only(
    self,
    field: str,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| field | str | required | The single field to select |
Returns: Self for method chaining.
Raises:
ValueError -- If the field does not exist.
Example:
db.select(User).only("email").fetch_all()
Relationships
select_related()
Specify FK relationships to eager load via SQL JOINs. Reduces the N+1 query problem by fetching related objects in a single query.
def select_related(
    self,
    *paths: str,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| *paths | str | One or more relationship paths to eager load |
Returns: Self for method chaining.
Raises:
InvalidRelationshipError -- If a path contains invalid fields.
Example:
# Single level
books = db.select(Book).select_related("author").fetch_all()
# book.author.name -- no additional query needed
# Nested
comments = db.select(Comment).select_related(
    "post__author"
).fetch_all()
# Multiple paths
books = db.select(Book).select_related(
    "author", "publisher"
).fetch_all()
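Conceptually, eager loading a forward FK means fetching the parent and the related row in one joined statement instead of issuing a follow-up query per record. The sketch below shows that idea in plain sqlite3; the schema, the column names, and the use of LEFT JOIN for a nullable FK are assumptions for illustration rather than the exact SQL SQLiter emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE authors (pk INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE books (
        pk INTEGER PRIMARY KEY,
        title TEXT,
        author_id INTEGER REFERENCES authors(pk)  -- nullable FK
    );
    INSERT INTO authors (pk, name) VALUES (1, 'Frank');
    INSERT INTO books (pk, title, author_id) VALUES (1, 'Dune', 1);
    INSERT INTO books (pk, title, author_id) VALUES (2, 'Anon', NULL);
    """
)

# One query returns each book together with its author's columns.
# LEFT JOIN keeps books whose author_id is NULL.
rows = conn.execute(
    """
    SELECT b.title, a.name
    FROM books AS b
    LEFT JOIN authors AS a ON a.pk = b.author_id
    ORDER BY b.pk
    """
).fetchall()
```

With an INNER JOIN the book without an author would be dropped, which is why the join type depends on whether the FK is nullable.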
prefetch_related()
Specify reverse FK and M2M relationships to eager load via a second query. Reduces the N+1 query problem for reverse relationships.
def prefetch_related(
    self,
    *paths: str,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| *paths | str | One or more reverse FK or M2M relationship names |
Returns: Self for method chaining.
Raises:
InvalidPrefetchError -- If a path is not a valid reverse FK or M2M relationship. Forward FK paths (which should use select_related()) also raise this error.
Example:
# Reverse FK
authors = db.select(Author).prefetch_related("books").fetch_all()
# author.books.fetch_all() -- no additional query
# M2M (forward)
articles = db.select(Article).prefetch_related("tags").fetch_all()
# M2M (reverse)
tags = db.select(Tag).prefetch_related("articles").fetch_all()
# Multiple paths
authors = db.select(Author).prefetch_related(
    "books", "reviews"
).fetch_all()
# Nested paths
authors = db.select(Author).prefetch_related(
    "books__reviews"
).fetch_all()
# Combined with select_related
books = db.select(Book).select_related("author").prefetch_related("reviews").fetch_all()
Note
Use select_related() for forward FK relationships (e.g., book.author) and prefetch_related() for reverse FK and M2M relationships (e.g., author.books, article.tags). Nested paths must only include reverse FK or M2M segments (use select_related() for forward FK data).
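The "second query" strategy for to-many relationships can be pictured as: load the parent rows, collect their primary keys, fetch all children with a single IN query, then group the children in memory. The following sqlite3 sketch illustrates that pattern under an assumed schema; it is not SQLiter's implementation.

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE authors (pk INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE books (pk INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
    INSERT INTO authors (pk, name) VALUES (1, 'Alice');
    INSERT INTO authors (pk, name) VALUES (2, 'Bob');
    INSERT INTO books (title, author_id) VALUES ('A1', 1);
    INSERT INTO books (title, author_id) VALUES ('A2', 1);
    """
)

# Query 1: the parent rows.
authors = conn.execute("SELECT pk, name FROM authors ORDER BY pk").fetchall()

# Query 2: every child row for those parents, in a single IN query.
author_pks = [pk for pk, _ in authors]
placeholders = ", ".join("?" for _ in author_pks)
book_rows = conn.execute(
    f"SELECT author_id, title FROM books "
    f"WHERE author_id IN ({placeholders}) ORDER BY pk",
    author_pks,
).fetchall()

# Group children by parent key in memory.
books_by_author = defaultdict(list)
for author_id, title in book_rows:
    books_by_author[author_id].append(title)

result = {name: books_by_author[pk] for pk, name in authors}
```

Two queries are issued regardless of how many authors are returned, instead of one extra query per author.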
Projection and Aggregates
Projection mode is enabled by group_by(), annotate(), or with_count(). In this mode, query results are dictionaries returned by fetch_dicts().
group_by()
Group projection rows by one or more base-model fields.
def group_by(
    self,
    *fields: str,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| *fields | str | Fields to group results |
Returns: Self for method chaining.
Raises:
InvalidProjectionError -- If any field does not exist on the model.
annotate()
Add aggregate projections keyed by alias.
def annotate(
    self,
    **aggregates: AggregateSpec,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| **aggregates | AggregateSpec | Mapping of output alias to aggregate |
Returns: Self for method chaining.
Raises:
InvalidProjectionError -- For empty/conflicting/duplicate aliases, unknown aggregate fields, or unsupported aggregate combinations.
TypeError -- If any value is not an AggregateSpec.
having()
Add SQL HAVING filters for grouped/aggregate queries.
def having(
    self,
    **conditions: FilterValue,
) -> Self:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| **conditions | FilterValue | HAVING conditions with operators |
Returns: Self for method chaining.
Raises:
InvalidProjectionError -- If projection mode is not active, or conditions reference fields that are neither grouped fields nor aggregate aliases.
TypeError -- If operator value types are invalid (same rules as filter()).
with_count()
Add a relationship count aggregate using LEFT JOIN semantics.
def with_count(
    self,
    path: str,
    alias: str = "count",
    *,
    distinct: bool = False,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str | required | Relationship path to count |
| alias | str | "count" | Output alias for the count column |
| distinct | bool | False | Use COUNT(DISTINCT ...) |
Returns: Self for method chaining.
Raises:
InvalidProjectionError -- For invalid aliases, invalid terminal relationship types, or unresolved M2M SQL metadata.
InvalidRelationshipError -- If path is not a valid relationship on the model.
Note
with_count() supports nested paths (for example, "author__books" or "articles__tags"), but the terminal segment must be to-many (reverse FK or many-to-many). If no group_by() exists, SQLiter automatically groups by current selected model fields.
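"LEFT JOIN semantics" matters because parents with no related rows should still appear, with a count of zero. The sqlite3 sketch below shows the effect on an assumed authors/books schema; the SQL is written by hand for illustration and is not claimed to match what with_count() generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE authors (pk INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE books (pk INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
    INSERT INTO authors (pk, name) VALUES (1, 'Alice');
    INSERT INTO authors (pk, name) VALUES (2, 'Bob');
    INSERT INTO books (title, author_id) VALUES ('A1', 1);
    INSERT INTO books (title, author_id) VALUES ('A2', 1);
    """
)

# COUNT(b.pk) ignores the NULL produced by the LEFT JOIN for authors
# without books, so those authors get 0 rather than disappearing.
rows = conn.execute(
    """
    SELECT a.name, COUNT(b.pk) AS book_count
    FROM authors AS a
    LEFT JOIN books AS b ON b.author_id = a.pk
    GROUP BY a.pk, a.name
    ORDER BY a.name
    """
).fetchall()
```

An INNER JOIN would omit Bob entirely, and COUNT(*) would report 1 for him instead of 0.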
fetch_dicts()
Execute a projection query and return dictionaries.
def fetch_dicts(self) -> list[dict[str, Any]]:
Returns: list[dict[str, Any]] -- Projection rows as dictionaries.
Raises:
InvalidProjectionError -- If called before projection mode is enabled.
RecordFetchError -- If SQL execution fails.
Example:
from sqliter.query import func
rows = (
    db.select(Sale)
    .group_by("category")
    .annotate(total=func.sum("amount"), entries=func.count())
    .having(total__gt=10)
    .order("total", reverse=True)
    .fetch_dicts()
)
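For comparison, roughly the same projection can be written directly against sqlite3. The table name sales, the sample data, and the hand-written SQL are assumptions; the point is only to show the kind of dictionaries a grouped query with an aggregate filter produces.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can then be converted to dicts
conn.execute("CREATE TABLE sales (category TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO sales (category, amount) VALUES (?, ?)",
    [("books", 8.0), ("books", 7.0), ("toys", 4.0), ("games", 30.0)],
)

cursor = conn.execute(
    """
    SELECT category, SUM(amount) AS total, COUNT(*) AS entries
    FROM sales
    GROUP BY category
    HAVING SUM(amount) > ?
    ORDER BY total DESC
    """,
    (10,),
)
rows = [dict(row) for row in cursor]
```

The "toys" group is removed by the HAVING clause because its total is 4.0, and the remaining groups are sorted by total in descending order.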
Pagination
limit()
Limit the number of results returned.
def limit(
    self,
    limit_value: int,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit_value | int | required | Maximum number of records |
Returns: Self for method chaining.
offset()
Skip a number of records before returning results.
def offset(
    self,
    offset_value: int,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| offset_value | int | required | Number of records to skip |
Returns: Self for method chaining.
Raises:
InvalidOffsetError -- If the offset value is negative.
Note
If offset() is called without a prior limit(), the limit is automatically set to -1 (unlimited) to satisfy SQLite's requirement that OFFSET must be paired with LIMIT.
Example:
# Pagination: page 2, 10 items per page
db.select(User).limit(10).offset(10).fetch_all()
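The LIMIT -1 behavior described in the note is plain SQLite: a negative limit means "no upper bound", which lets OFFSET be used on its own. The sqlite3 sketch below shows both a normal page and an offset-only query; the users table and its contents are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (pk INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (name) VALUES (?)",
    [(f"user{i}",) for i in range(25)],
)

# Page 2 with 10 items per page: skip 10 rows, return the next 10.
page_two = [
    row[0]
    for row in conn.execute(
        "SELECT pk FROM users ORDER BY pk LIMIT ? OFFSET ?", (10, 10)
    )
]

# Offset without a real limit: LIMIT -1 returns everything after the offset.
rest = [
    row[0]
    for row in conn.execute(
        "SELECT pk FROM users ORDER BY pk LIMIT -1 OFFSET ?", (20,)
    )
]
```

An explicit ORDER BY keeps the pages stable; without one, SQLite does not guarantee row order between calls.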
Ordering
order()
Order the query results by a field.
def order(
    self,
    order_by_field: str | None = None,
    direction: str | None = None,
    *,
    reverse: bool = False,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| order_by_field | str \| None | None | Field to order by; defaults to pk |
| direction | str \| None | None | Deprecated. Use reverse instead |
| reverse | bool | False | If True, sort descending |
Returns: Self for method chaining.
Raises:
InvalidOrderError -- If the field does not exist, or if both direction and reverse are specified.
Warns:
DeprecationWarning -- If direction is used.
Caution
The direction parameter is deprecated. Use reverse=True for descending order instead.
Example:
# Ascending (default)
db.select(User).order("name").fetch_all()
# Descending
db.select(User).order("created_at", reverse=True).fetch_all()
Cache Control
bypass_cache()
Force this query to skip the cache and hit the database directly.
def bypass_cache(self) -> Self:
Returns: Self for method chaining.
Example:
fresh = db.select(User).bypass_cache().fetch_all()
cache_ttl()
Set a custom time-to-live for this query's cached result, overriding the global cache_ttl.
def cache_ttl(
    self,
    ttl: int,
) -> Self:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| ttl | int | required | TTL in seconds |
Returns: Self for method chaining.
Raises:
ValueError -- If ttl is negative.
Example:
# Cache this query for 60 seconds
db.select(User).cache_ttl(60).fetch_all()
Execution Methods
fetch_all()
Execute the query and return all matching records.
def fetch_all(self) -> list[T]:
Returns: list[T] -- List of model instances.
Raises:
InvalidProjectionError -- If projection mode is active (use fetch_dicts() instead).
Example:
users = db.select(User).filter(active=True).fetch_all()
fetch_one()
Execute the query and return a single record.
def fetch_one(self) -> T | None:
Returns: T | None -- A model instance, or None if no match.
Raises:
InvalidProjectionError -- If projection mode is active (use fetch_dicts() instead).
fetch_first()
Fetch the first record (sets LIMIT 1).
def fetch_first(self) -> T | None:
Returns: T | None -- The first model instance, or None.
Raises:
InvalidProjectionError -- If projection mode is active (use fetch_dicts() instead).
fetch_last()
Fetch the last record (orders by rowid DESC, sets LIMIT 1).
def fetch_last(self) -> T | None:
Returns: T | None -- The last model instance, or None.
Raises:
InvalidProjectionError -- If projection mode is active (use fetch_dicts() instead).
count()
Count the number of matching records.
def count(self) -> int:
Returns: int -- The count of matching records.
Raises:
InvalidProjectionError -- If projection mode is active. In non-projection mode, count() executes a database-side COUNT(*). In projection mode, use len(fetch_dicts()) instead (note that this loads the matching projection rows into memory).
Example:
total = db.select(User).filter(active=True).count()
exists()
Check if any matching records exist.
def exists(self) -> bool:
Returns: bool -- True if at least one record matches.
Raises:
InvalidProjectionError -- If projection mode is active. For projection queries, use len(fetch_dicts()) > 0 for an existence check (this loads the matching projection rows into memory).
Example:
if db.select(User).filter(email="alice@example.com").exists():
    print("User exists")
update()
Update all records matching the current query conditions with new values.
def update(self, values: dict[str, Any]) -> int:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| values | dict[str, Any] | required | Field names and their new values |
Returns: int -- The number of records updated.
Raises:
InvalidUpdateError -- If an invalid field name is provided in values.
RecordUpdateError -- If there is an error executing the update.
Example:
# Update all users over 30 to active
count = db.select(User).filter(age__gt=30).update({"active": True})
print(f"Updated {count} users")
delete()
Delete all records matching the current query conditions.
def delete(self) -> int:
Returns: int -- The number of records deleted.
Raises:
RecordDeletionError -- If there is an error deleting the records.
Example:
deleted = db.select(User).filter(active=False).delete()
print(f"Deleted {deleted} inactive users")
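The integer returned by update() and delete() corresponds to the usual "rows affected" figure for a bulk statement. In the standard sqlite3 module that figure is exposed as cursor.rowcount, as the sketch below shows. The schema and data are invented, and whether SQLiter reads rowcount internally is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER, active INTEGER)")
conn.executemany(
    "INSERT INTO users (name, age, active) VALUES (?, ?, ?)",
    [("Alice", 42, 0), ("Bob", 25, 0), ("Cara", 31, 0)],
)

# Bulk update: mark every user older than 30 as active.
updated = conn.execute(
    "UPDATE users SET active = ? WHERE age > ?", (1, 30)
).rowcount

# Bulk delete: remove the users that are still inactive.
deleted = conn.execute(
    "DELETE FROM users WHERE active = ?", (0,)
).rowcount

remaining = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Both statements act on every row matching the WHERE clause in a single round trip, so no records need to be loaded into Python first.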
Supporting Types
JoinInfo
Dataclass holding metadata for a JOIN clause. Used internally by select_related() and relationship filter traversal.
@dataclass
class JoinInfo:
    alias: str
    table_name: str
    model_class: type[BaseDBModel]
    fk_field: str
    parent_alias: str
    fk_column: str
    join_type: str
    path: str
    is_nullable: bool
Fields:
| Field | Type | Description |
|---|---|---|
| alias | str | Table alias (e.g., "t1", "t2") |
| table_name | str | Actual database table name |
| model_class | type[BaseDBModel] | Model class for the joined table |
| fk_field | str | FK field name on the parent model |
| parent_alias | str | Alias of the parent table |
| fk_column | str | FK column name (e.g., "author_id") |
| join_type | str | "LEFT" (nullable FK) or "INNER" (required FK) |
| path | str | Full relationship path (e.g., "post__author") |
| is_nullable | bool | Whether the FK is nullable |
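To make the role of these fields concrete, the sketch below renders a JOIN clause from a reduced copy of the dataclass. JoinSketch, render_join, the "t0" parent alias, and the assumption that the joined table's key column is named pk are all hypothetical; the library's own SQL construction is internal and may look different.

```python
from dataclasses import dataclass


@dataclass
class JoinSketch:
    """Reduced, hypothetical copy of JoinInfo with only the SQL-relevant fields."""

    alias: str
    table_name: str
    parent_alias: str
    fk_column: str
    join_type: str


def render_join(join: JoinSketch, pk_column: str = "pk") -> str:
    """Build a JOIN clause linking the parent's FK column to the child's key."""
    return (
        f'{join.join_type} JOIN "{join.table_name}" AS {join.alias} '
        f'ON {join.parent_alias}."{join.fk_column}" = {join.alias}."{pk_column}"'
    )


author_join = JoinSketch(
    alias="t1",
    table_name="authors",
    parent_alias="t0",
    fk_column="author_id",
    join_type="LEFT",
)
clause = render_join(author_join)
```

For a nested path such as "post__author", each segment would get its own entry, with parent_alias pointing at the alias of the previous segment.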