Skip to content

StarRocks

The starrocks/sqlalchemy adapter provides integration with StarRocks, a high-performance analytical database compatible with the MySQL protocol, using SQLAlchemy.

Session Managers

StarRocks-specific session manager handling connection management for the StarRocks analytical engine.

archipy.adapters.starrocks.sqlalchemy.session_managers.StarRocksSQlAlchemySessionManager

Bases: BaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig]

Synchronous SQLAlchemy session manager for StarRocks.

Inherits from BaseSQLAlchemySessionManager to provide StarRocks-specific session management, including connection URL creation and engine configuration.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

StarRocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/session_managers.py
class StarRocksSQlAlchemySessionManager(BaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig], metaclass=Singleton):
    """Synchronous SQLAlchemy session manager for StarRocks.

    Inherits from BaseSQLAlchemySessionManager to provide StarRocks-specific session
    management, including connection URL creation and engine configuration.

    Args:
        orm_config: StarRocks-specific configuration. If None, uses global config.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Initialize the StarRocks session manager.

        Args:
            orm_config: StarRocks-specific configuration. If None, uses global config.
        """
        configs = BaseConfig.global_config().STARROCKS_SQLALCHEMY if orm_config is None else orm_config
        super().__init__(configs)

    @override
    def _expected_config_type(self) -> type[StarRocksSQLAlchemyConfig]:
        """Return the expected configuration type for StarRocks.

        Returns:
            The StarRocksSQLAlchemyConfig class.
        """
        return StarRocksSQLAlchemyConfig

    @override
    def _get_database_name(self) -> str:
        """Return the name of the database being used.

        Returns:
            str: The name of the database ('starrocks').
        """
        return "starrocks"

    @override
    def _get_connect_args(self) -> dict:
        """Return connection arguments for StarRocks to ensure proper transaction support.

        StarRocks (using MySQL protocol) requires autocommit to be explicitly disabled
        to ensure transactions work properly with rollback support.

        Returns:
            A dictionary with autocommit=False and connect_timeout from config.
        """
        connect_args = {}

        # Add connect_timeout if configured
        if hasattr(self, "_configs"):
            if hasattr(self._configs, "CONNECT_TIMEOUT") and self._configs.CONNECT_TIMEOUT is not None:
                connect_args["connect_timeout"] = self._configs.CONNECT_TIMEOUT

        # Add StarRocks-specific setting for transaction support
        connect_args["autocommit"] = False

        return connect_args

    @override
    def _create_url(self, configs: StarRocksSQLAlchemyConfig) -> URL:
        """Create a StarRocks connection URL.

        Args:
            configs: StarRocks configuration.

        Returns:
            A SQLAlchemy URL object for StarRocks.

        Raises:
            DatabaseConnectionError: If there's an error creating the URL.
        """
        try:
            return URL.create(
                drivername=configs.DRIVER_NAME,
                username=configs.USERNAME,
                password=configs.PASSWORD,
                host=configs.HOST,
                port=configs.PORT,
                database=configs.DATABASE,
            )
        except SQLAlchemyError as e:
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

archipy.adapters.starrocks.sqlalchemy.session_managers.StarRocksSQlAlchemySessionManager.engine instance-attribute

engine = _create_engine(orm_config)

archipy.adapters.starrocks.sqlalchemy.session_managers.StarRocksSQlAlchemySessionManager.get_session

get_session() -> Session

Retrieve a thread-safe SQLAlchemy session.

Returns:

Name Type Description
Session Session

A SQLAlchemy session instance for database operations.

Raises:

Type Description
DatabaseConnectionError

If there's an error creating the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def get_session(self) -> Session:
    """Retrieve a thread-safe SQLAlchemy session.

    Returns:
        Session: A SQLAlchemy session instance for database operations.

    Raises:
        DatabaseConnectionError: If there's an error creating the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        return self._session_generator()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.starrocks.sqlalchemy.session_managers.StarRocksSQlAlchemySessionManager.remove_session

remove_session() -> None

Remove the current session from the registry.

Cleans up the session to prevent resource leaks, typically called at the end of a request.

Raises:

Type Description
DatabaseConnectionError

If there's an error removing the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def remove_session(self) -> None:
    """Remove the current session from the registry.

    Cleans up the session to prevent resource leaks, typically called at the end
    of a request.

    Raises:
        DatabaseConnectionError: If there's an error removing the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        self._session_generator.remove()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.starrocks.sqlalchemy.session_managers.AsyncStarRocksSQlAlchemySessionManager

Bases: AsyncBaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig]

Asynchronous SQLAlchemy session manager for StarRocks.

Inherits from AsyncBaseSQLAlchemySessionManager to provide async StarRocks-specific session management, including connection URL creation and async engine configuration.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

StarRocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/session_managers.py
class AsyncStarRocksSQlAlchemySessionManager(
    AsyncBaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig],
    metaclass=Singleton,
):
    """Asynchronous SQLAlchemy session manager for StarRocks.

    Inherits from AsyncBaseSQLAlchemySessionManager to provide async StarRocks-specific
    session management, including connection URL creation and async engine configuration.

    Args:
        orm_config: StarRocks-specific configuration. If None, uses global config.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Initialize the async StarRocks session manager.

        Args:
            orm_config: StarRocks-specific configuration. If None, uses global config.
        """
        configs = BaseConfig.global_config().STARROCKS_SQLALCHEMY if orm_config is None else orm_config
        super().__init__(configs)

    @override
    def _expected_config_type(self) -> type[StarRocksSQLAlchemyConfig]:
        """Return the expected configuration type for StarRocks.

        Returns:
            The StarRocksSQLAlchemyConfig class.
        """
        return StarRocksSQLAlchemyConfig

    @override
    def _get_database_name(self) -> str:
        """Return the name of the database being used.

        Returns:
            str: The name of the database ('starrocks').
        """
        return "starrocks"

    @override
    def _create_url(self, configs: StarRocksSQLAlchemyConfig) -> URL:
        """Create an async StarRocks connection URL.

        For async operations, StarRocks requires the starrocks+asyncmy driver
        which uses the asyncmy library for async MySQL protocol support while
        maintaining StarRocks dialect features (type mapping, compiler patches).

        Args:
            configs: StarRocks configuration.

        Returns:
            A SQLAlchemy URL object for StarRocks with async driver.

        Raises:
            DatabaseConnectionError: If there's an error creating the URL.
        """
        try:
            return URL.create(
                drivername="starrocks+asyncmy",
                username=configs.USERNAME,
                password=configs.PASSWORD,
                host=configs.HOST,
                port=configs.PORT,
                database=configs.DATABASE,
            )
        except SQLAlchemyError as e:
            raise DatabaseConnectionError(
                database=self._get_database_name(),
            ) from e

    @override
    def _get_connect_args(self) -> dict:
        """Return connection arguments for async StarRocks to ensure proper transaction support.

        StarRocks (using MySQL protocol via asyncmy) requires autocommit to be explicitly disabled
        to ensure transactions work properly with rollback support.

        Note: asyncmy driver only supports connect_timeout, not read_timeout/write_timeout.
        These socket-level timeouts are handled differently in async drivers.

        Returns:
            A dictionary with autocommit=False and connect_timeout (no read/write timeouts for asyncmy).
        """
        connect_args = {}

        # Add connect_timeout if configured
        if hasattr(self, "_configs"):
            if hasattr(self._configs, "CONNECT_TIMEOUT") and self._configs.CONNECT_TIMEOUT is not None:
                connect_args["connect_timeout"] = self._configs.CONNECT_TIMEOUT

        # Add StarRocks async-specific setting for transaction support
        connect_args["autocommit"] = False

        return connect_args

archipy.adapters.starrocks.sqlalchemy.session_managers.AsyncStarRocksSQlAlchemySessionManager.engine instance-attribute

engine = _create_async_engine(orm_config)

archipy.adapters.starrocks.sqlalchemy.session_managers.AsyncStarRocksSQlAlchemySessionManager.get_session

get_session() -> AsyncSession

Retrieve a task-safe async SQLAlchemy session.

Returns:

Name Type Description
AsyncSession AsyncSession

An async SQLAlchemy session instance for database operations.

Raises:

Type Description
DatabaseConnectionError

If there's an error creating the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
def get_session(self) -> AsyncSession:
    """Retrieve a task-safe async SQLAlchemy session.

    Returns:
        AsyncSession: An async SQLAlchemy session instance for database operations.

    Raises:
        DatabaseConnectionError: If there's an error creating the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        return self._session_generator()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

archipy.adapters.starrocks.sqlalchemy.session_managers.AsyncStarRocksSQlAlchemySessionManager.remove_session async

remove_session() -> None

Remove the current session from the registry.

Cleans up the session to prevent resource leaks, typically called at the end of a request.

Raises:

Type Description
DatabaseConnectionError

If there's an error removing the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/session_managers.py
@override
async def remove_session(self) -> None:
    """Remove the current session from the registry.

    Cleans up the session to prevent resource leaks, typically called at the end
    of a request.

    Raises:
        DatabaseConnectionError: If there's an error removing the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    try:
        await self._session_generator.remove()
    except SQLAlchemyError as e:
        if "configuration" in str(e).lower():
            raise DatabaseConfigurationError(
                database=self._get_database_name(),
            ) from e
        raise DatabaseConnectionError(
            database=self._get_database_name(),
        ) from e

options: show_root_toc_entry: false heading_level: 3

Session Manager Registry

Registry for StarRocks session manager instances.

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry

Bases: SessionManagerRegistry

Registry for StarRocks SQLAlchemy session managers.

This registry provides a centralized access point for both synchronous and asynchronous StarRocks session managers, implementing the Service Locator pattern. It lazily initializes the appropriate session manager when first requested.

The registry maintains singleton instances of: - A synchronous session manager (StarRocksSQlAlchemySessionManager) - An asynchronous session manager (AsyncStarRocksSQlAlchemySessionManager)

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
class StarRocksSessionManagerRegistry(SessionManagerRegistry, metaclass=Singleton):
    """Registry for StarRocks SQLAlchemy session managers.

    This registry provides a centralized access point for both synchronous and
    asynchronous StarRocks session managers, implementing the Service Locator pattern.
    It lazily initializes the appropriate session manager when first requested.

    The registry maintains singleton instances of:
    - A synchronous session manager (StarRocksSQlAlchemySessionManager)
    - An asynchronous session manager (AsyncStarRocksSQlAlchemySessionManager)
    """

    @classmethod
    def get_sync_manager(cls) -> SessionManagerPort:
        """Get the synchronous StarRocks session manager instance.

        Lazily initializes a default StarRocksSQlAlchemySessionManager if none has been set.

        Returns:
            SessionManagerPort: The registered synchronous session manager

        Raises:
            DatabaseConnectionError: If there's an error initializing the session manager
        """
        if cls._sync_instance is None:
            try:
                from archipy.adapters.starrocks.sqlalchemy.session_managers import StarRocksSQlAlchemySessionManager

                cls._sync_instance = StarRocksSQlAlchemySessionManager()
            except Exception as e:
                raise DatabaseConnectionError(
                    database="starrocks",
                ) from e
        return cls._sync_instance

    @classmethod
    def set_sync_manager(cls, manager: SessionManagerPort) -> None:
        """Set a custom synchronous session manager.

        Args:
            manager: An instance implementing SessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("StarRocks session manager cannot be None")
        from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

        if not isinstance(manager, SessionManagerPort):
            raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")
        cls._sync_instance = manager

    @classmethod
    def get_async_manager(cls) -> AsyncSessionManagerPort:
        """Get the asynchronous StarRocks session manager instance.

        Lazily initializes a default AsyncStarRocksSQlAlchemySessionManager if none has been set.

        Returns:
            AsyncSessionManagerPort: The registered asynchronous session manager

        Raises:
            DatabaseConnectionError: If there's an error initializing the session manager
        """
        if cls._async_instance is None:
            try:
                from archipy.adapters.starrocks.sqlalchemy.session_managers import (
                    AsyncStarRocksSQlAlchemySessionManager,
                )

                cls._async_instance = AsyncStarRocksSQlAlchemySessionManager()
            except Exception as e:
                raise DatabaseConnectionError(
                    database="starrocks",
                ) from e
        return cls._async_instance

    @classmethod
    def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
        """Set a custom asynchronous session manager.

        Args:
            manager: An instance implementing AsyncSessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("StarRocks async session manager cannot be None")
        from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

        if not isinstance(manager, AsyncSessionManagerPort):
            raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")
        cls._async_instance = manager

    @classmethod
    def reset(cls) -> None:
        """Reset the registry to its initial state.

        This method clears both registered managers, useful for testing.
        """
        cls._sync_instance = None
        cls._async_instance = None

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry.get_sync_manager classmethod

get_sync_manager() -> SessionManagerPort

Get the synchronous StarRocks session manager instance.

Lazily initializes a default StarRocksSQlAlchemySessionManager if none has been set.

Returns:

Name Type Description
SessionManagerPort SessionManagerPort

The registered synchronous session manager

Raises:

Type Description
DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def get_sync_manager(cls) -> SessionManagerPort:
    """Get the synchronous StarRocks session manager instance.

    Lazily initializes a default StarRocksSQlAlchemySessionManager if none has been set.

    Returns:
        SessionManagerPort: The registered synchronous session manager

    Raises:
        DatabaseConnectionError: If there's an error initializing the session manager
    """
    if cls._sync_instance is None:
        try:
            from archipy.adapters.starrocks.sqlalchemy.session_managers import StarRocksSQlAlchemySessionManager

            cls._sync_instance = StarRocksSQlAlchemySessionManager()
        except Exception as e:
            raise DatabaseConnectionError(
                database="starrocks",
            ) from e
    return cls._sync_instance

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry.set_sync_manager classmethod

set_sync_manager(manager: SessionManagerPort) -> None

Set a custom synchronous session manager.

Parameters:

Name Type Description Default
manager SessionManagerPort

An instance implementing SessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement SessionManagerPort

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def set_sync_manager(cls, manager: SessionManagerPort) -> None:
    """Set a custom synchronous session manager.

    Args:
        manager: An instance implementing SessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("StarRocks session manager cannot be None")
    from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

    if not isinstance(manager, SessionManagerPort):
        raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")
    cls._sync_instance = manager

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry.get_async_manager classmethod

get_async_manager() -> AsyncSessionManagerPort

Get the asynchronous StarRocks session manager instance.

Lazily initializes a default AsyncStarRocksSQlAlchemySessionManager if none has been set.

Returns:

Name Type Description
AsyncSessionManagerPort AsyncSessionManagerPort

The registered asynchronous session manager

Raises:

Type Description
DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def get_async_manager(cls) -> AsyncSessionManagerPort:
    """Get the asynchronous StarRocks session manager instance.

    Lazily initializes a default AsyncStarRocksSQlAlchemySessionManager if none has been set.

    Returns:
        AsyncSessionManagerPort: The registered asynchronous session manager

    Raises:
        DatabaseConnectionError: If there's an error initializing the session manager
    """
    if cls._async_instance is None:
        try:
            from archipy.adapters.starrocks.sqlalchemy.session_managers import (
                AsyncStarRocksSQlAlchemySessionManager,
            )

            cls._async_instance = AsyncStarRocksSQlAlchemySessionManager()
        except Exception as e:
            raise DatabaseConnectionError(
                database="starrocks",
            ) from e
    return cls._async_instance

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry.set_async_manager classmethod

set_async_manager(manager: AsyncSessionManagerPort) -> None

Set a custom asynchronous session manager.

Parameters:

Name Type Description Default
manager AsyncSessionManagerPort

An instance implementing AsyncSessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement AsyncSessionManagerPort

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
    """Set a custom asynchronous session manager.

    Args:
        manager: An instance implementing AsyncSessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("StarRocks async session manager cannot be None")
    from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

    if not isinstance(manager, AsyncSessionManagerPort):
        raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")
    cls._async_instance = manager

archipy.adapters.starrocks.sqlalchemy.session_manager_registry.StarRocksSessionManagerRegistry.reset classmethod

reset() -> None

Reset the registry to its initial state.

This method clears both registered managers, useful for testing.

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def reset(cls) -> None:
    """Reset the registry to its initial state.

    This method clears both registered managers, useful for testing.
    """
    cls._sync_instance = None
    cls._async_instance = None

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete StarRocks adapter built on the base SQLAlchemy adapter with StarRocks-specific dialect configuration.

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter

Bases: BaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]

Synchronous SQLAlchemy adapter for Starrocks.

Inherits from BaseSQLAlchemyAdapter to provide Starrocks-specific session management and database operations.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

Starrocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/adapters.py
class StarrocksSQLAlchemyAdapter(BaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]):
    """Synchronous SQLAlchemy adapter for Starrocks.

    Inherits from BaseSQLAlchemyAdapter to provide Starrocks-specific session management
    and database operations.

    Args:
        orm_config: Starrocks-specific configuration. If None, uses global config.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Initialize the Starrocks adapter with a session manager.

        Args:
            orm_config: Starrocks-specific configuration. If None, uses global config.
        """
        configs = BaseConfig.global_config().STARROCKS_SQLALCHEMY if orm_config is None else orm_config
        super().__init__(configs)

    @override
    def _create_session_manager(self, configs: StarRocksSQLAlchemyConfig) -> StarRocksSQlAlchemySessionManager:
        """Create a Starrocks-specific session manager.

        Args:
            configs: Starrocks configuration.

        Returns:
            A Starrocks session manager instance.
        """
        return StarRocksSQlAlchemySessionManager(configs)

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.session_manager instance-attribute

session_manager: BaseSQLAlchemySessionManager[ConfigT] = (
    _create_session_manager(configs)
)

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.get_session

get_session() -> Session

Get a database session.

Returns:

Name Type Description
Session Session

A SQLAlchemy session.

Raises:

Type Description
DatabaseConnectionError

If there's an error getting the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_session(self) -> Session:
    """Get a database session.

    Returns:
        Session: A SQLAlchemy session.

    Raises:
        DatabaseConnectionError: If there's an error getting the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    return self.session_manager.get_session()

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.execute_search_query

execute_search_query(
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]

Execute a search query with pagination and sorting.

Parameters:

Name Type Description Default
entity type[T]

The entity class to query.

required
query Select

The SQLAlchemy SELECT query.

required
pagination PaginationDTO | None

Optional pagination settings.

None
sort_info SortDTO | None

Optional sorting information.

None
has_multiple_entities bool

Optional bool.

False

Returns:

Type Description
tuple[list[T], int]

Tuple of the list of entities and the total count.

Raises:

Type Description
DatabaseQueryError

If the database query fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def execute_search_query(
    self,
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None = None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]:
    """Execute a search query with pagination and sorting.

    Args:
        entity: The entity class to query.
        query: The SQLAlchemy SELECT query.
        pagination: Optional pagination settings.
        sort_info: Optional sorting information.
        has_multiple_entities: Optional bool.

    Returns:
        Tuple of the list of entities and the total count.

    Raises:
        DatabaseQueryError: If the database query fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        sort_info = sort_info or SortDTO.default()
        session = self.get_session()
        sorted_query = self._apply_sorting(entity, query, sort_info)
        paginated_query = self._apply_pagination(sorted_query, pagination)
        result_set = session.execute(paginated_query)
        if has_multiple_entities:
            # For multiple entities, fetchall returns list of Row objects
            raw_results = list(result_set.fetchall())
            # Convert to list[T] - each row contains entities of type T
            # Use tuple unpacking to access the first element
            results: list[T] = []
            for row in raw_results:
                if row:
                    # Row supports indexing and tuple unpacking
                    row_tuple = tuple(row)
                    if row_tuple:
                        first_entity = row_tuple[0]
                        # first_entity is T (entity type), verify it's an instance
                        if isinstance(first_entity, entity):
                            results.append(first_entity)
        else:
            # For single entity, scalars() returns list[T] directly
            results = list(result_set.scalars().all())
        count_query = select(func.count()).select_from(query.subquery())
        total_count = session.execute(count_query).scalar_one()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # Type: results is list[T] where T extends BaseEntity, total_count is int
        return results, total_count

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.create

create(entity: T) -> T | None

Create a new entity in the database.

Parameters:

Name Type Description Default
entity T

The entity to create.

required

Returns:

Type Description
T | None

The created entity with updated attributes, preserving the original type.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def create(self, entity: T) -> T | None:
    """Create a new entity in the database.

    Args:
        entity: The entity to create.

    Returns:
        The created entity with updated attributes, preserving the original type.

    Raises:
        InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        session.add(entity)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return entity

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.bulk_create

bulk_create(entities: list[T]) -> list[T] | None

Creates multiple entities in a single database operation.

Parameters:

Name Type Description Default
entities list[T]

List of entities to create.

required

Returns:

Type Description
list[T] | None

List of created entities with updated attributes, preserving original types.

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def bulk_create(self, entities: list[T]) -> list[T] | None:
    """Creates multiple entities in a single database operation.

    Args:
        entities: List of entities to create.

    Returns:
        List of created entities with updated attributes, preserving original types.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        session.add_all(entities)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return entities

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.get_by_uuid

get_by_uuid(
    entity_type: type[T], entity_uuid: UUID
) -> BaseEntity | None

Retrieve an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type[T]

The type of entity to retrieve.

required
entity_uuid UUID

The UUID of the entity.

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
    """Retrieve an entity by its UUID.

    Args:
        entity_type: The type of entity to retrieve.
        entity_uuid: The UUID of the entity.

    Returns:
        The entity if found, None otherwise.

    Raises:
        InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not issubclass(entity_type, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {entity_type.__name__}",
            expected_type="BaseEntity",
            actual_type=entity_type.__name__,
        )

    try:
        session = self.get_session()
        result = session.get(entity_type, entity_uuid)
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # result is T | None where T extends BaseEntity, compatible with BaseEntity | None
        # The type checker needs explicit type annotation to understand the relationship
        typed_result: BaseEntity | None = result
        return typed_result

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.delete

delete(entity: T) -> None

Delete an entity from the database.

Parameters:

Name Type Description Default
entity T

The entity to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If the entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def delete(self, entity: T) -> None:
    """Delete an entity from the database.

    Args:
        entity: The entity to delete.

    Raises:
        InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        session.delete(entity)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.bulk_delete

bulk_delete(entities: list[T]) -> None

Delete multiple entities from the database.

Parameters:

Name Type Description Default
entities list[T]

List of entities to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def bulk_delete(self, entities: list[T]) -> None:
    """Delete multiple entities from the database.

    Args:
        entities: List of entities to delete.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        for entity in entities:
            session.delete(entity)
        session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.execute

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Execute a SQLAlchemy statement.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
Result[Any]

The result of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
    """Execute a SQLAlchemy statement.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement.

    Returns:
        The result of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        session = self.get_session()
        result = session.execute(statement, params or {})
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return result

archipy.adapters.starrocks.sqlalchemy.adapters.StarrocksSQLAlchemyAdapter.scalars

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Execute a SQLAlchemy statement and return scalar results.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
ScalarResult[Any]

The scalar results of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
    """Execute a SQLAlchemy statement and return scalar results.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement.

    Returns:
        The scalar results of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        session = self.get_session()
        result = session.scalars(statement, params or {})
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return result

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter

Bases: AsyncBaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]

Asynchronous SQLAlchemy adapter for Starrocks.

Inherits from AsyncBaseSQLAlchemyAdapter to provide async Starrocks-specific session management and database operations.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

Starrocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/adapters.py
class AsyncStarrocksSQLAlchemyAdapter(AsyncBaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]):
    """Asynchronous SQLAlchemy adapter for Starrocks.

    Inherits from AsyncBaseSQLAlchemyAdapter to provide async Starrocks-specific session
    management and database operations.

    Args:
        orm_config: Starrocks-specific configuration. If None, uses global config.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Initialize the async Starrocks adapter with a session manager.

        Args:
            orm_config: Starrocks-specific configuration. If None, uses global config.
        """
        configs = BaseConfig.global_config().STARROCKS_SQLALCHEMY if orm_config is None else orm_config
        super().__init__(configs)

    @override
    def _create_async_session_manager(
        self,
        configs: StarRocksSQLAlchemyConfig,
    ) -> AsyncStarRocksSQlAlchemySessionManager:
        """Create an async Starrocks-specific session manager.

        Args:
            configs: Starrocks configuration.

        Returns:
            An async Starrocks session manager instance.
        """
        return AsyncStarRocksSQlAlchemySessionManager(configs)

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.session_manager instance-attribute

session_manager: AsyncBaseSQLAlchemySessionManager[
    ConfigT
] = _create_async_session_manager(configs)

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.get_session

get_session() -> AsyncSession

Get a database session.

Returns:

Name Type Description
AsyncSession AsyncSession

A SQLAlchemy async session.

Raises:

Type Description
DatabaseConnectionError

If there's an error getting the session.

DatabaseConfigurationError

If there's an error in the database configuration.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
def get_session(self) -> AsyncSession:
    """Get a database session.

    Returns:
        AsyncSession: A SQLAlchemy async session.

    Raises:
        DatabaseConnectionError: If there's an error getting the session.
        DatabaseConfigurationError: If there's an error in the database configuration.
    """
    return self.session_manager.get_session()

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.execute_search_query async

execute_search_query(
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]

Execute a search query with pagination and sorting.

Parameters:

Name Type Description Default
entity type[T]

The entity class to query.

required
query Select

The SQLAlchemy SELECT query.

required
pagination PaginationDTO | None

Optional pagination settings.

required
sort_info SortDTO | None

Optional sorting information.

None
has_multiple_entities bool

Optional bool

False

Returns:

Type Description
tuple[list[T], int]

Tuple of the list of entities and the total count.

Raises:

Type Description
DatabaseQueryError

If the database query fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def execute_search_query(
    self,
    entity: type[T],
    query: Select,
    pagination: PaginationDTO | None,
    sort_info: SortDTO | None = None,
    has_multiple_entities: bool = False,
) -> tuple[list[T], int]:
    """Execute a search query with pagination and sorting.

    Args:
        entity: The entity class to query.
        query: The SQLAlchemy SELECT query.
        pagination: Optional pagination settings.
        sort_info: Optional sorting information.
        has_multiple_entities: Optional bool

    Returns:
        Tuple of the list of entities and the total count.

    Raises:
        DatabaseQueryError: If the database query fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        sort_info = sort_info or SortDTO.default()
        session = self.get_session()
        sorted_query = self._apply_sorting(entity, query, sort_info)
        paginated_query = self._apply_pagination(sorted_query, pagination)
        result_set = await session.execute(paginated_query)
        if has_multiple_entities:
            # For multiple entities, fetchall returns list of Row objects
            raw_results = list(result_set.fetchall())
            # Convert to list[T] - each row contains entities of type T
            # Use tuple unpacking to access the first element
            results: list[T] = []
            for row in raw_results:
                if row:
                    # Row supports indexing and tuple unpacking
                    row_tuple = tuple(row)
                    if row_tuple:
                        first_entity = row_tuple[0]
                        # first_entity is T (entity type), verify it's an instance
                        if isinstance(first_entity, entity):
                            results.append(first_entity)
        else:
            # For single entity, scalars() returns list[T] directly
            results = list(result_set.scalars().all())
        count_query = select(func.count()).select_from(query.subquery())
        total_count_result = await session.execute(count_query)
        total_count = total_count_result.scalar_one()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # Type: results is list[T] where T extends BaseEntity, total_count is int
        return results, total_count

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.create async

create(entity: T) -> T | None

Create a new entity in the database.

Parameters:

Name Type Description Default
entity T

The entity to create.

required

Returns:

Type Description
T | None

The created entity with updated attributes, preserving the original type.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def create(self, entity: T) -> T | None:
    """Create a new entity in the database.

    Args:
        entity: The entity to create.

    Returns:
        The created entity with updated attributes, preserving the original type.

    Raises:
        InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        session.add(entity)
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return entity

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.bulk_create async

bulk_create(entities: list[T]) -> list[T] | None

Creates multiple entities in a single database operation.

Parameters:

Name Type Description Default
entities list[T]

List of entities to create.

required

Returns:

Type Description
list[T] | None

List of created entities with updated attributes, preserving original types.

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def bulk_create(self, entities: list[T]) -> list[T] | None:
    """Creates multiple entities in a single database operation.

    Args:
        entities: List of entities to create.

    Returns:
        List of created entities with updated attributes, preserving original types.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        session.add_all(entities)
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return entities

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.get_by_uuid async

get_by_uuid(
    entity_type: type[T], entity_uuid: UUID
) -> BaseEntity | None

Retrieve an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type[T]

The type of entity to retrieve.

required
entity_uuid UUID

The UUID of the entity.

required

Returns:

Type Description
BaseEntity | None

The entity if found, None otherwise.

Raises:

Type Description
InvalidEntityTypeError

If the entity type is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def get_by_uuid(self, entity_type: type[T], entity_uuid: UUID) -> BaseEntity | None:
    """Retrieve an entity by its UUID.

    Args:
        entity_type: The type of entity to retrieve.
        entity_uuid: The UUID of the entity.

    Returns:
        The entity if found, None otherwise.

    Raises:
        InvalidEntityTypeError: If the entity type is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not issubclass(entity_type, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {entity_type.__name__}",
            expected_type="BaseEntity",
            actual_type=entity_type.__name__,
        )

    try:
        session = self.get_session()
        result = await session.get(entity_type, entity_uuid)
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        # result is T | None where T extends BaseEntity, compatible with BaseEntity | None
        # The type checker needs explicit type annotation to understand the relationship
        typed_result: BaseEntity | None = result
        return typed_result

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.delete async

delete(entity: BaseEntity) -> None

Delete an entity from the database.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If the entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def delete(self, entity: BaseEntity) -> None:
    """Delete an entity from the database.

    Args:
        entity: The entity to delete.

    Raises:
        InvalidEntityTypeError: If the entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not isinstance(entity, BaseEntity):
        raise InvalidEntityTypeError(
            message=f"Expected BaseEntity subclass, got {type(entity).__name__}",
            expected_type="BaseEntity",
            actual_type=type(entity).__name__,
        )

    try:
        session = self.get_session()
        await session.delete(entity)
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.bulk_delete async

bulk_delete(entities: list[T]) -> None

Delete multiple entities from the database.

Parameters:

Name Type Description Default
entities list[T]

List of entities to delete.

required

Raises:

Type Description
InvalidEntityTypeError

If any entity is not a valid SQLAlchemy model.

DatabaseQueryError

If the database operation fails.

DatabaseIntegrityError

If there's an integrity constraint violation.

DatabaseConstraintError

If there's a constraint violation.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def bulk_delete(self, entities: list[T]) -> None:
    """Delete multiple entities from the database.

    Args:
        entities: List of entities to delete.

    Raises:
        InvalidEntityTypeError: If any entity is not a valid SQLAlchemy model.
        DatabaseQueryError: If the database operation fails.
        DatabaseIntegrityError: If there's an integrity constraint violation.
        DatabaseConstraintError: If there's a constraint violation.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    if not all(isinstance(entity, BaseEntity) for entity in entities):
        raise InvalidEntityTypeError(
            message="All entities must be BaseEntity subclasses",
            expected_type="BaseEntity",
            actual_type="mixed",
        )

    try:
        session = self.get_session()
        for entity in entities:
            await session.delete(entity)
        await session.flush()
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.execute async

execute(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> Result[Any]

Execute a SQLAlchemy statement.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
Result[Any]

The result of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
    """Execute a SQLAlchemy statement.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement.

    Returns:
        The result of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        session = self.get_session()
        result = await session.execute(statement, params or {})
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return result

archipy.adapters.starrocks.sqlalchemy.adapters.AsyncStarrocksSQLAlchemyAdapter.scalars async

scalars(
    statement: Executable,
    params: AnyExecuteParams | None = None,
) -> ScalarResult[Any]

Execute a SQLAlchemy statement and return scalar results.

Parameters:

Name Type Description Default
statement Executable

The SQLAlchemy statement to execute.

required
params AnyExecuteParams | None

Optional parameters for the statement.

None

Returns:

Type Description
ScalarResult[Any]

The scalar results of the execution.

Raises:

Type Description
DatabaseQueryError

If the database operation fails.

DatabaseTimeoutError

If the query times out.

DatabaseConnectionError

If there's a connection error.

DatabaseTransactionError

If there's a transaction error.

Source code in archipy/adapters/base/sqlalchemy/adapters.py
@override
async def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
    """Execute a SQLAlchemy statement and return scalar results.

    Args:
        statement: The SQLAlchemy statement to execute.
        params: Optional parameters for the statement.

    Returns:
        The scalar results of the execution.

    Raises:
        DatabaseQueryError: If the database operation fails.
        DatabaseTimeoutError: If the query times out.
        DatabaseConnectionError: If there's a connection error.
        DatabaseTransactionError: If there's a transaction error.
    """
    try:
        session = self.get_session()
        result = await session.scalars(statement, params or {})
    except Exception as e:
        self._handle_db_exception(e, self.session_manager._get_database_name())
        raise  # This will never be reached, but satisfies MyPy
    else:
        return result

options: show_root_toc_entry: false heading_level: 3