Skip to content

Base SQLAlchemy

The base/sqlalchemy subpackage provides the foundational SQLAlchemy components shared across all relational database adapters, including the abstract port interface, session managers, and session manager registries.

Ports

Abstract port interface defining the contract all SQLAlchemy-based adapters must fulfil.

archipy.adapters.base.sqlalchemy.ports.AnyExecuteParams module-attribute

# Type accepted by the ``params`` argument of the ports' ``execute`` and
# ``scalars`` methods: a union of the two ``_Core*ExecuteParams`` aliases.
AnyExecuteParams = _CoreMultiExecuteParams | _CoreSingleExecuteParams

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort

Abstract interface defining synchronous SQLAlchemy database operations.

This interface defines the contract that all synchronous SQLAlchemy adapters must implement: obtaining a session, creating entities (single and bulk), retrieving an entity by UUID, deleting entities (single and bulk), running search queries with pagination and sorting, and executing arbitrary statements.

Implementations of this interface are responsible for handling the specific details of database interactions and connection management.

Source code in archipy/adapters/base/sqlalchemy/ports.py
class SQLAlchemyPort:
    """Port describing the synchronous operations a SQLAlchemy adapter exposes.

    The contract covers session access, paginated and sorted search, single
    and bulk creation, lookup by UUID, single and bulk deletion, and execution
    of arbitrary statements with either full or scalar results.

    Every method below is a stub that raises ``NotImplementedError``; concrete
    adapters own the actual database interaction and connection management.

    Note:
        This class declares no ``abc.ABC`` base or ``ABCMeta`` metaclass, so
        the ``@abstractmethod`` markers are not enforced by this class at
        instantiation time; a method left un-overridden fails only when called.
    """

    @abstractmethod
    def get_session(self) -> Session:
        """Return the SQLAlchemy session to use for database operations.

        Returns:
            Session: The session supplied by the concrete adapter.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def execute_search_query(
        self,
        entity: type[BaseEntity],
        query: Select,
        pagination: PaginationDTO | None = None,
        sort_info: SortDTO | None = None,
        has_multiple_entities: bool = False,
    ) -> tuple[list[BaseEntity], int]:
        """Run a SELECT-based search, optionally paginated and sorted.

        Args:
            entity: Entity class the search targets.
            query: SQLAlchemy ``Select`` statement to run.
            pagination: Pagination settings; ``None`` (the default) supplies none.
            sort_info: Sorting settings; ``None`` (the default) supplies none.
            has_multiple_entities: Flag stating whether the query involves more
                than one entity. Its effect on result handling is defined by
                the implementation and is not visible from this interface.

        Returns:
            A ``(entities, total_count)`` tuple.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def create(
        self,
        entity: BaseEntity,
    ) -> BaseEntity | None:
        """Persist a single new entity.

        Args:
            entity: Entity instance to store.

        Returns:
            The stored entity (with updated attributes), or ``None`` when
            creation failed.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def bulk_create(
        self,
        entities: list[BaseEntity],
    ) -> list[BaseEntity] | None:
        """Persist several new entities in one call.

        Args:
            entities: Entity instances to store.

        Returns:
            The stored entities, or ``None`` when creation failed.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_uuid(
        self,
        entity_type: type[BaseEntity],
        entity_uuid: UUID,
    ) -> BaseEntity | None:
        """Look up one entity by its UUID.

        Args:
            entity_type: Entity class to search for.
            entity_uuid: UUID identifying the wanted entity.

        Returns:
            The matching entity, or ``None`` when nothing matches.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(
        self,
        entity: BaseEntity,
    ) -> None:
        """Remove a single entity from the database.

        Args:
            entity: Entity instance to remove.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def bulk_delete(
        self,
        entities: list[BaseEntity],
    ) -> None:
        """Remove several entities from the database in one call.

        Args:
            entities: Entity instances to remove.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def execute(
        self,
        statement: Executable,
        params: AnyExecuteParams | None = None,
    ) -> Result[Any]:
        """Execute an arbitrary SQL statement.

        Args:
            statement: Executable statement to run.
            params: Parameters for the statement; ``None`` (the default)
                supplies none.

        Returns:
            The ``Result`` produced by the execution.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    def scalars(
        self,
        statement: Executable,
        params: AnyExecuteParams | None = None,
    ) -> ScalarResult[Any]:
        """Execute a statement and expose its result as scalars.

        Args:
            statement: Executable statement to run.
            params: Parameters for the statement; ``None`` (the default)
                supplies none.

        Returns:
            The ``ScalarResult`` produced by the execution.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.get_session abstractmethod

get_session() -> Session

Retrieves a SQLAlchemy session for database operations.

Returns:

Name Type Description
Session Session

A SQLAlchemy session object

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def get_session(self) -> Session:
    """Return the SQLAlchemy session to use for database operations.

    Returns:
        Session: The session supplied by the concrete adapter.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.execute_search_query abstractmethod

execute_search_query(
    entity: type[BaseEntity],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[BaseEntity], int]

Executes a search query with pagination and sorting.

Parameters:

Name Type Description Default
entity type[BaseEntity]

The entity class to query

required
query Select

The SQLAlchemy SELECT query

required
pagination PaginationDTO | None

Optional pagination settings

None
sort_info SortDTO | None

Optional sorting information

None
has_multiple_entities bool

Flag indicating whether the query involves more than one entity.

False

Returns:

Type Description
tuple[list[BaseEntity], int]

A tuple containing the list of entities and the total count

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def execute_search_query(
    self,
    entity: type[BaseEntity],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[BaseEntity], int]:
    """Run a SELECT-based search, optionally paginated and sorted.

    Args:
        entity: Entity class the search targets.
        query: SQLAlchemy ``Select`` statement to run.
        pagination: Pagination settings; ``None`` (the default) supplies none.
        sort_info: Sorting settings; ``None`` (the default) supplies none.
        has_multiple_entities: Flag stating whether the query involves more
            than one entity. Its effect on result handling is defined by
            the implementation and is not visible from this interface.

    Returns:
        A ``(entities, total_count)`` tuple.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.create abstractmethod

create(entity: BaseEntity) -> BaseEntity | None

Creates a new entity in the database.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to create

required

Returns:

Type Description
BaseEntity | None

The created entity (with updated attributes) or None if creation failed

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def create(
    self,
    entity: BaseEntity,
) -> BaseEntity | None:
    """Persist a single new entity.

    Args:
        entity: Entity instance to store.

    Returns:
        The stored entity (with updated attributes), or ``None`` when
        creation failed.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.bulk_create abstractmethod

bulk_create(
    entities: list[BaseEntity],
) -> list[BaseEntity] | None

Creates multiple entities in the database.

Parameters:

Name Type Description Default
entities list[BaseEntity]

List of entities to create

required

Returns:

Type Description
list[BaseEntity] | None

The list of created entities or None if creation failed

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def bulk_create(
    self,
    entities: list[BaseEntity],
) -> list[BaseEntity] | None:
    """Persist several new entities in one call.

    Args:
        entities: Entity instances to store.

    Returns:
        The stored entities, or ``None`` when creation failed.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.get_by_uuid abstractmethod

get_by_uuid(
    entity_type: type[BaseEntity], entity_uuid: UUID
) -> BaseEntity | None

Retrieves an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type[BaseEntity]

The type of entity to retrieve

required
entity_uuid UUID

The UUID of the entity

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def get_by_uuid(
    self,
    entity_type: type[BaseEntity],
    entity_uuid: UUID,
) -> BaseEntity | None:
    """Look up one entity by its UUID.

    Args:
        entity_type: Entity class to search for.
        entity_uuid: UUID identifying the wanted entity.

    Returns:
        The matching entity, or ``None`` when nothing matches.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.delete abstractmethod

delete(entity: BaseEntity) -> None

Deletes an entity from the database.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to delete

required
Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def delete(
    self,
    entity: BaseEntity,
) -> None:
    """Remove a single entity from the database.

    Args:
        entity: Entity instance to remove.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.bulk_delete abstractmethod

bulk_delete(entities: list[BaseEntity]) -> None

Deletes multiple entities from the database.

Parameters:

Name Type Description Default
entities list[BaseEntity]

List of entities to delete

required
Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def bulk_delete(
    self,
    entities: list[BaseEntity],
) -> None:
    """Remove several entities from the database in one call.

    Args:
        entities: Entity instances to remove.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.execute abstractmethod

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Executes a raw SQL statement.

Parameters:

Name Type Description Default
statement Executable

The SQL statement to execute

required
params AnyExecuteParams | None

Optional parameters for the SQL statement

None

Returns:

Type Description
Result[Any]

The result of the execution

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def execute(
    self,
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]:
    """Execute an arbitrary SQL statement.

    Args:
        statement: Executable statement to run.
        params: Parameters for the statement; ``None`` (the default)
            supplies none.

    Returns:
        The ``Result`` produced by the execution.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.SQLAlchemyPort.scalars abstractmethod

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Executes a statement and returns the scalar result.

Parameters:

Name Type Description Default
statement Executable

The SQL statement to execute

required
params AnyExecuteParams | None

Optional parameters for the SQL statement

None

Returns:

Type Description
ScalarResult[Any]

The scalar result of the execution

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def scalars(
    self,
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]:
    """Execute a statement and expose its result as scalars.

    Args:
        statement: Executable statement to run.
        params: Parameters for the statement; ``None`` (the default)
            supplies none.

    Returns:
        The ``ScalarResult`` produced by the execution.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort

Abstract interface defining asynchronous SQLAlchemy database operations.

This interface defines the contract that all asynchronous SQLAlchemy adapters must implement: obtaining an async session, creating entities (single and bulk), retrieving an entity by UUID, deleting entities (single and bulk), running search queries with pagination and sorting, and executing arbitrary statements.

Implementations of this interface are responsible for handling the specific details of asynchronous database interactions and connection management.

Source code in archipy/adapters/base/sqlalchemy/ports.py
class AsyncSQLAlchemyPort:
    """Port describing the asynchronous operations a SQLAlchemy adapter exposes.

    The contract covers session access, paginated and sorted search, single
    and bulk creation, lookup by UUID, single and bulk deletion, and execution
    of arbitrary statements with either full or scalar results. Apart from
    ``get_session``, every operation is a coroutine.

    Every method below is a stub that raises ``NotImplementedError``; concrete
    adapters own the actual asynchronous database interaction and connection
    management.

    Note:
        This class declares no ``abc.ABC`` base or ``ABCMeta`` metaclass, so
        the ``@abstractmethod`` markers are not enforced by this class at
        instantiation time; a method left un-overridden fails only when called
        (for coroutines, when awaited).
    """

    @abstractmethod
    def get_session(self) -> AsyncSession:
        """Return the asynchronous SQLAlchemy session for database operations.

        This method is a plain (non-coroutine) function.

        Returns:
            AsyncSession: The async session supplied by the concrete adapter.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def execute_search_query(
        self,
        entity: type[BaseEntity],
        query: Select,
        # Defaults to None so the signature matches the synchronous port and
        # the "optional" wording of the parameter documentation.
        pagination: PaginationDTO | None = None,
        sort_info: SortDTO | None = None,
        has_multiple_entities: bool = False,
    ) -> tuple[list[BaseEntity], int]:
        """Run a SELECT-based search asynchronously, optionally paginated and sorted.

        Args:
            entity: Entity class the search targets.
            query: SQLAlchemy ``Select`` statement to run.
            pagination: Pagination settings; ``None`` (the default) supplies none.
            sort_info: Sorting settings; ``None`` (the default) supplies none.
            has_multiple_entities: Flag stating whether the query involves more
                than one entity. Its effect on result handling is defined by
                the implementation and is not visible from this interface.

        Returns:
            A ``(entities, total_count)`` tuple.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def create(self, entity: BaseEntity) -> BaseEntity | None:
        """Persist a single new entity asynchronously.

        Args:
            entity: Entity instance to store.

        Returns:
            The stored entity (with updated attributes), or ``None`` when
            creation failed.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def bulk_create(self, entities: list[BaseEntity]) -> list[BaseEntity] | None:
        """Persist several new entities asynchronously in one call.

        Args:
            entities: Entity instances to store.

        Returns:
            The stored entities, or ``None`` when creation failed.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_by_uuid(self, entity_type: type[BaseEntity], entity_uuid: UUID) -> BaseEntity | None:
        """Look up one entity by its UUID asynchronously.

        Args:
            entity_type: Entity class to search for.
            entity_uuid: UUID identifying the wanted entity.

        Returns:
            The matching entity, or ``None`` when nothing matches.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def delete(self, entity: BaseEntity) -> None:
        """Remove a single entity from the database asynchronously.

        Args:
            entity: Entity instance to remove.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def bulk_delete(self, entities: list[BaseEntity]) -> None:
        """Remove several entities from the database asynchronously in one call.

        Args:
            entities: Entity instances to remove.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
        """Execute an arbitrary SQL statement asynchronously.

        Args:
            statement: Executable statement to run.
            params: Parameters for the statement; ``None`` (the default)
                supplies none.

        Returns:
            The ``Result`` produced by the execution.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
        """Execute a statement asynchronously and expose its result as scalars.

        Args:
            statement: Executable statement to run.
            params: Parameters for the statement; ``None`` (the default)
                supplies none.

        Returns:
            The ``ScalarResult`` produced by the execution.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.get_session abstractmethod

get_session() -> AsyncSession

Retrieves an asynchronous SQLAlchemy session for database operations.

Returns:

Name Type Description
AsyncSession AsyncSession

An asynchronous SQLAlchemy session object

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
def get_session(self) -> AsyncSession:
    """Return the asynchronous SQLAlchemy session for database operations.

    This method is a plain (non-coroutine) function.

    Returns:
        AsyncSession: The async session supplied by the concrete adapter.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.execute_search_query abstractmethod async

execute_search_query(
    entity: type[BaseEntity],
    query: Select,
    pagination: PaginationDTO | None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[BaseEntity], int]

Executes a search query with pagination and sorting asynchronously.

Parameters:

Name Type Description Default
entity type[BaseEntity]

The entity class to query

required
query Select

The SQLAlchemy SELECT query

required
pagination PaginationDTO | None

Optional pagination settings

required
sort_info SortDTO | None

Optional sorting information

None
has_multiple_entities bool

Flag indicating whether the query involves more than one entity.

False

Returns:

Type Description
tuple[list[BaseEntity], int]

A tuple containing the list of entities and the total count

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def execute_search_query(
    self,
    entity: type[BaseEntity],
    query: Select,
    # Defaults to None so the parameter is genuinely optional, as its
    # documentation states.
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[BaseEntity], int]:
    """Run a SELECT-based search asynchronously, optionally paginated and sorted.

    Args:
        entity: Entity class the search targets.
        query: SQLAlchemy ``Select`` statement to run.
        pagination: Pagination settings; ``None`` (the default) supplies none.
        sort_info: Sorting settings; ``None`` (the default) supplies none.
        has_multiple_entities: Flag stating whether the query involves more
            than one entity. Its effect on result handling is defined by
            the implementation and is not visible from this interface.

    Returns:
        A ``(entities, total_count)`` tuple.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.create abstractmethod async

create(entity: BaseEntity) -> BaseEntity | None

Creates a new entity in the database asynchronously.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to create

required

Returns:

Type Description
BaseEntity | None

The created entity (with updated attributes) or None if creation failed

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def create(
    self,
    entity: BaseEntity,
) -> BaseEntity | None:
    """Persist a single new entity asynchronously.

    Args:
        entity: Entity instance to store.

    Returns:
        The stored entity (with updated attributes), or ``None`` when
        creation failed.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.bulk_create abstractmethod async

bulk_create(
    entities: list[BaseEntity],
) -> list[BaseEntity] | None

Creates multiple entities in the database asynchronously.

Parameters:

Name Type Description Default
entities list[BaseEntity]

List of entities to create

required

Returns:

Type Description
list[BaseEntity] | None

The list of created entities or None if creation failed

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def bulk_create(
    self,
    entities: list[BaseEntity],
) -> list[BaseEntity] | None:
    """Persist several new entities asynchronously in one call.

    Args:
        entities: Entity instances to store.

    Returns:
        The stored entities, or ``None`` when creation failed.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.get_by_uuid abstractmethod async

get_by_uuid(
    entity_type: type[BaseEntity], entity_uuid: UUID
) -> BaseEntity | None

Retrieves an entity by its UUID asynchronously.

Parameters:

Name Type Description Default
entity_type type[BaseEntity]

The type of entity to retrieve

required
entity_uuid UUID

The UUID of the entity

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def get_by_uuid(
    self,
    entity_type: type[BaseEntity],
    entity_uuid: UUID,
) -> BaseEntity | None:
    """Look up one entity by its UUID asynchronously.

    Args:
        entity_type: Entity class to search for.
        entity_uuid: UUID identifying the wanted entity.

    Returns:
        The matching entity, or ``None`` when nothing matches.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.delete abstractmethod async

delete(entity: BaseEntity) -> None

Deletes an entity from the database asynchronously.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to delete

required
Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def delete(
    self,
    entity: BaseEntity,
) -> None:
    """Remove a single entity from the database asynchronously.

    Args:
        entity: Entity instance to remove.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.bulk_delete abstractmethod async

bulk_delete(entities: list[BaseEntity]) -> None

Deletes multiple entities from the database asynchronously.

Parameters:

Name Type Description Default
entities list[BaseEntity]

List of entities to delete

required
Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def bulk_delete(
    self,
    entities: list[BaseEntity],
) -> None:
    """Remove several entities from the database asynchronously in one call.

    Args:
        entities: Entity instances to remove.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.execute abstractmethod async

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Executes a raw SQL statement asynchronously.

Parameters:

Name Type Description Default
statement Executable

The SQL statement to execute

required
params AnyExecuteParams | None

Optional parameters for the SQL statement

None

Returns:

Type Description
Result[Any]

The result of the execution

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def execute(
    self,
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]:
    """Execute an arbitrary SQL statement asynchronously.

    Args:
        statement: Executable statement to run.
        params: Parameters for the statement; ``None`` (the default)
            supplies none.

    Returns:
        The ``Result`` produced by the execution.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.ports.AsyncSQLAlchemyPort.scalars abstractmethod async

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Executes a statement and returns the scalar result asynchronously.

Parameters:

Name Type Description Default
statement Executable

The SQL statement to execute

required
params AnyExecuteParams | None

Optional parameters for the SQL statement

None

Returns:

Type Description
ScalarResult[Any]

The scalar result of the execution

Source code in archipy/adapters/base/sqlalchemy/ports.py
@abstractmethod
async def scalars(
    self,
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]:
    """Execute a statement asynchronously and expose its result as scalars.

    Args:
        statement: Executable statement to run.
        params: Parameters for the statement; ``None`` (the default)
            supplies none.

    Returns:
        The ``ScalarResult`` produced by the execution.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3

Session Manager Ports

Abstract interface for SQLAlchemy session managers, decoupling session lifecycle from adapter logic.

archipy.adapters.base.sqlalchemy.session_manager_ports.SessionManagerPort

Interface for SQLAlchemy session management operations.

This interface defines the contract for session management adapters, providing methods for retrieving and managing database sessions in a synchronous context.

Implementing classes must provide mechanisms to:

1. Retrieve a properly configured SQLAlchemy session
2. Release/remove sessions when they're no longer needed

Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
class SessionManagerPort:
    """Port for synchronous SQLAlchemy session management.

    Session-manager adapters implement this contract to hand out database
    sessions and to dispose of them in a synchronous context.

    An implementation has two responsibilities:
    1. Hand out a properly configured SQLAlchemy session.
    2. Release/remove the session once it is no longer needed.

    Both methods below are stubs that raise ``NotImplementedError``.
    """

    @abstractmethod
    def get_session(self) -> Session:
        """Return a SQLAlchemy session.

        The session is intended for querying, creating, updating, and
        deleting data.

        Returns:
            Session: A SQLAlchemy session object.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.

        Examples:
            >>> session = session_manager.get_session()
            >>> results = session.query(User).all()
        """
        raise NotImplementedError

    @abstractmethod
    def remove_session(self) -> None:
        """Remove the current session from the registry.

        Intended to be called once the session is no longer needed, so that
        resources are not leaked and sessions are managed properly.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

archipy.adapters.base.sqlalchemy.session_manager_ports.SessionManagerPort.get_session abstractmethod

get_session() -> Session

Retrieve a SQLAlchemy session.

This method provides a database session that can be used for querying, creating, updating, and deleting data.

Returns:

Name Type Description
Session Session

A SQLAlchemy session object

Examples:

>>> session = session_manager.get_session()
>>> results = session.query(User).all()
Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
@abstractmethod
def get_session(self) -> Session:
    """Return a SQLAlchemy session.

    The session is intended for querying, creating, updating, and
    deleting data.

    Returns:
        Session: A SQLAlchemy session object.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.

    Examples:
        >>> session = session_manager.get_session()
        >>> results = session.query(User).all()
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.session_manager_ports.SessionManagerPort.remove_session abstractmethod

remove_session() -> None

Remove the current session from the registry.

This method should be called to clean up the session when it's no longer needed, helping to prevent resource leaks and ensure proper session management.

Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
@abstractmethod
def remove_session(self) -> None:
    """Remove the current session from the registry.

    Intended to be called once the session is no longer needed, so that
    resources are not leaked and sessions are managed properly.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.session_manager_ports.AsyncSessionManagerPort

Interface for asynchronous SQLAlchemy session management operations.

This interface defines the contract for asynchronous session management adapters, providing methods for retrieving and managing database sessions in an asynchronous context using SQLAlchemy's async capabilities.

Implementing classes must provide mechanisms to:

1. Retrieve a properly configured asynchronous SQLAlchemy session
2. Release/remove sessions asynchronously when they're no longer needed

Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
class AsyncSessionManagerPort:
    """Port for asynchronous SQLAlchemy session management.

    Session-manager adapters implement this contract to hand out database
    sessions and to dispose of them in an asynchronous context, using
    SQLAlchemy's async capabilities.

    An implementation has two responsibilities:
    1. Hand out a properly configured asynchronous SQLAlchemy session.
    2. Release/remove the session asynchronously once it is no longer needed.

    Both methods below are stubs that raise ``NotImplementedError``.
    """

    @abstractmethod
    def get_session(self) -> AsyncSession:
        """Return an asynchronous SQLAlchemy session.

        The session is intended for asynchronous querying, creating,
        updating, and deleting data. This method is a plain (non-coroutine)
        function.

        Returns:
            AsyncSession: An asynchronous SQLAlchemy session object.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def remove_session(self) -> None:
        """Remove the current session from the registry asynchronously.

        Intended to be awaited once the session is no longer needed, so that
        resources are not leaked and sessions are managed properly in async
        contexts.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        raise NotImplementedError

archipy.adapters.base.sqlalchemy.session_manager_ports.AsyncSessionManagerPort.get_session abstractmethod

get_session() -> AsyncSession

Retrieve an asynchronous SQLAlchemy session.

This method provides an async database session that can be used for asynchronous querying, creating, updating, and deleting data.

Returns:

Name Type Description
AsyncSession AsyncSession

An asynchronous SQLAlchemy session object

Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
@abstractmethod
def get_session(self) -> AsyncSession:
    """Return an asynchronous SQLAlchemy session.

    The session is intended for asynchronous querying, creating,
    updating, and deleting data. This method is a plain (non-coroutine)
    function.

    Returns:
        AsyncSession: An asynchronous SQLAlchemy session object.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.base.sqlalchemy.session_manager_ports.AsyncSessionManagerPort.remove_session abstractmethod async

remove_session() -> None

Asynchronously remove the current session from the registry.

This method should be called to clean up the session when it's no longer needed, helping to prevent resource leaks and ensure proper session management in async contexts.

Source code in archipy/adapters/base/sqlalchemy/session_manager_ports.py
@abstractmethod
async def remove_session(self) -> None:
    """Remove the current session from the registry asynchronously.

    Intended to be awaited once the session is no longer needed, so that
    resources are not leaked and sessions are managed properly in async
    contexts.

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3

Session Manager Registry

Registry for tracking and resolving active session manager instances.

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry

Registry for SQLAlchemy session managers.

This registry provides a centralized access point for both synchronous and asynchronous session managers, implementing the Service Locator pattern.

Subclasses should override get_sync_manager and get_async_manager to provide concrete session managers, or use set_sync_manager and set_async_manager to register managers manually.

Examples:

>>> from archipy.adapters.postgres.sqlalchemy.session_manager_registry import PostgresSessionManagerRegistry
>>> sync_manager = PostgresSessionManagerRegistry.get_sync_manager()
>>> session = sync_manager.get_session()
Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
class SessionManagerRegistry:
    """Central lookup point for SQLAlchemy session managers.

    The registry keeps at most one synchronous and one asynchronous session
    manager as class-level state, following the Service Locator pattern.

    Managers are registered through set_sync_manager / set_async_manager and
    retrieved through get_sync_manager / get_async_manager. Subclasses may
    instead override the getters to provide concrete session managers.

    Examples:
        >>> from archipy.adapters.postgres.sqlalchemy.session_manager_registry import PostgresSessionManagerRegistry
        >>> sync_manager = PostgresSessionManagerRegistry.get_sync_manager()
        >>> session = sync_manager.get_session()
    """

    # Registered managers; both stay None until a setter runs (and after reset()).
    _sync_instance: ClassVar[SessionManagerPort | None] = None
    _async_instance: ClassVar[AsyncSessionManagerPort | None] = None

    @classmethod
    def get_sync_manager(cls) -> SessionManagerPort:
        """Return the registered synchronous session manager.

        Returns:
            SessionManagerPort: The registered synchronous session manager

        Raises:
            InternalError: If no synchronous session manager is set

        Note:
            This base implementation raises nothing else; overriding subclasses
            may surface further errors (e.g. DatabaseConnectionError) - confirm
            against the concrete registry.
        """
        registered = cls._sync_instance
        if registered is not None:
            return registered
        raise InternalError("Synchronous session manager not initialized")

    @classmethod
    def set_sync_manager(cls, manager: SessionManagerPort) -> None:
        """Register a custom synchronous session manager.

        Args:
            manager: An instance implementing SessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("Session manager cannot be None")

        # Imported at call time, after the None check, exactly as the port is needed.
        from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

        if isinstance(manager, SessionManagerPort):
            cls._sync_instance = manager
            return
        raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")

    @classmethod
    def get_async_manager(cls) -> AsyncSessionManagerPort:
        """Return the registered asynchronous session manager.

        Returns:
            AsyncSessionManagerPort: The registered asynchronous session manager

        Raises:
            InternalError: If no asynchronous session manager is set

        Note:
            This base implementation raises nothing else; overriding subclasses
            may surface further errors (e.g. DatabaseConnectionError) - confirm
            against the concrete registry.
        """
        registered = cls._async_instance
        if registered is not None:
            return registered
        raise InternalError("Asynchronous session manager not initialized")

    @classmethod
    def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
        """Register a custom asynchronous session manager.

        Args:
            manager: An instance implementing AsyncSessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("Session manager cannot be None")

        # Imported at call time, after the None check, exactly as the port is needed.
        from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

        if isinstance(manager, AsyncSessionManagerPort):
            cls._async_instance = manager
            return
        raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")

    @classmethod
    def reset(cls) -> None:
        """Return the registry to its initial, empty state.

        Both the synchronous and the asynchronous manager slots are cleared,
        which is mainly useful for isolating tests from one another.
        """
        cls._sync_instance = cls._async_instance = None

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry.get_sync_manager classmethod

get_sync_manager() -> SessionManagerPort

Get the synchronous session manager instance.

Returns:

Name Type Description
SessionManagerPort SessionManagerPort

The registered synchronous session manager

Raises:

Type Description
InternalError

If no synchronous session manager is set

DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
@classmethod
def get_sync_manager(cls) -> SessionManagerPort:
    """Return the registered synchronous session manager.

    Returns:
        SessionManagerPort: The registered synchronous session manager

    Raises:
        InternalError: If no synchronous session manager is set

    Note:
        This base implementation raises nothing else; overriding subclasses
        may surface further errors (e.g. DatabaseConnectionError) - confirm
        against the concrete registry.
    """
    registered = cls._sync_instance
    if registered is not None:
        return registered
    raise InternalError("Synchronous session manager not initialized")

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry.set_sync_manager classmethod

set_sync_manager(manager: SessionManagerPort) -> None

Set a custom synchronous session manager.

Parameters:

Name Type Description Default
manager SessionManagerPort

An instance implementing SessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement SessionManagerPort

Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
@classmethod
def set_sync_manager(cls, manager: SessionManagerPort) -> None:
    """Register a custom synchronous session manager.

    Args:
        manager: An instance implementing SessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("Session manager cannot be None")

    # Imported at call time, after the None check, exactly as the port is needed.
    from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

    if isinstance(manager, SessionManagerPort):
        cls._sync_instance = manager
        return
    raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry.get_async_manager classmethod

get_async_manager() -> AsyncSessionManagerPort

Get the asynchronous session manager instance.

Returns:

Name Type Description
AsyncSessionManagerPort AsyncSessionManagerPort

The registered asynchronous session manager

Raises:

Type Description
InternalError

If no asynchronous session manager is set

DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
@classmethod
def get_async_manager(cls) -> AsyncSessionManagerPort:
    """Return the registered asynchronous session manager.

    Returns:
        AsyncSessionManagerPort: The registered asynchronous session manager

    Raises:
        InternalError: If no asynchronous session manager is set

    Note:
        This base implementation raises nothing else; overriding subclasses
        may surface further errors (e.g. DatabaseConnectionError) - confirm
        against the concrete registry.
    """
    registered = cls._async_instance
    if registered is not None:
        return registered
    raise InternalError("Asynchronous session manager not initialized")

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry.set_async_manager classmethod

set_async_manager(manager: AsyncSessionManagerPort) -> None

Set a custom asynchronous session manager.

Parameters:

Name Type Description Default
manager AsyncSessionManagerPort

An instance implementing AsyncSessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement AsyncSessionManagerPort

Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
@classmethod
def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
    """Register a custom asynchronous session manager.

    Args:
        manager: An instance implementing AsyncSessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("Session manager cannot be None")

    # Imported at call time, after the None check, exactly as the port is needed.
    from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

    if isinstance(manager, AsyncSessionManagerPort):
        cls._async_instance = manager
        return
    raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")

archipy.adapters.base.sqlalchemy.session_manager_registry.SessionManagerRegistry.reset classmethod

reset() -> None

Reset the registry to its initial state.

This method clears both registered managers, useful for testing.

Source code in archipy/adapters/base/sqlalchemy/session_manager_registry.py
@classmethod
def reset(cls) -> None:
    """Return the registry to its initial, empty state.

    Both the synchronous and the asynchronous manager slots are cleared,
    which is mainly useful for isolating tests from one another.
    """
    cls._sync_instance = cls._async_instance = None

options: show_root_toc_entry: false heading_level: 3

Session Managers

Concrete session manager implementations that handle SQLAlchemy session creation, scoping, and teardown.

archipy.adapters.base.sqlalchemy.session_managers.ConfigT module-attribute

ConfigT = TypeVar('ConfigT', bound=SQLAlchemyConfig)

archipy.adapters.base.sqlalchemy.session_managers.BaseSQLAlchemySessionManager

Bases: SessionManagerPort

Base synchronous SQLAlchemy session manager.

Implements the SessionManagerPort interface to provide session management for synchronous database operations. Database-specific session managers should inherit from this class and implement database-specific engine creation.

Parameters:

Name Type Description Default
orm_config ConfigT

SQLAlchemy configuration. Must match the expected config type for the database.

required
Source code in archipy/adapters/base/sqlalchemy/session_managers.py
class BaseSQLAlchemySessionManager[ConfigT: SQLAlchemyConfig](SessionManagerPort):
    """Base synchronous SQLAlchemy session manager.

    Implements the SessionManagerPort interface to provide session management for
    synchronous database operations. Database-specific session managers should inherit
    from this class and implement database-specific engine creation.

    Args:
        orm_config: SQLAlchemy configuration. Must match the expected config type for the database.
    """

    def __init__(self, orm_config: ConfigT) -> None:
        """Initialize the base session manager.

        Args:
            orm_config: SQLAlchemy configuration.

        Raises:
            InvalidArgumentError: If the configuration type is invalid.
            DatabaseConnectionError: If there's an error creating the database connection.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        if not isinstance(orm_config, self._expected_config_type()):
            raise InvalidArgumentError(
                f"Expected {self._expected_config_type().__name__}, got {type(orm_config).__name__}",
            )
        try:
            self.engine = self._create_engine(orm_config)
            self._session_generator = self._get_session_generator()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    @abstractmethod
    def _expected_config_type(self) -> type[SQLAlchemyConfig]:
        """Return the expected configuration type for the database.

        Returns:
            The SQLAlchemy configuration class expected by this session manager.
        """

    @abstractmethod
    def _get_database_name(self) -> str:
        """Return the name of the database being used.

        Returns:
            str: The name of the database (e.g., 'postgresql', 'sqlite', 'starrocks').
        """

    @abstractmethod
    def _create_url(self, configs: ConfigT) -> URL:
        """Create a database connection URL.

        Args:
            configs: Database-specific configuration.

        Returns:
            A SQLAlchemy URL object for the database.

        Raises:
            DatabaseConnectionError: If there's an error creating the URL.
        """

    def _create_engine(self, configs: ConfigT) -> Engine:
        """Create a SQLAlchemy engine with common configuration.

        Args:
            configs: SQLAlchemy configuration.

        Returns:
            A configured SQLAlchemy engine.

        Raises:
            DatabaseConnectionError: If there's an error creating the engine.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            url = self._create_url(configs)
            return create_engine(
                url,
                isolation_level=configs.ISOLATION_LEVEL,
                echo=configs.ECHO,
                echo_pool=configs.ECHO_POOL,
                enable_from_linting=configs.ENABLE_FROM_LINTING,
                hide_parameters=configs.HIDE_PARAMETERS,
                pool_pre_ping=configs.POOL_PRE_PING,
                pool_size=configs.POOL_SIZE,
                pool_recycle=configs.POOL_RECYCLE_SECONDS,
                pool_reset_on_return=configs.POOL_RESET_ON_RETURN,
                pool_timeout=configs.POOL_TIMEOUT,
                pool_use_lifo=configs.POOL_USE_LIFO,
                query_cache_size=configs.QUERY_CACHE_SIZE,
                max_overflow=configs.POOL_MAX_OVERFLOW,
                connect_args=self._get_connect_args(),
            )
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    def _get_connect_args(self) -> dict:
        """Return additional connection arguments for the engine.

        Returns:
            A dictionary of connection arguments (default is empty).
        """
        return {}

    def _get_session_generator(self) -> scoped_session:
        """Create a scoped session factory for synchronous sessions.

        Returns:
            A scoped_session instance used by `get_session` to provide thread-safe sessions.

        Raises:
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            session_maker = sessionmaker(self.engine)
            return scoped_session(session_maker)
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseError(
                database=self._get_database_name(),
            ) from e

    @override
    def get_session(self) -> Session:
        """Retrieve a thread-safe SQLAlchemy session.

        Returns:
            Session: A SQLAlchemy session instance for database operations.

        Raises:
            DatabaseConnectionError: If there's an error creating the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            return self._session_generator()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    @override
    def remove_session(self) -> None:
        """Remove the current session from the registry.

        Cleans up the session to prevent resource leaks, typically called at the end
        of a request.

        Raises:
            DatabaseConnectionError: If there's an error removing the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            self._session_generator.remove()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

archipy.adapters.base.sqlalchemy.session_managers.BaseSQLAlchemySessionManager.engine instance-attribute

engine = _create_engine(orm_config)

archipy.adapters.base.sqlalchemy.session_managers.BaseSQLAlchemySessionManager.get_session

get_session() -> Session

Retrieve a thread-safe SQLAlchemy session.

Returns:

Name Type Description
Session Session

A SQLAlchemy session instance for database operations.

Raises:

Type Description
DatabaseConnectionError

If there's an error creating the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def get_session(self) -> Session:
    """Retrieve a thread-safe SQLAlchemy session.

    Returns:
        Session: A SQLAlchemy session instance for database operations.

    Raises:
        DatabaseConnectionError: If there's an error creating the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        return self._session_generator()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.base.sqlalchemy.session_managers.BaseSQLAlchemySessionManager.remove_session

remove_session() -> None

Remove the current session from the registry.

Cleans up the session to prevent resource leaks, typically called at the end of a request.

Raises:

Type Description
DatabaseConnectionError

If there's an error removing the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def remove_session(self) -> None:
    """Remove the current session from the registry.

    Cleans up the session to prevent resource leaks, typically called at the end
    of a request.

    Raises:
        DatabaseConnectionError: If there's an error removing the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        self._session_generator.remove()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.base.sqlalchemy.session_managers.AsyncBaseSQLAlchemySessionManager

Bases: AsyncSessionManagerPort

Base asynchronous SQLAlchemy session manager.

Implements the AsyncSessionManagerPort interface to provide session management for asynchronous database operations. Database-specific session managers should inherit from this class and implement database-specific async engine creation.

Parameters:

Name Type Description Default
orm_config ConfigT

SQLAlchemy configuration. Must match the expected config type for the database.

required
Source code in archipy/adapters/base/sqlalchemy/session_managers.py
class AsyncBaseSQLAlchemySessionManager[ConfigT: SQLAlchemyConfig](AsyncSessionManagerPort):
    """Base asynchronous SQLAlchemy session manager.

    Implements the AsyncSessionManagerPort interface to provide session management for
    asynchronous database operations. Database-specific session managers should inherit
    from this class and implement database-specific async engine creation.

    Args:
        orm_config: SQLAlchemy configuration. Must match the expected config type for the database.
    """

    def __init__(self, orm_config: ConfigT) -> None:
        """Initialize the base async session manager.

        Args:
            orm_config: SQLAlchemy configuration.

        Raises:
            InvalidArgumentError: If the configuration type is invalid.
            DatabaseConnectionError: If there's an error creating the database connection.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        if not isinstance(orm_config, self._expected_config_type()):
            raise InvalidArgumentError(
                f"Expected {self._expected_config_type().__name__}, got {type(orm_config).__name__}",
            )
        try:
            self.engine = self._create_async_engine(orm_config)
            self._session_generator = self._get_session_generator()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    @abstractmethod
    def _expected_config_type(self) -> type[SQLAlchemyConfig]:
        """Return the expected configuration type for the database.

        Returns:
            The SQLAlchemy configuration class expected by this session manager.
        """

    @abstractmethod
    def _get_database_name(self) -> str:
        """Return the name of the database being used.

        Returns:
            str: The name of the database (e.g., 'postgresql', 'sqlite', 'starrocks').
        """

    @abstractmethod
    def _create_url(self, configs: ConfigT) -> URL:
        """Create a database connection URL.

        Args:
            configs: Database-specific configuration.

        Returns:
            A SQLAlchemy URL object for the database.

        Raises:
            DatabaseConnectionError: If there's an error creating the URL.
        """

    def _create_async_engine(self, configs: ConfigT) -> AsyncEngine:
        """Create an async SQLAlchemy engine with common configuration.

        Args:
            configs: SQLAlchemy configuration.

        Returns:
            A configured async SQLAlchemy engine.

        Raises:
            DatabaseConnectionError: If there's an error creating the engine.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            url = self._create_url(configs)
            return create_async_engine(
                url,
                isolation_level=configs.ISOLATION_LEVEL,
                echo=configs.ECHO,
                echo_pool=configs.ECHO_POOL,
                enable_from_linting=configs.ENABLE_FROM_LINTING,
                hide_parameters=configs.HIDE_PARAMETERS,
                pool_pre_ping=configs.POOL_PRE_PING,
                pool_size=configs.POOL_SIZE,
                pool_recycle=configs.POOL_RECYCLE_SECONDS,
                pool_reset_on_return=configs.POOL_RESET_ON_RETURN,
                pool_timeout=configs.POOL_TIMEOUT,
                pool_use_lifo=configs.POOL_USE_LIFO,
                query_cache_size=configs.QUERY_CACHE_SIZE,
                max_overflow=configs.POOL_MAX_OVERFLOW,
                connect_args=self._get_connect_args(),
            )
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    def _get_connect_args(self) -> dict:
        """Return additional connection arguments for the engine.

        Returns:
            A dictionary of connection arguments (default is empty).
        """
        return {}

    def _get_session_generator(self) -> async_scoped_session:
        """Create an async scoped session factory.

        Returns:
            An async_scoped_session instance used by `get_session` to provide task-safe sessions.

        Raises:
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            session_maker = async_sessionmaker(self.engine)
            return async_scoped_session(session_maker, scopefunc=current_task)
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseError(
                database=self._get_database_name(),
            ) from e

    @override
    def get_session(self) -> AsyncSession:
        """Retrieve a task-safe async SQLAlchemy session.

        Returns:
            AsyncSession: An async SQLAlchemy session instance for database operations.

        Raises:
            DatabaseConnectionError: If there's an error creating the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            return self._session_generator()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    @override
    async def remove_session(self) -> None:
        """Remove the current session from the registry.

        Cleans up the session to prevent resource leaks, typically called at the end
        of a request.

        Raises:
            DatabaseConnectionError: If there's an error removing the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        try:
            await self._session_generator.remove()
        except SQLAlchemyError as e:
            if "configuration" in str(e).lower():
                raise DatabaseConfigurationError(
                    database=self._get_database_name(),
                ) from e
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

archipy.adapters.base.sqlalchemy.session_managers.AsyncBaseSQLAlchemySessionManager.engine instance-attribute

engine = _create_async_engine(orm_config)

archipy.adapters.base.sqlalchemy.session_managers.AsyncBaseSQLAlchemySessionManager.get_session

get_session() -> AsyncSession

Retrieve a task-safe async SQLAlchemy session.

Returns:

Name Type Description
AsyncSession AsyncSession

An async SQLAlchemy session instance for database operations.

Raises:

Type Description
DatabaseConnectionError

If there's an error creating the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def get_session(self) -> AsyncSession:
    """Retrieve a task-safe async SQLAlchemy session.

    Returns:
        AsyncSession: An async SQLAlchemy session instance for database operations.

    Raises:
        DatabaseConnectionError: If there's an error creating the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        return self._session_generator()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.base.sqlalchemy.session_managers.AsyncBaseSQLAlchemySessionManager.remove_session async

remove_session() -> None

Remove the current session from the registry.

Cleans up the session to prevent resource leaks, typically called at the end of a request.

Raises:

Type Description
DatabaseConnectionError

If there's an error removing the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
async def remove_session(self) -> None:
    """Remove the current session from the registry.

    Cleans up the session to prevent resource leaks, typically called at the end
    of a request.

    Raises:
        DatabaseConnectionError: If there's an error removing the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        await self._session_generator.remove()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

options: show_root_toc_entry: false heading_level: 3

Adapters

The base SQLAlchemy adapter implements generic CRUD operations that concrete database adapters (PostgreSQL, SQLite, StarRocks) inherit from.

archipy.adapters.base.sqlalchemy.adapters.T module-attribute

T = TypeVar('T', bound=BaseEntity)

archipy.adapters.base.sqlalchemy.adapters.ConfigT module-attribute

ConfigT = TypeVar('ConfigT', bound=SQLAlchemyConfig)

archipy.adapters.base.sqlalchemy.adapters.SQLAlchemyExceptionHandlerMixin

Mixin providing centralized exception handling for SQLAlchemy operations.

This mixin provides a standard method for handling database exceptions and converting them to appropriate application-specific exceptions.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
class SQLAlchemyExceptionHandlerMixin:
    """Mixin that maps low-level database failures to application exceptions.

    Adapters call :meth:`_handle_db_exception` from their ``except`` blocks so
    that every operation reports errors through the same exception types.
    """

    @classmethod
    def _handle_db_exception(cls, exception: Exception, db_name: str | None = None) -> NoReturn:
        """Translate ``exception`` into an application-level database error.

        The error type is chosen by searching the lowercased exception message
        for keywords. The keywords are checked in this order and the first one
        found wins: "timeout", "integrity", "constraint", "connection",
        "transaction".

        Args:
            exception: The exception to translate; it becomes the ``__cause__``
                of the raised error.
            db_name: Optional database name passed to the raised error.

        Raises:
            DatabaseTimeoutError: If the message contains "timeout".
            DatabaseIntegrityError: If the message contains "integrity".
            DatabaseConstraintError: If the message contains "constraint".
            DatabaseConnectionError: If the message contains "connection".
            DatabaseTransactionError: If the message contains "transaction".
            DatabaseQueryError: If none of the keywords is present.
        """
        message = str(exception).lower()

        # Ordered by precedence: the first keyword found in the message wins.
        keyword_errors = (
            ("timeout", DatabaseTimeoutError),
            ("integrity", DatabaseIntegrityError),
            ("constraint", DatabaseConstraintError),
            ("connection", DatabaseConnectionError),
            ("transaction", DatabaseTransactionError),
        )
        for keyword, error_type in keyword_errors:
            if keyword in message:
                raise error_type(database=db_name) from exception

        # Fallback when the message matches no known category.
        raise DatabaseQueryError(database=db_name) from exception

archipy.adapters.base.sqlalchemy.adapters.SQLAlchemyFilterMixin

Mixin providing filtering capabilities for SQLAlchemy queries.

Supports equality, inequality, string operations, list operations, and NULL checks.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
class SQLAlchemyFilterMixin:
    """Mixin providing filtering capabilities for SQLAlchemy queries.

    Supports equality, inequality, string operations, list operations, and NULL checks.
    """

    @staticmethod
    def _validate_list_operation(
        value: str | float | bool | Decimal | list | UUID | datetime | date | Enum | None,
        operation: FilterOperationType,
    ) -> list:
        """Validate that value is a list for list operations."""
        if not isinstance(value, list):
            raise InvalidArgumentError(f"{operation.value} operation requires a list, got {type(value)}")
        return value

    @staticmethod
    def _apply_filter(
        query: Select | Update | Delete,
        field: InstrumentedAttribute,
        value: str | float | bool | Decimal | list | UUID | datetime | date | Enum | None,
        operation: FilterOperationType,
    ) -> Select | Update | Delete:
        """Apply a filter to a SQLAlchemy query based on the specified operation.

        Args:
            query: The SQLAlchemy query to apply the filter to.
            field: The model attribute/column to filter on.
            value: The value to compare against.
            operation: The type of filter operation to apply.

        Returns:
            The updated query with the filter applied.
        """
        # Skip filter if value is None (except for IS_NULL/IS_NOT_NULL operations)
        if value is None and operation not in [FilterOperationType.IS_NULL, FilterOperationType.IS_NOT_NULL]:
            return query

        # Map operations to their corresponding SQLAlchemy expressions
        filter_map = {
            FilterOperationType.EQUAL: lambda: field == value,
            FilterOperationType.NOT_EQUAL: lambda: field != value,
            FilterOperationType.LESS_THAN: lambda: field < value,
            FilterOperationType.LESS_THAN_OR_EQUAL: lambda: field <= value,
            FilterOperationType.GREATER_THAN: lambda: field > value,
            FilterOperationType.GREATER_THAN_OR_EQUAL: lambda: field >= value,
            FilterOperationType.IN_LIST: lambda: field.in_(
                SQLAlchemyFilterMixin._validate_list_operation(value, operation),
            ),
            FilterOperationType.NOT_IN_LIST: lambda: (
                ~field.in_(
                    SQLAlchemyFilterMixin._validate_list_operation(value, operation),
                )
            ),
            FilterOperationType.LIKE: lambda: field.like(f"%{value}%"),
            FilterOperationType.ILIKE: lambda: field.ilike(f"%{value}%"),
            FilterOperationType.STARTS_WITH: lambda: field.startswith(value),
            FilterOperationType.ENDS_WITH: lambda: field.endswith(value),
            FilterOperationType.CONTAINS: lambda: field.contains(value),
            FilterOperationType.IS_NULL: lambda: field.is_(None),
            FilterOperationType.IS_NOT_NULL: lambda: field.isnot(None),
        }

        filter_expr = filter_map.get(operation)
        if filter_expr:
            return query.where(filter_expr())
        return query

archipy.adapters.base.sqlalchemy.adapters.SQLAlchemyPaginationMixin

Mixin providing pagination capabilities for SQLAlchemy queries.

Supports limiting results and applying offsets for paginated queries.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
class SQLAlchemyPaginationMixin:
    """Mixin that restricts SQLAlchemy queries to a single page of results.

    Applies ``LIMIT`` and ``OFFSET`` taken from a pagination DTO.
    """

    @staticmethod
    def _apply_pagination(query: Select, pagination: PaginationDTO | None) -> Select:
        """Limit ``query`` to the page described by ``pagination``.

        Args:
            query: The SQLAlchemy query to paginate.
            pagination: Provides ``page_size`` (used as the limit) and
                ``offset``. ``None`` leaves the query unpaginated.

        Returns:
            The query with limit and offset applied, or the original query
            when ``pagination`` is ``None``.
        """
        if pagination is not None:
            limited = query.limit(pagination.page_size)
            return limited.offset(pagination.offset)
        return query

archipy.adapters.base.sqlalchemy.adapters.SQLAlchemySortMixin

Mixin providing sorting capabilities for SQLAlchemy queries.

Supports dynamic column selection and ascending/descending order.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
class SQLAlchemySortMixin:
    """Mixin that adds ``ORDER BY`` clauses to SQLAlchemy queries.

    The sort column may be given by name, by enum member, or as a column
    object, and can be ordered ascending or descending.
    """

    @staticmethod
    def _apply_sorting(entity: type[T], query: Select, sort_info: SortDTO | None) -> Select:
        """Order ``query`` according to ``sort_info``.

        Args:
            entity: The entity class whose attributes are looked up when the
                sort column is given as a string or an enum member.
            query: The SQLAlchemy query to sort.
            sort_info: Column and direction to sort by. ``None`` leaves the
                query unsorted.

        Returns:
            The query with an ``ORDER BY`` clause, or the original query when
            ``sort_info`` is ``None``.

        Raises:
            InvalidArgumentError: If the sort order is neither ascending nor
                descending.
        """
        if sort_info is None:
            return query

        # Resolve the column: attribute name, enum member (lowercased name), or column object.
        column_ref = sort_info.column
        if isinstance(column_ref, str):
            sort_column = getattr(entity, column_ref)
        elif isinstance(column_ref, Enum):
            sort_column = getattr(entity, column_ref.name.lower())
        else:
            sort_column = column_ref

        # Compare on raw values so both enum members and plain values are accepted.
        direction = sort_info.order
        if isinstance(direction, Enum):
            direction = direction.value

        if direction == SortOrderType.ASCENDING.value:
            return query.order_by(sort_column.asc())
        if direction == SortOrderType.DESCENDING.value:
            return query.order_by(sort_column.desc())
        raise InvalidArgumentError(argument_name="sort_info.order")

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter

Bases: SQLAlchemyPort, SQLAlchemyPaginationMixin, SQLAlchemySortMixin, SQLAlchemyFilterMixin, SQLAlchemyExceptionHandlerMixin

Base synchronous SQLAlchemy adapter for ORM operations.

Provides a standardized interface for CRUD operations, pagination, sorting, and filtering. Specific database adapters should inherit from this class and provide their own session manager.

Parameters:

Name Type Description Default
orm_config ConfigT | None

Configuration for SQLAlchemy. If None, uses global config.

None
Source code in archipy/adapters/base/sqlalchemy/adapters.py
class BaseSQLAlchemyAdapter[ConfigT: SQLAlchemyConfig](
    SQLAlchemyPort,
    SQLAlchemyPaginationMixin,
    SQLAlchemySortMixin,
    SQLAlchemyFilterMixin,
    SQLAlchemyExceptionHandlerMixin,
):
    """Base synchronous SQLAlchemy adapter for ORM operations.

    Provides a standardized interface for CRUD operations, pagination, sorting, and filtering.
    Specific database adapters should inherit from this class and provide their own session manager.

    Args:
        orm_config: Configuration for SQLAlchemy. If None, uses global config.
    """

    def __init__(self, orm_config: ConfigT | None = None) -> None:
        """Initialize the base adapter with a session manager.

        Args:
            orm_config: Configuration for SQLAlchemy. If None, uses global config.
        """
        configs = BaseConfig.global_config().SQLALCHEMY if orm_config is None else orm_config
        # Cast to ConfigT since subclasses will ensure the proper type
        self.session_manager: BaseSQLAlchemySessionManager[ConfigT] = self._create_session_manager(
            configs,
        )

    def _create_session_manager(self, configs: ConfigT) -> BaseSQLAlchemySessionManager[ConfigT]:
        """Create a session manager for the specific database.

        Args:
            configs: SQLAlchemy configuration.

        Returns:
            A session manager instance.
        """
        raise NotImplementedError("Subclasses must implement _create_session_manager")

    @override
    def execute_search_query(
        self,
        entity: type[T],
        query: Select,
        pagination: PaginationDTO | None = None,
        sort_info: SortDTO | None = None,
        has_multiple_entities: bool = False,
    ) -> tuple[list[T], int]:
        """Execute a search query with pagination and sorting.

        Args:
            entity: The entity class to query.
            query: The SQLAlchemy SELECT query.
            pagination: Optional pagination settings.
            sort_info: Optional sorting information.
            has_multiple_entities: Optional bool.

        Returns:
            Tuple of the list of entities and the total count.

        Raises:
            DatabaseQueryError: If the database query fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            sort_info = sort_info or SortDTO.default()
            session = self.get_session()
            sorted_query = self._apply_sorting(entity, query, sort_info)
            paginated_query = self._apply_pagination(sorted_query, pagination)
            result_set = session.execute(paginated_query)
            if has_multiple_entities:
                # For multiple entities, fetchall returns list of Row objects
                raw_results = list(result_set.fetchall())
                # Convert to list[T] - each row contains entities of type T
                # Use tuple unpacking to access the first element
                results: list[T] = []
                for row in raw_results:
                    if row:
                        # Row supports indexing and tuple unpacking
                        row_tuple = tuple(row)
                        if row_tuple:
                            first_entity = row_tuple[0]
                            # first_entity is T (entity type), verify it's an instance
                            if isinstance(first_entity, entity):
                                results.append(first_entity)
            else:
                # For single entity, scalars() returns list[T] directly
                results = list(result_set.scalars().all())
            count_query = select(func.count()).select_from(query.subquery())
            total_count = session.execute(count_query).scalar_one()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            # Type: results is list[T] where T extends BaseEntity, total_count is int
            return results, total_count

    @override
    def get_session(self) -> Session:
        """Get a database session.

        Returns:
            Session: A SQLAlchemy session.

        Raises:
            DatabaseConnectionError: If there's an error getting the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        return self.session_manager.get_session()

    @override
    def create(self, entity: T) -> T | None:
        """Create a new entity in the database.

        Args:
            entity: The entity to create.

        Returns:
            The created entity with updated attributes, preserving the original type.

        Raises:
            InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
                expected_type="BaseEntity",
                actual_type=type(entity).__name__,
            )

        try:
            session = self.get_session()
            session.add(entity)
            session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return entity

    @override
    def bulk_create(self, entities: list[T]) -> list[T] | None:
        """Creates multiple entities in a single database operation.

        Args:
            entities: List of entities to create.

        Returns:
            List of created entities with updated attributes, preserving original types.

        Raises:
            InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not all(isinstance(entity, BaseEntity) for entity in entities):
            raise InvalidEntityTypeError(
                message="All entities must be BaseEntity subclasses",
                expected_type="BaseEntity",
                actual_type="mixed",
            )

        try:
            session = self.get_session()
            session.add_all(entities)
            session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return entities

    @override
    def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
        """Retrieve an entity by its UUID.

        Args:
            entity_type: The type of entity to retrieve.
            entity_uuid: The UUID of the entity.

        Returns:
            The entity if found, None otherwise.

        Raises:
            InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not issubclass(entity_type, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {entity_type.__name__}",
                expected_type="BaseEntity",
                actual_type=entity_type.__name__,
            )

        try:
            session = self.get_session()
            result = session.get(entity_type, entity_uuid)
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            # result is T | None where T extends BaseEntity, compatible with BaseEntity | None
            # The type checker needs explicit type annotation to understand the relationship
            typed_result: BaseEntity | None = result
            return typed_result

    @override
    def delete(self, entity: T) -> None:
        """Delete an entity from the database.

        Args:
            entity: The entity to delete.

        Raises:
            InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
                expected_type="BaseEntity",
                actual_type=type(entity).__name__,
            )

        try:
            session = self.get_session()
            session.delete(entity)
            session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())

    @override
    def bulk_delete(self, entities: list[T]) -> None:
        """Delete multiple entities from the database.

        Args:
            entities: List of entities to delete.

        Raises:
            InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not all(isinstance(entity, BaseEntity) for entity in entities):
            raise InvalidEntityTypeError(
                message="All entities must be BaseEntity subclasses",
                expected_type="BaseEntity",
                actual_type="mixed",
            )

        try:
            session = self.get_session()
            for entity in entities:
                session.delete(entity)
            session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())

    @override
    def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
        """Execute a SQLAlchemy statement.

        Args:
            statement: The SQLAlchemy statement to execute.
            params: Optional parameters for the statement.

        Returns:
            The result of the execution.

        Raises:
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            session = self.get_session()
            result = session.execute(statement, params or {})
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return result

    @override
    def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
        """Execute a SQLAlchemy statement and return scalar results.

        Args:
            statement: The SQLAlchemy statement to execute.
            params: Optional parameters for the statement.

        Returns:
            The scalar results of the execution.

        Raises:
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            session = self.get_session()
            result = session.scalars(statement, params or {})
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return result

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.session_manager instance-attribute

session_manager: BaseSQLAlchemySessionManager[ConfigT] = (
    _create_session_manager(configs)
)

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.execute_search_query

execute_search_query(
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]

Execute a search query with pagination and sorting.

Parameters:

Name Type Description Default
entity type[T]

The entity class to query.

required
query Select

The SQLAlchemy SELECT query.

required
pagination PaginationDTO | None

Optional pagination settings.

None
sort_info SortDTO | None

Optional sorting information.

None
has_multiple_entities bool

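Whether each result row holds multiple entities. When True, only the first element of each row is returned, and only if it is an instance of entity; when False, results are read as scalars.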

False

Returns:

Type Description
tuple[list[T], int]

Tuple of the list of entities and the total count.

Raises:

Type Description
DatabaseQueryError

If the database query fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def execute_search_query(
    self,
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]:
    """Run a sorted, paginated search and count the matching rows.

    Args:
        entity: The entity class to query.
        query: The SQLAlchemy SELECT query.
        pagination: Optional pagination settings.
        sort_info: Optional sorting information; ``SortDTO.default()`` is
            used when omitted.
        has_multiple_entities: When True, each result row is treated as a
            tuple and only its first element is kept, provided it is an
            instance of ``entity``. When False, results are read through
            ``scalars()``.

    Returns:
        The entities of the requested page and the total number of rows
        matched by ``query`` before sorting and pagination.

    Raises:
        DatabaseQueryError: If the database query fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        effective_sort = sort_info or SortDTO.default()
        db_session = self.get_session()
        ordered_query = self._apply_sorting(entity, query, effective_sort)
        page_query = self._apply_pagination(ordered_query, pagination)
        result_set = db_session.execute(page_query)

        results: list[T]
        if has_multiple_entities:
            results = []
            for row in result_set.fetchall():
                if not row:
                    continue
                columns = tuple(row)
                if not columns:
                    continue
                # Only the leading column is kept, and only when it is the requested entity.
                candidate = columns[0]
                if isinstance(candidate, entity):
                    results.append(candidate)
        else:
            results = list(result_set.scalars().all())

        # The total is counted over the original query, without sorting or pagination.
        total_stmt = select(func.count()).select_from(query.subquery())
        total_count = db_session.execute(total_stmt).scalar_one()
    except Exception as exc:
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Not expected to be reached (the handler raises); kept for the type checker
    else:
        return results, total_count

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.get_session

get_session() -> Session

Get a database session.

Returns:

Name Type Description
Session Session

A SQLAlchemy session.

Raises:

Type Description
DatabaseConnectionError

If there's an error getting the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_session(self) -> Session:
    """Get a database session.

    Returns:
        Session: A SQLAlchemy session.

    Raises:
        DatabaseConnectionError: If there's an error getting the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    return self.session_manager.get_session()

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.create

create(entity: T) -> T | None

Create a new entity in the database.

Parameters:

Name Type Description Default
entity T

The entity to create.

required

Returns:

Type Description
T | None

The created entity with updated attributes, preserving the original type.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def create(self, entity: T) -> T | None:
    """Add ``entity`` to the session and flush it to the database.

    Args:
        entity: The entity to create.

    Returns:
        The same entity object that was passed in, after the flush.

    Raises:
        InvalidEntityTypeError: If ``entity`` is not a ``BaseEntity`` instance.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        db_session = self.get_session()
        db_session.add(entity)
        db_session.flush()
    except Exception as exc:
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Not expected to be reached (the handler raises); kept for the type checker
    else:
        return entity

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.bulk_create

bulk_create(entities: list[T]) -> list[T] | None

Creates multiple entities in a single database operation.

Parameters:

Name Type Description Default
entities list[T]

List of entities to create.

required

Returns:

Type Description
list[T] | None

List of created entities with updated attributes, preserving original types.

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def bulk_create(self, entities: list[T]) -> list[T] | None:
    """Creates multiple entities in a single database operation.

    Args:
        entities: List of entities to create.

    Returns:
        List of created entities with updated attributes, preserving original types.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        session.add_all(entities)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return entities

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.get_by_uuid

get_by_uuid(
    entity_type: type[T], entity_uuid: UUID
) -> BaseEntity | None

Retrieve an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type[T]

The type of entity to retrieve.

required
entity_uuid UUID

The UUID of the entity.

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
    """Look up an entity by primary key using ``Session.get``.

    Args:
        entity_type: The type of entity to retrieve.
        entity_uuid: The UUID of the entity.

    Returns:
        The entity if found, None otherwise.

    Raises:
        InvalidEntityTypeError: If ``entity_type`` is not a ``BaseEntity`` subclass.
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not issubclass(entity_type, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {entity_type.__name__}",
            expected_type="BaseEntity",
            actual_type=entity_type.__name__,
        )

    try:
        # Annotated explicitly so the type checker accepts T | None as BaseEntity | None.
        found: BaseEntity | None = self.get_session().get(entity_type, entity_uuid)
    except Exception as exc:
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Not expected to be reached (the handler raises); kept for the type checker
    else:
        return found

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.delete

delete(entity: T) -> None

Delete an entity from the database.

Parameters:

Name Type Description Default
entity T

The entity to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If the entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def delete(self, entity: T) -> None:
    """Delete an entity from the database.

    The entity is marked for deletion on the current session and the session is
    flushed immediately so that database errors surface from this call.

    Args:
        entity: The entity to delete.

    Raises:
        InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        session.delete(entity)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        # Defensive re-raise, matching the other adapter methods: if the handler
        # ever returns instead of raising, the failure must not be swallowed.
        raise

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.bulk_delete

bulk_delete(entities: list[T]) -> None

Delete multiple entities from the database.

Parameters:

Name Type Description Default
entities list[T]

List of entities to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def bulk_delete(self, entities: list[T]) -> None:
    """Delete multiple entities from the database.

    Each entity is marked for deletion on the current session, then the session
    is flushed once after all of them have been marked.

    Args:
        entities: List of entities to delete.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        for entity in entities:
            session.delete(entity)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        # Defensive re-raise, matching the other adapter methods: if the handler
        # ever returns instead of raising, the failure must not be swallowed.
        raise

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.execute

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Execute a SQLAlchemy statement.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
Result[Any]

The result of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
    """Run a SQLAlchemy statement on the adapter's current session.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional bind parameters; a falsy value is sent as an empty dict.

    Returns:
        The result object produced by the session for this statement.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        outcome = self.get_session().execute(statement, params or {})
    except Exception as exc:
        # Translate the failure through the shared exception handler.
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Only reached if the handler returns; also keeps MyPy satisfied
    return outcome

archipy.adapters.base.sqlalchemy.adapters.BaseSQLAlchemyAdapter.scalars

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Execute a SQLAlchemy statement and return scalar results.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
ScalarResult[Any]

The scalar results of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
    """Run a SQLAlchemy statement and return its scalar results.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional bind parameters; a falsy value is sent as an empty dict.

    Returns:
        The scalar result object produced by the session for this statement.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        scalar_outcome = self.get_session().scalars(statement, params or {})
    except Exception as exc:
        # Translate the failure through the shared exception handler.
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Only reached if the handler returns; also keeps MyPy satisfied
    return scalar_outcome

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter

Bases: AsyncSQLAlchemyPort, SQLAlchemyPaginationMixin, SQLAlchemySortMixin, SQLAlchemyFilterMixin, SQLAlchemyExceptionHandlerMixin

Base asynchronous SQLAlchemy adapter for ORM operations.

Provides a standardized interface for CRUD operations, pagination, sorting, and filtering. Specific database adapters should inherit from this class and provide their own session manager.

Parameters:

Name Type Description Default
orm_config ConfigT | None

Configuration for SQLAlchemy. If None, uses global config.

None
Source code in archipy/adapters/base/sqlalchemy/adapters.py
class AsyncBaseSQLAlchemyAdapter[ConfigT: SQLAlchemyConfig](
    AsyncSQLAlchemyPort,
    SQLAlchemyPaginationMixin,
    SQLAlchemySortMixin,
    SQLAlchemyFilterMixin,
    SQLAlchemyExceptionHandlerMixin,
):
    """Base asynchronous SQLAlchemy adapter for ORM operations.

    Provides a standardized interface for CRUD operations, pagination, sorting, and filtering.
    Specific database adapters should inherit from this class and provide their own session manager.

    Args:
        orm_config: Configuration for SQLAlchemy. If None, uses global config.
    """

    def __init__(self, orm_config: ConfigT | None = None) -> None:
        """Initialize the base async adapter with a session manager.

        Args:
            orm_config: Configuration for SQLAlchemy. If None, uses global config.
        """
        configs = BaseConfig.global_config().SQLALCHEMY if orm_config is None else orm_config
        # Cast to ConfigT since subclasses will ensure the proper type
        self.session_manager: AsyncBaseSQLAlchemySessionManager[ConfigT] = self._create_async_session_manager(
            configs,
        )

    def _create_async_session_manager(self, configs: ConfigT) -> AsyncBaseSQLAlchemySessionManager[ConfigT]:
        """Create an async session manager for the specific database.

        Args:
            configs: SQLAlchemy configuration.

        Returns:
            An async session manager instance.
        """
        raise NotImplementedError("Subclasses must implement _create_async_session_manager")

    @override
    async def execute_search_query(
        self,
        entity: type[T],
        query: Select,
        pagination: PaginationDTO | None,
        sort_info: SortDTO | None = None,
        has_multiple_entities: bool = False,
    ) -> tuple[list[T], int]:
        """Execute a search query with pagination and sorting.

        Args:
            entity: The entity class to query.
            query: The SQLAlchemy SELECT query.
            pagination: Optional pagination settings.
            sort_info: Optional sorting information.
            has_multiple_entities: Optional bool

        Returns:
            Tuple of the list of entities and the total count.

        Raises:
            DatabaseQueryError: If the database query fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            sort_info = sort_info or SortDTO.default()
            session = self.get_session()
            sorted_query = self._apply_sorting(entity, query, sort_info)
            paginated_query = self._apply_pagination(sorted_query, pagination)
            result_set = await session.execute(paginated_query)
            if has_multiple_entities:
                # For multiple entities, fetchall returns list of Row objects
                raw_results = list(result_set.fetchall())
                # Convert to list[T] - each row contains entities of type T
                # Use tuple unpacking to access the first element
                results: list[T] = []
                for row in raw_results:
                    if row:
                        # Row supports indexing and tuple unpacking
                        row_tuple = tuple(row)
                        if row_tuple:
                            first_entity = row_tuple[0]
                            # first_entity is T (entity type), verify it's an instance
                            if isinstance(first_entity, entity):
                                results.append(first_entity)
            else:
                # For single entity, scalars() returns list[T] directly
                results = list(result_set.scalars().all())
            count_query = select(func.count()).select_from(query.subquery())
            total_count_result = await session.execute(count_query)
            total_count = total_count_result.scalar_one()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            # Type: results is list[T] where T extends BaseEntity, total_count is int
            return results, total_count

    @override
    def get_session(self) -> AsyncSession:
        """Get a database session.

        Returns:
            AsyncSession: A SQLAlchemy async session.

        Raises:
            DatabaseConnectionError: If there's an error getting the session.
            DatabaseConfigurationError: If there's an error in the database configuration.
        """
        return self.session_manager.get_session()

    @override
    async def create(self, entity: T) -> T | None:
        """Create a new entity in the database.

        Args:
            entity: The entity to create.

        Returns:
            The created entity with updated attributes, preserving the original type.

        Raises:
            InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
                expected_type="BaseEntity",
                actual_type=type(entity).__name__,
            )

        try:
            session = self.get_session()
            session.add(entity)
            await session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return entity

    @override
    async def bulk_create(self, entities: list[T]) -> list[T] | None:
        """Creates multiple entities in a single database operation.

        Args:
            entities: List of entities to create.

        Returns:
            List of created entities with updated attributes, preserving original types.

        Raises:
            InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not all(isinstance(entity, BaseEntity) for entity in entities):
            raise InvalidEntityTypeError(
                message="All entities must be BaseEntity subclasses",
                expected_type="BaseEntity",
                actual_type="mixed",
            )

        try:
            session = self.get_session()
            session.add_all(entities)
            await session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return entities

    @override
    async def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
        """Retrieve an entity by its UUID.

        Args:
            entity_type: The type of entity to retrieve.
            entity_uuid: The UUID of the entity.

        Returns:
            The entity if found, None otherwise.

        Raises:
            InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not issubclass(entity_type, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {entity_type.__name__}",
                expected_type="BaseEntity",
                actual_type=entity_type.__name__,
            )

        try:
            session = self.get_session()
            result = await session.get(entity_type, entity_uuid)
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            # result is T | None where T extends BaseEntity, compatible with BaseEntity | None
            # The type checker needs explicit type annotation to understand the relationship
            typed_result: BaseEntity | None = result
            return typed_result

    @override
    async def delete(self, entity: BaseEntity) -> None:
        """Delete an entity from the database.

        Args:
            entity: The entity to delete.

        Raises:
            InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(
                message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
                expected_type="BaseEntity",
                actual_type=type(entity).__name__,
            )

        try:
            session = self.get_session()
            await session.delete(entity)
            await session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())

    @override
    async def bulk_delete(self, entities: list[T]) -> None:
        """Delete multiple entities from the database.

        Args:
            entities: List of entities to delete.

        Raises:
            InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
            DatabaseQueryError: If the database operation fails.
            DatabaseIntegrityError: If there's an integrity constraint violation.
            DatabaseConstraintError: If there's a constraint violation.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        if not all(isinstance(entity, BaseEntity) for entity in entities):
            raise InvalidEntityTypeError(
                message="All entities must be BaseEntity subclasses",
                expected_type="BaseEntity",
                actual_type="mixed",
            )

        try:
            session = self.get_session()
            for entity in entities:
                await session.delete(entity)
            await session.flush()
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())

    @override
    async def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
        """Execute a SQLAlchemy statement.

        Args:
            statement: The SQLAlchemy statement to execute.
            params: Optional parameters for the statement.

        Returns:
            The result of the execution.

        Raises:
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            session = self.get_session()
            result = await session.execute(statement, params or {})
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return result

    @override
    async def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
        """Execute a SQLAlchemy statement and return scalar results.

        Args:
            statement: The SQLAlchemy statement to execute.
            params: Optional parameters for the statement.

        Returns:
            The scalar results of the execution.

        Raises:
            DatabaseQueryError: If the database operation fails.
            DatabaseTimeoutError: If the query times out.
            DatabaseConnectionError: If there's a connection error.
            DatabaseTransactionError: If there's a transaction error.
        """
        try:
            session = self.get_session()
            result = await session.scalars(statement, params or {})
        except Exception as e:
            self._handle_db_exception(e, self.session_manager._get_database_name())
            raise  # This will never be reached, but satisfies MyPy
        else:
            return result

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.session_manager instance-attribute

session_manager: AsyncBaseSQLAlchemySessionManager[
    ConfigT
] = _create_async_session_manager(configs)

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.execute_search_query async

execute_search_query(
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]

Execute a search query with pagination and sorting.

Parameters:

Name Type Description Default
entity type[T]

The entity class to query.

required
query Select

The SQLAlchemy SELECT query.

required
pagination PaginationDTO | None

Optional pagination settings.

required
sort_info SortDTO | None

Optional sorting information.

None
has_multiple_entities bool

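Whether each result row may contain multiple entities. When True, only the first element of each row is returned, and only if it is an instance of entity; when False, results are read as scalars.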

False

Returns:

Type Description
tuple[list[T], int]

Tuple of the list of entities and the total count.

Raises:

Type Description
DatabaseQueryError

If the database query fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def execute_search_query(
    self,
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]:
    """Execute a search query with pagination and sorting.

    Args:
        entity: The entity class to query.
        query: The SQLAlchemy SELECT query.
        pagination: Optional pagination settings. Defaults to None.
        sort_info: Optional sorting information. Falls back to ``SortDTO.default()``.
        has_multiple_entities: When True, rows are fetched as full rows and only the
            first element of each row is kept, provided it is an instance of ``entity``.
            When False, results are read through ``scalars()``.

    Returns:
        Tuple of the list of entities and the total count. The count is computed
        from the original ``query`` (before sorting and pagination are applied).

    Raises:
        DatabaseQueryError: If the database query fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        sort_info = sort_info or SortDTO.default()
        session = self.get_session()
        sorted_query = self._apply_sorting(entity, query, sort_info)
        paginated_query = self._apply_pagination(sorted_query, pagination)
        result_set = await session.execute(paginated_query)
        if has_multiple_entities:
            # Keep the first column of every non-empty row, but only when it really
            # is an instance of the requested entity type; other rows are skipped.
            results: list[T] = [
                row[0] for row in result_set.fetchall() if row and isinstance(row[0], entity)
            ]
        else:
            # For single entity, scalars() yields the entities directly
            results = list(result_set.scalars().all())
        # Count over the unpaginated query so the total reflects every match.
        count_query = select(func.count()).select_from(query.subquery())
        total_count_result = await session.execute(count_query)
        total_count = total_count_result.scalar_one()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # Type: results is list[T] where T extends BaseEntity, total_count is int
        return results, total_count

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.get_session

get_session() -> AsyncSession

Get a database session.

Returns:

Name Type Description
AsyncSession AsyncSession

A SQLAlchemy async session.

Raises:

Type Description
DatabaseConnectionError

If there's an error getting the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_session(self) -> AsyncSession:
    """Return the async session provided by this adapter's session manager.

    Returns:
        AsyncSession: The SQLAlchemy async session handed out by the session manager.

    Raises:
        DatabaseConnectionError: If there's an error getting the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    manager = self.session_manager
    return manager.get_session()

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.create async

create(entity: T) -> T | None

Create a new entity in the database.

Parameters:

Name Type Description Default
entity T

The entity to create.

required

Returns:

Type Description
T | None

The created entity with updated attributes, preserving the original type.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def create(self, entity: T) -> T | None:
    """Persist a new entity through the current async session.

    The entity is added to the session and the session is flushed before returning.

    Args:
        entity: The entity to create.

    Returns:
        The same entity instance that was passed in, after the flush.

    Raises:
        InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    is_valid_entity = isinstance(entity, BaseEntity)
    if not is_valid_entity:
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        db_session = self.get_session()
        db_session.add(entity)
        await db_session.flush()
    except Exception as exc:
        # Translate the failure through the shared exception handler.
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Only reached if the handler returns; also keeps MyPy satisfied
    return entity

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.bulk_create async

bulk_create(entities: list[T]) -> list[T] | None

Creates multiple entities in a single database operation.

Parameters:

Name Type Description Default
entities list[T]

List of entities to create.

required

Returns:

Type Description
list[T] | None

List of created entities with updated attributes, preserving original types.

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def bulk_create(self, entities: list[T]) -> list[T] | None:
    """Persist several entities with a single flush of the current async session.

    Args:
        entities: List of entities to create.

    Returns:
        The same list that was passed in, after the flush.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    # Reject the whole batch as soon as one element is not a BaseEntity.
    if any(not isinstance(candidate, BaseEntity) for candidate in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        db_session = self.get_session()
        db_session.add_all(entities)
        await db_session.flush()
    except Exception as exc:
        # Translate the failure through the shared exception handler.
        self._handle_db_exception(exc, self.session_manager._get_database_name())
        raise  # Only reached if the handler returns; also keeps MyPy satisfied
    return entities

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.get_by_uuid async

get_by_uuid(
    entity_type: type[T], entity_uuid: UUID
) -> BaseEntity | None

Retrieve an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type[T]

The type of entity to retrieve.

required
entity_uuid UUID

The UUID of the entity.

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
    """Retrieve an entity by its UUID.

    Args:
        entity_type: The type of entity to retrieve.
        entity_uuid: The UUID of the entity.

    Returns:
        The entity if found, None otherwise.

    Raises:
        InvalidEntityTypeError: If ``entity_type`` is not a class, or is a class
            that is not a BaseEntity subclass.
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    # issubclass() raises a bare TypeError when its first argument is not a class
    # (e.g. an instance or a generic alias). Check isinstance(..., type) first so
    # every invalid argument surfaces as the documented InvalidEntityTypeError.
    if not (isinstance(entity_type, type) and issubclass(entity_type, BaseEntity)):
        # Non-class objects usually have no __name__; fall back to their type's name.
        actual_name = getattr(entity_type, "__name__", type(entity_type).__name__)
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {actual_name}",
            expected_type="BaseEntity",
            actual_type=actual_name,
        )

    try:
        session = self.get_session()
        result = await session.get(entity_type, entity_uuid)
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # result is T | None where T extends BaseEntity, compatible with BaseEntity | None
        # The type checker needs explicit type annotation to understand the relationship
        typed_result: BaseEntity | None = result
        return typed_result

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.delete async

delete(entity: BaseEntity) -> None

Delete an entity from the database.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If the entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def delete(self, entity: BaseEntity) -> None:
    """Delete an entity from the database.

    The entity is marked for deletion on the current session and the session
    is flushed immediately.

    Args:
        entity: The entity to delete.

    Raises:
        InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        await session.delete(entity)
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        # Safety net, consistent with the other adapter methods: if the handler
        # ever returns instead of raising, re-raise the original error rather
        # than silently reporting a successful delete.
        raise

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.bulk_delete async

bulk_delete(entities: list[T]) -> None

Delete multiple entities from the database.

Parameters:

Name Type Description Default
entities list[T]

List of entities to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def bulk_delete(self, entities: list[T]) -> None:
    """Delete multiple entities from the database.

    Every entity is marked for deletion on the current session, then the
    session is flushed once after the loop.

    Args:
        entities: List of entities to delete.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        for entity in entities:
            await session.delete(entity)
        # Single flush for the whole batch rather than one per entity.
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        # Safety net, consistent with the other adapter methods: if the handler
        # ever returns instead of raising, re-raise the original error rather
        # than silently reporting a successful bulk delete.
        raise

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.execute async

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Execute a SQLAlchemy statement.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
Result[Any]

The result of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
    """Run a SQLAlchemy statement on the current session.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement. A falsy value
            (including ``None``) is replaced by an empty dict.

    Returns:
        The result of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        db_session = self.get_session()
        outcome = await db_session.execute(statement, params or {})
    except Exception as exc:
        database_name = self.session_manager._get_database_name()
        self._handle_db_exception(exc, database_name)
        raise  # This will never be reached, but satisfies MyPy
    return outcome

archipy.adapters.base.sqlalchemy.adapters.AsyncBaseSQLAlchemyAdapter.scalars async

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Execute a SQLAlchemy statement and return scalar results.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
ScalarResult[Any]

The scalar results of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
    """Run a SQLAlchemy statement on the current session and return its scalar results.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement. A falsy value
            (including ``None``) is replaced by an empty dict.

    Returns:
        The scalar results of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        db_session = self.get_session()
        scalar_rows = await db_session.scalars(statement, params or {})
    except Exception as exc:
        database_name = self.session_manager._get_database_name()
        self._handle_db_exception(exc, database_name)
        raise  # This will never be reached, but satisfies MyPy
    return scalar_rows

options: show_root_toc_entry: false heading_level: 3