Skip to content

AsyncSqliterDB

Async database entry point for SQLiter.

from sqliter.asyncio import AsyncSqliterDB

Source: sqliter/asyncio/db.py

See also: Guide -- Asyncio Support


Constructor

def __init__(
    self,
    db_filename: str | None = None,
    *,
    memory: bool = False,
    auto_commit: bool = True,
    debug: bool = False,
    logger: logging.Logger | None = None,
    reset: bool = False,
    return_local_time: bool = True,
    cache_enabled: bool = False,
    cache_max_size: int = 1000,
    cache_ttl: int | None = None,
    cache_max_memory_mb: int | None = None,
) -> None:

The signature closely matches the sync constructor. The one difference is that, although the reset parameter is accepted, reset=True is not supported directly in __init__().

Note

Use await AsyncSqliterDB.create(..., reset=True) instead of passing reset=True to the constructor.


Async Lifecycle Methods

create()

Async factory that supports reset-on-create.

@classmethod
async def create(
    cls,
    db_filename: str | None = None,
    *,
    memory: bool = False,
    auto_commit: bool = True,
    debug: bool = False,
    logger: logging.Logger | None = None,
    reset: bool = False,
    return_local_time: bool = True,
    cache_enabled: bool = False,
    cache_max_size: int = 1000,
    cache_ttl: int | None = None,
    cache_max_memory_mb: int | None = None,
) -> AsyncSqliterDB:

connect()

async def connect(self) -> aiosqlite.Connection:

close()

async def close(self) -> None:

commit()

async def commit(self) -> None:

Async context manager

async def __aenter__(self) -> Self:
async def __aexit__(self, exc_type, exc, tb) -> None:

Example:

async with AsyncSqliterDB("app.db") as db:
    ...

Warning

Breaking change in 0.21.0: async with commits or rolls back the transaction but does not close the connection on exit. Call await db.close() explicitly when the async database instance is no longer needed.


Table Methods

create_table()

async def create_table(
    self,
    model_class: type[BaseDBModel],
    *,
    exists_ok: bool = True,
    force: bool = False,
) -> None:

Creates the table for the given model, along with its indexes, foreign-key (FK) constraints, and any ORM many-to-many junction tables.

drop_table()

async def drop_table(
    self,
    model_class: type[BaseDBModel],
) -> None:

get_table_names()

async def get_table_names(self) -> list[str]:

This is a method in the async API rather than a property because fetching the table names must be awaited, and property access is not a good fit for asynchronous operations.

Read-Only Properties

db_filename

@property
def db_filename(self) -> str:

auto_commit

@property
def auto_commit(self) -> bool:

is_autocommit

@property
def is_autocommit(self) -> bool:

Compatibility alias for the sync property name.

debug

@property
def debug(self) -> bool:

logger

@property
def logger(self) -> logging.Logger | None:

return_local_time

@property
def return_local_time(self) -> bool:

is_connected

@property
def is_connected(self) -> bool:

is_memory

@property
def is_memory(self) -> bool:

filename

@property
def filename(self) -> str | None:

in_transaction

@property
def in_transaction(self) -> bool:

CRUD Methods

insert()

async def insert(self, model_instance: T) -> T:

bulk_insert()

async def bulk_insert(
    self,
    instances: Sequence[T],
    *,
    timestamp_override: bool = False,
) -> list[T]:

get()

async def get(
    self,
    model_class: type[T],
    primary_key_value: int,
    *,
    bypass_cache: bool = False,
    cache_ttl: int | None = None,
) -> T | None:

update()

async def update(self, model_instance: BaseDBModel) -> None:

update_where()

async def update_where(
    self,
    model_class: type[T],
    where: dict[str, Any],
    values: dict[str, Any],
) -> int:

delete()

async def delete(
    self,
    model_class: type[BaseDBModel],
    primary_key_value: int | str,
) -> None:

select()

def select(
    self,
    model_class: type[T],
    fields: list[str] | None = None,
    exclude: list[str] | None = None,
) -> AsyncQueryBuilder[T]:

This method is synchronous because it only constructs the query object. The query's terminal methods (for example, fetch_all()) are the ones that are awaited.

Cache Methods

cache_get()

def cache_get(
    self,
    table_name: str,
    cache_key: str,
) -> tuple[bool, Any]:

Returns a cached value for a table/query key pair.

cache_set()

def cache_set(
    self,
    table_name: str,
    cache_key: str,
    result: Any,
    ttl: int | None = None,
) -> None:

Stores a value in the query cache.

invalidate_table_cache()

def invalidate_table_cache(self, table_name: str) -> None:

Invalidates all cache entries for a specific table.

clear_cache()

def clear_cache(self) -> None:

Clears all cached query results.

reset_cache_stats()

def reset_cache_stats(self) -> None:

Resets cache hit/miss counters without changing current cached entries.

get_cache_stats()

def get_cache_stats(self) -> dict[str, int | float]:

Returns the same cache statistics as the sync API.


Query Example

users = await (
    db.select(User)
    .filter(active=True)
    .order("name")
    .limit(20)
    .fetch_all()
)

Async ORM Example

from sqliter.asyncio.orm import AsyncBaseDBModel, AsyncForeignKey


class Author(AsyncBaseDBModel):
    name: str


class Book(AsyncBaseDBModel):
    title: str
    author: AsyncForeignKey[Author] = AsyncForeignKey(
        Author,
        related_name="books",
        on_delete="CASCADE",
    )


book = await db.get(Book, 1)
if book is None:
    raise ValueError("Book not found")
author = await book.author.fetch()