Skip to content

ScyllaDB

The scylladb adapter provides integration with ScyllaDB (and Apache Cassandra) for high-throughput, low-latency wide-column store operations.

Ports

Abstract port interface defining the ScyllaDB adapter contract.

Port interfaces for ScyllaDB adapter.

This module defines the abstract interfaces for ScyllaDB operations, providing a standardized contract for implementations to follow. It includes both synchronous and asynchronous port definitions.

archipy.adapters.scylladb.ports.ScyllaDBPort

Bases: ABC

Interface for synchronous ScyllaDB operations.

This interface defines the contract for ScyllaDB adapters, ensuring consistent implementation of database operations across different adapters. It covers CQL execution and CRUD operations.

Note: Connection is established automatically during adapter initialization. Use the context manager (with statement) for automatic cleanup.

Source code in archipy/adapters/scylladb/ports.py
class ScyllaDBPort(ABC):
    """Interface for synchronous ScyllaDB operations.

    This interface defines the contract for ScyllaDB adapters, ensuring consistent
    implementation of database operations across different adapters. It covers
    CQL execution and CRUD operations.

    Note: Connection is established automatically during adapter initialization.
    Use the context manager (with statement) for automatic cleanup.
    """

    @abstractmethod
    def execute(self, query: str, params: dict[str, Any] | None = None) -> Any:
        """Execute a CQL query.

        Args:
            query (str): The CQL query to execute.
            params (dict[str, Any] | None): Query parameters for parameterized queries.

        Returns:
            Any: The query result set.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def prepare(self, query: str) -> Any:
        """Prepare a CQL statement for repeated execution.

        Args:
            query (str): The CQL query to prepare.

        Returns:
            Any: The prepared statement object.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def execute_prepared(self, statement: Any, params: dict[str, Any] | None = None) -> Any:
        """Execute a prepared statement.

        Args:
            statement (Any): The prepared statement object.
            params (dict[str, Any] | None): Parameters to bind to the statement.

        Returns:
            Any: The query result set.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
        """Create a keyspace with simple replication strategy.

        Args:
            keyspace (str): The name of the keyspace to create.
            replication_factor (int): The replication factor. Defaults to 1.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def drop_keyspace(self, keyspace: str) -> None:
        """Drop a keyspace.

        Args:
            keyspace (str): The name of the keyspace to drop.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def use_keyspace(self, keyspace: str) -> None:
        """Switch to a different keyspace context.

        Args:
            keyspace (str): The name of the keyspace to use.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def create_table(self, table_schema: str) -> None:
        """Create a table using raw CQL DDL.

        Args:
            table_schema (str): The complete CREATE TABLE CQL statement.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def drop_table(self, table: str) -> None:
        """Drop a table.

        Args:
            table (str): The name of the table to drop.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def insert(self, table: str, data: dict[str, Any], ttl: int | None = None, if_not_exists: bool = False) -> None:
        """Insert data into a table.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs representing column names and values.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
            if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                              This prevents errors on duplicate primary keys but is slow

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def select(
        self,
        table: str,
        columns: list[str] | None = None,
        conditions: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Select data from a table.

        Args:
            table (str): The name of the table.
            columns (list[str] | None): List of columns to select. If None, selects all (*).
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            list[Any]: List of result rows.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def update(self, table: str, data: dict[str, Any], conditions: dict[str, Any], ttl: int | None = None) -> None:
        """Update data in a table.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs for SET clause.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(self, table: str, conditions: dict[str, Any]) -> None:
        """Delete data from a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def batch_execute(self, statements: list[str]) -> None:
        """Execute multiple CQL statements in a batch.

        Args:
            statements (list[str]): List of CQL statements to execute in batch.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def get_session(self) -> Any:
        """Get the current session object.

        Returns:
            Any: The active session object.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def is_connected(self) -> bool:
        """Check if the adapter is connected to ScyllaDB cluster.

        Returns:
            bool: True if connected, False otherwise.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def health_check(self) -> dict[str, Any]:
        """Perform a health check on the ScyllaDB connection.

        Returns:
            dict[str, Any]: Health check result with status, latency_ms, and optional error.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
        """Count rows in a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            int: The number of rows matching the conditions.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def exists(self, table: str, conditions: dict[str, Any]) -> bool:
        """Check if a row exists in a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

        Returns:
            bool: True if at least one row exists, False otherwise.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def get_pool_stats(self) -> dict[str, Any]:
        """Get connection pool statistics.

        Returns:
            dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.execute abstractmethod

execute(
    query: str, params: dict[str, Any] | None = None
) -> Any

Execute a CQL query.

Parameters:

Name Type Description Default
query str

The CQL query to execute.

required
params dict[str, Any] | None

Query parameters for parameterized queries.

None

Returns:

Name Type Description
Any Any

The query result set.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def execute(self, query: str, params: dict[str, Any] | None = None) -> Any:
    """Execute a CQL query.

    Args:
        query (str): The CQL query to execute.
        params (dict[str, Any] | None): Query parameters for parameterized queries.

    Returns:
        Any: The query result set.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.prepare abstractmethod

prepare(query: str) -> Any

Prepare a CQL statement for repeated execution.

Parameters:

Name Type Description Default
query str

The CQL query to prepare.

required

Returns:

Name Type Description
Any Any

The prepared statement object.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def prepare(self, query: str) -> Any:
    """Prepare a CQL statement that will be executed more than once.

    Args:
        query (str): CQL text to prepare.

    Returns:
        Any: Prepared statement object.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.execute_prepared abstractmethod

execute_prepared(
    statement: Any, params: dict[str, Any] | None = None
) -> Any

Execute a prepared statement.

Parameters:

Name Type Description Default
statement Any

The prepared statement object.

required
params dict[str, Any] | None

Parameters to bind to the statement.

None

Returns:

Name Type Description
Any Any

The query result set.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def execute_prepared(self, statement: Any, params: dict[str, Any] | None = None) -> Any:
    """Execute a prepared statement.

    Args:
        statement (Any): The prepared statement object.
        params (dict[str, Any] | None): Parameters to bind to the statement.

    Returns:
        Any: The query result set.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.create_keyspace abstractmethod

create_keyspace(
    keyspace: str, replication_factor: int = 1
) -> None

Create a keyspace with simple replication strategy.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to create.

required
replication_factor int

The replication factor. Defaults to 1.

1

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
    """Create a keyspace that uses the simple replication strategy.

    Args:
        keyspace (str): Name of the keyspace to create.
        replication_factor (int): Replication factor to apply. Defaults to 1.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.drop_keyspace abstractmethod

drop_keyspace(keyspace: str) -> None

Drop a keyspace.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to drop.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def drop_keyspace(self, keyspace: str) -> None:
    """Remove a keyspace.

    Args:
        keyspace (str): Name of the keyspace to drop.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.use_keyspace abstractmethod

use_keyspace(keyspace: str) -> None

Switch to a different keyspace context.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to use.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def use_keyspace(self, keyspace: str) -> None:
    """Change the keyspace context to another keyspace.

    Args:
        keyspace (str): Name of the keyspace to switch to.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.create_table abstractmethod

create_table(table_schema: str) -> None

Create a table using raw CQL DDL.

Parameters:

Name Type Description Default
table_schema str

The complete CREATE TABLE CQL statement.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def create_table(self, table_schema: str) -> None:
    """Create a table from raw CQL DDL.

    Args:
        table_schema (str): Full ``CREATE TABLE`` CQL statement.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.drop_table abstractmethod

drop_table(table: str) -> None

Drop a table.

Parameters:

Name Type Description Default
table str

The name of the table to drop.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def drop_table(self, table: str) -> None:
    """Remove a table.

    Args:
        table (str): Name of the table to drop.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.insert abstractmethod

insert(
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None

Insert data into a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs representing column names and values.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
if_not_exists bool

If True, use a lightweight transaction (INSERT ... IF NOT EXISTS). This prevents errors on duplicate primary keys but is slow.

False

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def insert(self, table: str, data: dict[str, Any], ttl: int | None = None, if_not_exists: bool = False) -> None:
    """Insert data into a table.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs representing column names and values.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                          This prevents errors on duplicate primary keys but is slow

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.select abstractmethod

select(
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]

Select data from a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
columns list[str] | None

List of columns to select. If None, selects all (*).

None
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Type Description
list[Any]

List of result rows.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def select(
    self,
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]:
    """Select data from a table.

    Args:
        table (str): The name of the table.
        columns (list[str] | None): List of columns to select. If None, selects all (*).
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        list[Any]: List of result rows.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.update abstractmethod

update(
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None

Update data in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs for SET clause.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def update(self, table: str, data: dict[str, Any], conditions: dict[str, Any], ttl: int | None = None) -> None:
    """Update data in a table.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs for SET clause.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.delete abstractmethod

delete(table: str, conditions: dict[str, Any]) -> None

Delete data from a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def delete(self, table: str, conditions: dict[str, Any]) -> None:
    """Remove rows from a table.

    Args:
        table (str): Name of the target table.
        conditions (dict[str, Any]): Key-value pairs forming the WHERE clause.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.batch_execute abstractmethod

batch_execute(statements: list[str]) -> None

Execute multiple CQL statements in a batch.

Parameters:

Name Type Description Default
statements list[str]

List of CQL statements to execute in batch.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def batch_execute(self, statements: list[str]) -> None:
    """Run several CQL statements as one batch.

    Args:
        statements (list[str]): CQL statements to include in the batch.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.get_session abstractmethod

get_session() -> Any

Get the current session object.

Returns:

Name Type Description
Any Any

The active session object.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def get_session(self) -> Any:
    """Return the session object currently in use.

    Returns:
        Any: The active session object.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.is_connected abstractmethod

is_connected() -> bool

Check if the adapter is connected to ScyllaDB cluster.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def is_connected(self) -> bool:
    """Tell whether the adapter is connected to the ScyllaDB cluster.

    Returns:
        bool: True when connected, otherwise False.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.health_check abstractmethod

health_check() -> dict[str, Any]

Perform a health check on the ScyllaDB connection.

Returns:

Type Description
dict[str, Any]

Health check result with status, latency_ms, and optional error.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def health_check(self) -> dict[str, Any]:
    """Run a health check against the ScyllaDB connection.

    Returns:
        dict[str, Any]: Outcome of the check with status, latency_ms and an
            optional error.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.count abstractmethod

count(
    table: str, conditions: dict[str, Any] | None = None
) -> int

Count rows in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Name Type Description
int int

The number of rows matching the conditions.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
    """Count rows in a table.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        int: The number of rows matching the conditions.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.exists abstractmethod

exists(table: str, conditions: dict[str, Any]) -> bool

Check if a row exists in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Returns:

Name Type Description
bool bool

True if at least one row exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def exists(self, table: str, conditions: dict[str, Any]) -> bool:
    """Tell whether a table contains a matching row.

    Args:
        table (str): Name of the table to inspect.
        conditions (dict[str, Any]): Key-value pairs forming the WHERE clause.

    Returns:
        bool: True when at least one row exists, otherwise False.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.ScyllaDBPort.get_pool_stats abstractmethod

get_pool_stats() -> dict[str, Any]

Get connection pool statistics.

Returns:

Type Description
dict[str, Any]

Pool statistics including open connections, in-flight requests, etc.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
def get_pool_stats(self) -> dict[str, Any]:
    """Return statistics about the connection pool.

    Returns:
        dict[str, Any]: Pool statistics such as open connections and
            in-flight requests.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort

Bases: ABC

Interface for asynchronous ScyllaDB operations.

This interface defines the contract for async ScyllaDB adapters, ensuring consistent implementation of database operations. All methods are async variants of the synchronous port.

Note: Connection is established automatically during adapter initialization. Use the async context manager (async with statement) for automatic cleanup.

Source code in archipy/adapters/scylladb/ports.py
class AsyncScyllaDBPort(ABC):
    """Interface for asynchronous ScyllaDB operations.

    This interface defines the contract for async ScyllaDB adapters, ensuring consistent
    implementation of database operations. All methods are async variants of the
    synchronous port.

    Note: Connection is established automatically during adapter initialization.
    Use the async context manager (async with statement) for automatic cleanup.
    """

    @abstractmethod
    async def execute(self, query: str, params: dict[str, Any] | None = None) -> Any:
        """Execute a CQL query asynchronously.

        Args:
            query (str): The CQL query to execute.
            params (dict[str, Any] | None): Query parameters for parameterized queries.

        Returns:
            Any: The query result set.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def prepare(self, query: str) -> Any:
        """Prepare a CQL statement asynchronously.

        Args:
            query (str): CQL text to prepare.

        Returns:
            Any: Prepared statement object.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def execute_prepared(self, statement: Any, params: dict[str, Any] | None = None) -> Any:
        """Execute a prepared statement asynchronously.

        Args:
            statement (Any): The prepared statement object.
            params (dict[str, Any] | None): Parameters to bind to the statement.

        Returns:
            Any: The query result set.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
        """Create a keyspace asynchronously.

        Args:
            keyspace (str): Name of the keyspace to create.
            replication_factor (int): Replication factor to apply. Defaults to 1.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def drop_keyspace(self, keyspace: str) -> None:
        """Remove a keyspace asynchronously.

        Args:
            keyspace (str): Name of the keyspace to drop.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def use_keyspace(self, keyspace: str) -> None:
        """Change the keyspace context to another keyspace asynchronously.

        Args:
            keyspace (str): Name of the keyspace to switch to.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def create_table(self, table_schema: str) -> None:
        """Create a table asynchronously.

        Args:
            table_schema (str): Full ``CREATE TABLE`` CQL statement.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def drop_table(self, table: str) -> None:
        """Remove a table asynchronously.

        Args:
            table (str): Name of the table to drop.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def insert(
        self,
        table: str,
        data: dict[str, Any],
        ttl: int | None = None,
        if_not_exists: bool = False,
    ) -> None:
        """Insert data into a table asynchronously.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs representing column names and values.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
            if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                              This prevents errors on duplicate primary keys but is slow

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def select(
        self,
        table: str,
        columns: list[str] | None = None,
        conditions: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Select data from a table asynchronously.

        Args:
            table (str): The name of the table.
            columns (list[str] | None): List of columns to select. If None, selects all (*).
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            list[Any]: List of result rows.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def update(
        self,
        table: str,
        data: dict[str, Any],
        conditions: dict[str, Any],
        ttl: int | None = None,
    ) -> None:
        """Update data in a table asynchronously.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs for SET clause.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def delete(self, table: str, conditions: dict[str, Any]) -> None:
        """Asynchronously remove rows from a table.

        Args:
            table (str): Name of the table to delete from.
            conditions (dict[str, Any]): Key-value pairs used to build the WHERE clause.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def batch_execute(self, statements: list[str]) -> None:
        """Asynchronously run several CQL statements as one batch.

        Args:
            statements (list[str]): The CQL statements that make up the batch.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_session(self) -> Any:
        """Asynchronously obtain the session currently in use.

        Returns:
            Any: The active session object.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def is_connected(self) -> bool:
        """Asynchronously report whether the adapter is connected to the ScyllaDB cluster.

        Returns:
            bool: True when connected, False otherwise.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def health_check(self) -> dict[str, Any]:
        """Asynchronously run a health check against the ScyllaDB connection.

        Returns:
            dict[str, Any]: Result carrying status, latency_ms, and an optional error.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
        """Count rows in a table asynchronously.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            int: The number of rows matching the conditions.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def exists(self, table: str, conditions: dict[str, Any]) -> bool:
        """Asynchronously test whether a matching row is present in a table.

        Args:
            table (str): Name of the table to inspect.
            conditions (dict[str, Any]): Key-value pairs used to build the WHERE clause.

        Returns:
            bool: True when one or more rows exist, False otherwise.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_pool_stats(self) -> dict[str, Any]:
        """Asynchronously collect statistics about the connection pool.

        Returns:
            dict[str, Any]: Pool statistics such as open connections, in-flight requests, etc.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.execute abstractmethod async

execute(
    query: str, params: dict[str, Any] | None = None
) -> Any

Execute a CQL query asynchronously.

Parameters:

Name Type Description Default
query str

The CQL query to execute.

required
params dict[str, Any] | None

Query parameters for parameterized queries.

None

Returns:

Name Type Description
Any Any

The query result set.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def execute(self, query: str, params: dict[str, Any] | None = None) -> Any:
    """Execute a CQL query asynchronously.

    Args:
        query (str): The CQL query to execute.
        params (dict[str, Any] | None): Query parameters for parameterized queries.

    Returns:
        Any: The query result set.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.prepare abstractmethod async

prepare(query: str) -> Any

Prepare a CQL statement asynchronously.

Parameters:

Name Type Description Default
query str

The CQL query to prepare.

required

Returns:

Name Type Description
Any Any

The prepared statement object.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def prepare(self, query: str) -> Any:
    """Asynchronously prepare a CQL statement.

    Args:
        query (str): CQL text of the query to prepare.

    Returns:
        Any: The resulting prepared statement object.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.execute_prepared abstractmethod async

execute_prepared(
    statement: Any, params: dict[str, Any] | None = None
) -> Any

Execute a prepared statement asynchronously.

Parameters:

Name Type Description Default
statement Any

The prepared statement object.

required
params dict[str, Any] | None

Parameters to bind to the statement.

None

Returns:

Name Type Description
Any Any

The query result set.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def execute_prepared(self, statement: Any, params: dict[str, Any] | None = None) -> Any:
    """Execute a prepared statement asynchronously.

    Args:
        statement (Any): The prepared statement object.
        params (dict[str, Any] | None): Parameters to bind to the statement.

    Returns:
        Any: The query result set.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.create_keyspace abstractmethod async

create_keyspace(
    keyspace: str, replication_factor: int = 1
) -> None

Create a keyspace asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to create.

required
replication_factor int

The replication factor. Defaults to 1.

1

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
    """Asynchronously create a keyspace.

    Args:
        keyspace (str): Name of the keyspace to be created.
        replication_factor (int): Replication factor for the keyspace. Defaults to 1.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.drop_keyspace abstractmethod async

drop_keyspace(keyspace: str) -> None

Drop a keyspace asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to drop.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def drop_keyspace(self, keyspace: str) -> None:
    """Asynchronously remove a keyspace.

    Args:
        keyspace (str): Name of the keyspace that should be dropped.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.use_keyspace abstractmethod async

use_keyspace(keyspace: str) -> None

Switch to a different keyspace context asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to use.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def use_keyspace(self, keyspace: str) -> None:
    """Asynchronously change the active keyspace context.

    Args:
        keyspace (str): Name of the keyspace to switch to.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.create_table abstractmethod async

create_table(table_schema: str) -> None

Create a table asynchronously.

Parameters:

Name Type Description Default
table_schema str

The complete CREATE TABLE CQL statement.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def create_table(self, table_schema: str) -> None:
    """Asynchronously create a table.

    Args:
        table_schema (str): Full CREATE TABLE statement written in CQL.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.drop_table abstractmethod async

drop_table(table: str) -> None

Drop a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table to drop.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def drop_table(self, table: str) -> None:
    """Asynchronously remove a table.

    Args:
        table (str): Name of the table that should be dropped.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.insert abstractmethod async

insert(
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None

Insert data into a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs representing column names and values.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
if_not_exists bool

If True, use a lightweight transaction (INSERT ... IF NOT EXISTS). This prevents errors on duplicate primary keys but is slower than a regular insert.

False

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def insert(
    self,
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None:
    """Insert data into a table asynchronously.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs representing column names and values.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                          This prevents errors on duplicate primary keys but is slow

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.select abstractmethod async

select(
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]

Select data from a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
columns list[str] | None

List of columns to select. If None, selects all (*).

None
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Type Description
list[Any]

List of result rows.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def select(
    self,
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]:
    """Select data from a table asynchronously.

    Args:
        table (str): The name of the table.
        columns (list[str] | None): List of columns to select. If None, selects all (*).
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        list[Any]: List of result rows.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.update abstractmethod async

update(
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None

Update data in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs for SET clause.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def update(
    self,
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None:
    """Update data in a table asynchronously.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs for SET clause.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.delete abstractmethod async

delete(table: str, conditions: dict[str, Any]) -> None

Delete data from a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def delete(self, table: str, conditions: dict[str, Any]) -> None:
    """Asynchronously remove rows from a table.

    Args:
        table (str): Name of the table to delete from.
        conditions (dict[str, Any]): Key-value pairs used to build the WHERE clause.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.batch_execute abstractmethod async

batch_execute(statements: list[str]) -> None

Execute multiple CQL statements in a batch asynchronously.

Parameters:

Name Type Description Default
statements list[str]

List of CQL statements to execute in batch.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def batch_execute(self, statements: list[str]) -> None:
    """Asynchronously run several CQL statements as one batch.

    Args:
        statements (list[str]): The CQL statements that make up the batch.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.get_session abstractmethod async

get_session() -> Any

Get the current session object asynchronously.

Returns:

Name Type Description
Any Any

The active session object.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def get_session(self) -> Any:
    """Asynchronously obtain the session currently in use.

    Returns:
        Any: The active session object.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.is_connected abstractmethod async

is_connected() -> bool

Check if the adapter is connected to the ScyllaDB cluster.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def is_connected(self) -> bool:
    """Asynchronously report whether the adapter is connected to the ScyllaDB cluster.

    Returns:
        bool: True when connected, False otherwise.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.health_check abstractmethod async

health_check() -> dict[str, Any]

Perform a health check on the ScyllaDB connection.

Returns:

Type Description
dict[str, Any]

Health check result with status, latency_ms, and optional error.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def health_check(self) -> dict[str, Any]:
    """Asynchronously run a health check against the ScyllaDB connection.

    Returns:
        dict[str, Any]: Result carrying status, latency_ms, and an optional error.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.count abstractmethod async

count(
    table: str, conditions: dict[str, Any] | None = None
) -> int

Count rows in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Name Type Description
int int

The number of rows matching the conditions.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
    """Count rows in a table asynchronously.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        int: The number of rows matching the conditions.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.exists abstractmethod async

exists(table: str, conditions: dict[str, Any]) -> bool

Check if a row exists in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Returns:

Name Type Description
bool bool

True if at least one row exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def exists(self, table: str, conditions: dict[str, Any]) -> bool:
    """Asynchronously test whether a matching row is present in a table.

    Args:
        table (str): Name of the table to inspect.
        conditions (dict[str, Any]): Key-value pairs used to build the WHERE clause.

    Returns:
        bool: True when one or more rows exist, False otherwise.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.scylladb.ports.AsyncScyllaDBPort.get_pool_stats abstractmethod async

get_pool_stats() -> dict[str, Any]

Get connection pool statistics asynchronously.

Returns:

Type Description
dict[str, Any]

Pool statistics including open connections, in-flight requests, etc.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/scylladb/ports.py
@abstractmethod
async def get_pool_stats(self) -> dict[str, Any]:
    """Asynchronously collect statistics about the connection pool.

    Returns:
        dict[str, Any]: Pool statistics such as open connections, in-flight requests, etc.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete ScyllaDB adapter implementing session management and query execution for ScyllaDB/Cassandra clusters.

ScyllaDB adapter implementations for sync and async operations.

This module provides concrete implementations of the ScyllaDB port interfaces, supporting both synchronous and asynchronous database operations.

archipy.adapters.scylladb.adapters.logger module-attribute

logger = getLogger(__name__)

archipy.adapters.scylladb.adapters.ScyllaDBExceptionHandlerMixin

Mixin class to handle ScyllaDB/Cassandra exceptions in a consistent way.

Source code in archipy/adapters/scylladb/adapters.py
class ScyllaDBExceptionHandlerMixin:
    """Mixin that translates ScyllaDB/Cassandra failures into application errors."""

    @classmethod
    def _handle_scylladb_exception(cls, exception: Exception, operation: str) -> NoReturn:
        """Classify a ScyllaDB/Cassandra exception and raise the matching application error.

        Rules are evaluated in order and the first match wins. Classification uses
        the driver exception type (when the ``cassandra`` package can be imported)
        together with keywords found in the lower-cased exception message. Every
        path ends in a ``raise`` chained to the original exception, so this method
        never returns.

        Args:
            exception: The original exception
            operation: The name of the operation that failed

        Raises:
            NotFoundError: The message mentions an unconfigured table.
            ServiceUnavailableError: The driver reports unavailability or that no
                host is available.
            ConnectionTimeoutError: The driver operation timed out or the message
                mentions a timeout.
            InvalidCredentialsError: Authentication failed.
            InvalidArgumentError: The driver rejected the request as invalid.
            ConfigurationError: The message points at a protocol or configuration problem.
            NetworkError: The message mentions a network, connection or socket problem.
            DatabaseConnectionError: No rule matched and the operation name contains "connect".
            DatabaseQueryError: No other rule matched.
        """
        message = str(exception).lower()

        if "unconfigured table" in message:
            # The operation name doubles as the table identifier in the error payload.
            raise NotFoundError(
                resource_type="table",
                additional_data={"table_name": operation or "unknown"},
            ) from exception

        try:
            from cassandra import (
                AuthenticationFailed,
                InvalidRequest,
                OperationTimedOut,
                Unavailable,
            )
            from cassandra.cluster import NoHostAvailable
        except ImportError:
            # Driver types cannot be imported: skip the driver-specific rules below.
            pass
        else:
            if isinstance(exception, Unavailable) or "unavailable" in message:
                raise ServiceUnavailableError(service="ScyllaDB") from exception

            if isinstance(exception, OperationTimedOut) or "timeout" in message:
                raise ConnectionTimeoutError(service="ScyllaDB", timeout=None) from exception

            if isinstance(exception, AuthenticationFailed) or "authentication" in message:
                raise InvalidCredentialsError() from exception

            if isinstance(exception, InvalidRequest):
                raise InvalidArgumentError(argument_name=operation) from exception

            if "protocol" in message:
                raise ConfigurationError(operation="scylladb", reason="Protocol error") from exception

            # NoHostAvailable
            if isinstance(exception, NoHostAvailable) or "no host available" in message:
                raise ServiceUnavailableError(service="ScyllaDB") from exception

        if any(keyword in message for keyword in ("network", "connection", "socket")):
            raise NetworkError(service="ScyllaDB") from exception

        # "unconfigured" contains "config" but does not describe a configuration problem.
        mentions_config = "configuration" in message or ("config" in message and "unconfigured" not in message)
        if mentions_config:
            raise ConfigurationError(operation="scylladb", reason="Configuration error") from exception

        # "connection" contains "connect", so a single substring test covers both spellings.
        if "connect" in operation.lower():
            raise DatabaseConnectionError(database="scylladb") from exception
        raise DatabaseQueryError(database="scylladb") from exception

archipy.adapters.scylladb.adapters.ScyllaDBAdapter

Bases: ScyllaDBPort, ScyllaDBExceptionHandlerMixin

Synchronous adapter for ScyllaDB operations.

This adapter implements the ScyllaDBPort interface to provide a consistent way to interact with ScyllaDB, abstracting the underlying driver implementation. It supports connection pooling, prepared statements, and batch operations.

Parameters:

Name Type Description Default
config ScyllaDBConfig | None

Configuration settings for ScyllaDB. If None, retrieves from global config. Defaults to None.

None
Source code in archipy/adapters/scylladb/adapters.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
class ScyllaDBAdapter(ScyllaDBPort, ScyllaDBExceptionHandlerMixin):
    """Synchronous adapter for ScyllaDB operations.

    This adapter implements the ScyllaDBPort interface to provide a consistent
    way to interact with ScyllaDB, abstracting the underlying driver implementation.
    It supports connection pooling, prepared statements, and batch operations.

    Args:
        config (ScyllaDBConfig | None): Configuration settings for ScyllaDB.
            If None, retrieves from global config. Defaults to None.
    """

    def __init__(self, config: ScyllaDBConfig | None = None) -> None:
        """Resolve the configuration and open a session to the cluster.

        Args:
            config (ScyllaDBConfig | None): Explicit configuration. When None, the
                ``SCYLLADB`` attribute of the global config is used, falling back to
                a default ``ScyllaDBConfig()`` if that lookup raises AttributeError.

        Raises:
            Exception: Any error raised while creating the cluster, connecting or
                selecting the keyspace is handed to ``_handle_scylladb_exception``
                with the operation name "connect" and re-raised if the handler returns.
        """
        if config is None:
            try:
                config = BaseConfig.global_config().SCYLLADB
            except AttributeError:
                # The global config has no SCYLLADB section: use default settings.
                config = ScyllaDBConfig()
        self.config = config

        # Installs the cached prepare callable when statement caching is enabled.
        self.__post_init__()

        try:
            self._cluster = self._create_cluster()
            self._session = self._cluster.connect()
            self._session.default_timeout = self.config.REQUEST_TIMEOUT
            keyspace = self.config.KEYSPACE
            if keyspace:
                self._session.set_keyspace(keyspace)
        except Exception as exc:
            self._handle_scylladb_exception(exc, "connect")
            raise

    def _get_consistency_level(self) -> int:
        """Translate the configured consistency level name into the driver constant.

        The name in ``config.CONSISTENCY_LEVEL`` is matched case-insensitively.

        Returns:
            int: The matching ``ConsistencyLevel`` value, or ``ConsistencyLevel.ONE``
                when the configured name is not one of the supported names.
        """
        supported_names = (
            "ONE",
            "TWO",
            "THREE",
            "QUORUM",
            "ALL",
            "LOCAL_QUORUM",
            "EACH_QUORUM",
            "LOCAL_ONE",
            "ANY",
        )
        levels_by_name: dict[str, int] = {name: getattr(ConsistencyLevel, name) for name in supported_names}
        # Unknown names fall back to ONE.
        return levels_by_name.get(self.config.CONSISTENCY_LEVEL.upper(), ConsistencyLevel.ONE)

    def _create_cluster(self) -> Any:
        """Build the driver ``Cluster`` object from the adapter configuration.

        Returns:
            Cluster: The configured (not yet connected) cluster instance.
        """
        cfg = self.config

        # Credentials are only attached when both username and password are set.
        credentials = None
        if cfg.USERNAME and cfg.PASSWORD:
            credentials = PlainTextAuthProvider(
                username=cfg.USERNAME,
                password=cfg.PASSWORD.get_secret_value(),
            )

        # Token-aware routing on top of a DC-aware policy when a local DC is configured,
        # otherwise on top of plain round robin.
        inner_policy = DCAwareRoundRobinPolicy(local_dc=cfg.LOCAL_DC) if cfg.LOCAL_DC else RoundRobinPolicy()
        balancing_policy = TokenAwarePolicy(inner_policy)

        # Any value other than "FALLTHROUGH" selects exponential backoff.
        if cfg.RETRY_POLICY == "FALLTHROUGH":
            retries = FallthroughRetryPolicy()
        else:
            retries = ExponentialBackoffRetryPolicy(
                max_num_retries=cfg.RETRY_MAX_NUM_RETRIES,
                min_interval=cfg.RETRY_MIN_INTERVAL,
                max_interval=cfg.RETRY_MAX_INTERVAL,
            )

        # Shard awareness can be switched off for Docker/NAT environments.
        shard_options = {"disable": True} if cfg.DISABLE_SHARD_AWARENESS else None

        # In Docker/NAT environments gossip-discovered container IPs are unreachable
        # from the host, so every address is translated to the first contact point.
        translator = _FixedAddressTranslator(cfg.CONTACT_POINTS[0]) if cfg.ADDRESS_TRANSLATION_ENABLED else None

        cluster = Cluster(
            contact_points=cfg.CONTACT_POINTS,
            port=cfg.PORT,
            auth_provider=credentials,
            protocol_version=cfg.PROTOCOL_VERSION,
            compression=bool(cfg.COMPRESSION),
            connect_timeout=cfg.CONNECT_TIMEOUT,
            load_balancing_policy=balancing_policy,
            default_retry_policy=retries,
            shard_aware_options=shard_options,
            address_translator=translator,
        )

        # Copy timeout and per-host connection limits onto the default profile.
        manager = cluster.profile_manager
        if manager is not None:
            default_profile = manager.default
            default_profile.request_timeout = cfg.REQUEST_TIMEOUT
            default_profile.max_connections_per_host = cfg.MAX_CONNECTIONS_PER_HOST
            default_profile.min_connections_per_host = cfg.MIN_CONNECTIONS_PER_HOST
            default_profile.core_connections_per_host = cfg.CORE_CONNECTIONS_PER_HOST

        # NOTE(review): this assigns on the connection class, not on an instance.
        cluster.connection_class.max_requests_per_connection = cfg.MAX_REQUESTS_PER_CONNECTION

        return cluster

    @override
    def execute(self, query: str, params: dict[str, Any] | tuple | list | None = None) -> Any:
        """Execute a CQL query.

        Args:
            query (str): The CQL query to execute.
            params (dict[str, Any] | tuple | list | None): Query parameters for parameterized queries.

        Returns:
            Any: The query result set.
        """
        session = self.get_session()
        try:
            if params:
                result = session.execute(query, params)
            else:
                result = session.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "execute")
            raise
        else:
            return result

    @override
    def prepare(self, query: str) -> PreparedStatement:
        """Prepare a CQL statement for repeated execution.

        Args:
            query (str): The CQL query to prepare.

        Returns:
            PreparedStatement: The prepared statement object.
        """
        session = self.get_session()
        try:
            if self.config.ENABLE_PREPARED_STATEMENT_CACHE:
                # Use cached version if available - call the cached method
                cached_method: Any = getattr(self, "_prepare_cached", None)
                if cached_method is not None:
                    return cached_method(query)
            # Direct prepare without cache
            prepared = session.prepare(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "prepare")
            raise
        else:
            return prepared

    def __post_init__(self) -> None:
        """Install the TTL-cached prepare callable when statement caching is enabled.

        When ``config.ENABLE_PREPARED_STATEMENT_CACHE`` is falsy nothing is installed.
        Otherwise ``self._prepare_cached`` is set to a cached wrapper around a function
        that prepares the query on the current session.
        """
        if not self.config.ENABLE_PREPARED_STATEMENT_CACHE:
            return

        def _prepare_internal(query: str) -> PreparedStatement:
            """Internal cached method to prepare a CQL statement."""
            session = self.get_session()
            try:
                return session.prepare(query)
            except Exception as exc:
                self._handle_scylladb_exception(exc, "prepare")
                raise

        # TTL and size of the cache both come from configuration.
        with_ttl_cache = ttl_cache_decorator(
            ttl_seconds=self.config.PREPARED_STATEMENT_CACHE_TTL_SECONDS,
            maxsize=self.config.PREPARED_STATEMENT_CACHE_SIZE,
        )
        self._prepare_cached = with_ttl_cache(_prepare_internal)

    @override
    def execute_prepared(self, statement: PreparedStatement, params: dict[str, Any] | None = None) -> Any:
        """Execute a prepared statement.

        Args:
            statement (PreparedStatement): The prepared statement object.
            params (dict[str, Any] | None): Parameters to bind to the statement.

        Returns:
            Any: The query result set.
        """
        session = self.get_session()
        try:
            if params:
                result = session.execute(statement, params)
            else:
                result = session.execute(statement)
        except Exception as e:
            self._handle_scylladb_exception(e, "execute_prepared")
            raise
        else:
            return result

    @override
    def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
        """Create a keyspace with simple replication strategy.

        Args:
            keyspace (str): The name of the keyspace to create.
            replication_factor (int): The replication factor. Defaults to 1.
        """
        # Use configured replication strategy
        if self.config.REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self.config.REPLICATION_CONFIG:
            # Build replication config for NetworkTopologyStrategy
            replication_parts = ["'class': 'NetworkTopologyStrategy'"]
            for dc, rf in self.config.REPLICATION_CONFIG.items():
                replication_parts.append(f"'{dc}': {rf}")
            replication_str = ", ".join(replication_parts)
            query = f"""
                CREATE KEYSPACE IF NOT EXISTS {keyspace}
                WITH replication = {{{replication_str}}}
            """
        else:
            # Use SimpleStrategy (default)
            query = f"""
                CREATE KEYSPACE IF NOT EXISTS {keyspace}
                WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}}
            """
        try:
            self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "create_keyspace")
            raise

    @override
    def drop_keyspace(self, keyspace: str) -> None:
        """Drop a keyspace.

        Args:
            keyspace (str): The name of the keyspace to drop.
        """
        query = f"DROP KEYSPACE IF EXISTS {keyspace}"
        try:
            self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "drop_keyspace")
            raise

    @override
    def use_keyspace(self, keyspace: str) -> None:
        """Switch to a different keyspace context.

        Args:
            keyspace (str): The name of the keyspace to use.
        """
        session = self.get_session()
        try:
            session.set_keyspace(keyspace)
        except Exception as e:
            self._handle_scylladb_exception(e, "use_keyspace")
            raise

    @override
    def create_table(self, table_schema: str) -> None:
        """Create a table using raw CQL DDL.

        Args:
            table_schema (str): The complete CREATE TABLE CQL statement.
        """
        try:
            self.execute(table_schema)
        except Exception as e:
            self._handle_scylladb_exception(e, "create_table")
            raise

    @override
    def drop_table(self, table: str) -> None:
        """Drop a table.

        Args:
            table (str): The name of the table to drop.
        """
        query = f"DROP TABLE IF EXISTS {table}"
        try:
            self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "drop_table")
            raise

    @override
    def insert(self, table: str, data: dict[str, Any], ttl: int | None = None, if_not_exists: bool = False) -> None:
        """Insert data into a table.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs representing column names and values.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
            if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                              This prevents errors on duplicate primary keys but is slow
        """
        columns = ", ".join(data.keys())
        placeholders = ", ".join(["%s" for _ in data])
        query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

        if if_not_exists:
            query += " IF NOT EXISTS"

        if ttl is not None:
            query += f" USING TTL {ttl}"

        try:
            self.execute(query, tuple(data.values()))
        except Exception as e:
            self._handle_scylladb_exception(e, "insert")
            raise

    @override
    def select(
        self,
        table: str,
        columns: list[str] | None = None,
        conditions: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Select data from a table.

        Args:
            table (str): The name of the table.
            columns (list[str] | None): List of columns to select. If None, selects all (*).
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            list[Any]: List of result rows.
        """
        cols = ", ".join(columns) if columns else "*"
        query = f"SELECT {cols} FROM {table}"

        params = None
        if conditions:
            where_clause = " AND ".join([f"{key} = %s" for key in conditions])
            query += f" WHERE {where_clause}"
            params = tuple(conditions.values())

        try:
            result = self.execute(query, params)
            return list(result)
        except Exception as e:
            self._handle_scylladb_exception(e, "select")
            raise

    @override
    def update(self, table: str, data: dict[str, Any], conditions: dict[str, Any], ttl: int | None = None) -> None:
        """Update data in a table.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs for SET clause.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        """
        set_clause = ", ".join([f"{key} = %s" for key in data])
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"UPDATE {table}"

        if ttl is not None:
            query += f" USING TTL {ttl}"

        query += f" SET {set_clause} WHERE {where_clause}"

        # Combine params: SET values first, then WHERE values
        params = tuple(data.values()) + tuple(conditions.values())

        try:
            self.execute(query, params)
        except Exception as e:
            self._handle_scylladb_exception(e, "update")
            raise

    @override
    def delete(self, table: str, conditions: dict[str, Any]) -> None:
        """Delete data from a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        """
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"DELETE FROM {table} WHERE {where_clause}"

        try:
            self.execute(query, tuple(conditions.values()))
        except Exception as e:
            self._handle_scylladb_exception(e, "delete")
            raise

    @override
    def batch_execute(self, statements: list[str]) -> None:
        """Run several CQL statements as one batch.

        The batch is created with the consistency level from ``_get_consistency_level``
        and each statement is added as a ``SimpleStatement``.

        Args:
            statements (list[str]): The CQL statements to include in the batch.

        Raises:
            Exception: Errors are passed to ``_handle_scylladb_exception`` with the
                operation name "batch_execute" and re-raised if the handler returns.
        """
        active_session = self.get_session()
        consistency = self._get_consistency_level()
        pending_batch = BatchStatement(consistency_level=consistency)

        try:
            for cql in statements:
                pending_batch.add(SimpleStatement(cql))
            active_session.execute(pending_batch)
        except Exception as exc:
            self._handle_scylladb_exception(exc, "batch_execute")
            raise

    @override
    def get_session(self) -> Any:
        """Get the current session object.

        Returns:
            Any: The active session object.
        """
        return self._session

    @override
    def is_connected(self) -> bool:
        """Check if the adapter is connected to ScyllaDB cluster.

        Returns:
            bool: True if connected, False otherwise.
        """
        return self._session is not None and not self._session.is_shutdown

    def close(self) -> None:
        """Close the connection and clean up resources.

        This method should be called when the adapter is no longer needed
        to properly release resources.
        """
        try:
            if hasattr(self, "_session") and self._session is not None:
                self._session.shutdown()
            if hasattr(self, "_cluster") and self._cluster is not None:
                self._cluster.shutdown()
        except Exception as e:
            # Ignore errors during cleanup, but log them
            logger.debug(f"Error during ScyllaDB adapter cleanup: {e}")

    def __del__(self) -> None:
        """Call ``close()`` when the adapter is garbage collected.

        Any exception raised by ``close()`` is logged at debug level and suppressed.
        """
        try:
            self.close()
        except Exception as exc:
            # Nothing useful can be done with an error at this point.
            logger.debug(f"Error in ScyllaDB adapter destructor: {exc}")

    @override
    def health_check(self) -> dict[str, Any]:
        """Perform a health check on the ScyllaDB connection.

        Returns:
            dict[str, Any]: Health check result with status, latency_ms, and optional error.
        """
        if not self.is_connected():
            return {
                "status": "unhealthy",
                "latency_ms": 0.0,
                "error": "Not connected to cluster",
            }

        try:
            start_time = time.time()
            session = self.get_session()
            original_timeout = session.default_timeout
            session.default_timeout = self.config.HEALTH_CHECK_TIMEOUT
            try:
                session.execute("SELECT now() FROM system.local")
            finally:
                session.default_timeout = original_timeout
            latency_ms = (time.time() - start_time) * 1000
        except Exception as e:
            return {
                "status": "unhealthy",
                "latency_ms": 0.0,
                "error": str(e),
            }
        else:
            return {
                "status": "healthy",
                "latency_ms": latency_ms,
                "error": None,
            }

    @override
    def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
        """Count rows in a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            int: The number of rows matching the conditions.
        """
        query = f"SELECT COUNT(*) FROM {table}"

        params = None
        if conditions:
            where_clause = " AND ".join([f"{key} = %s" for key in conditions])
            query += f" WHERE {where_clause} ALLOW FILTERING"
            params = tuple(conditions.values())

        try:
            result = self.execute(query, params)
            row = result.one()
        except Exception as e:
            self._handle_scylladb_exception(e, "count")
            raise
        else:
            return row.count if row else 0

    @override
    def exists(self, table: str, conditions: dict[str, Any]) -> bool:
        """Check if a row exists in a table.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

        Returns:
            bool: True if at least one row exists, False otherwise.
        """
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"SELECT COUNT(*) FROM {table} WHERE {where_clause} LIMIT 1 ALLOW FILTERING"

        try:
            result = self.execute(query, tuple(conditions.values()))
            row = result.one()
        except Exception as e:
            self._handle_scylladb_exception(e, "exists")
            raise
        else:
            return row.count > 0 if row else False

    @override
    def get_pool_stats(self) -> dict[str, Any]:
        """Get connection pool statistics.

        Returns:
            dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.
        """
        if not self.config.ENABLE_CONNECTION_POOL_MONITORING:
            return {
                "monitoring_enabled": False,
                "message": "Connection pool monitoring is disabled",
            }

        stats: dict[str, Any] = {"monitoring_enabled": True}

        try:
            session = self.get_session()
            cluster = self._cluster

            # Get pool state for each host
            hosts_stats = []
            for host in cluster.metadata.all_hosts():
                host_pool = session.get_pool_state(host)
                if host_pool:
                    hosts_stats.append(
                        {
                            "host": str(host),
                            "open_connections": host_pool.get("open_count", 0),
                            "in_flight_queries": host_pool.get("in_flight", 0),
                        },
                    )

            stats["hosts"] = hosts_stats
            stats["total_hosts"] = len(hosts_stats)
            stats["total_open_connections"] = sum(h.get("open_connections", 0) for h in hosts_stats)
            stats["total_in_flight_queries"] = sum(h.get("in_flight_queries", 0) for h in hosts_stats)

        except Exception as e:
            stats["error"] = str(e)

        return stats

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.config instance-attribute

config = SCYLLADB

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.execute

execute(
    query: str,
    params: dict[str, Any] | tuple | list | None = None,
) -> Any

Execute a CQL query.

Parameters:

Name Type Description Default
query str

The CQL query to execute.

required
params dict[str, Any] | tuple | list | None

Query parameters for parameterized queries.

None

Returns:

Name Type Description
Any Any

The query result set.

Source code in archipy/adapters/scylladb/adapters.py
@override
def execute(self, query: str, params: dict[str, Any] | tuple | list | None = None) -> Any:
    """Execute a CQL query.

    Args:
        query (str): The CQL query to execute.
        params (dict[str, Any] | tuple | list | None): Query parameters for parameterized queries.

    Returns:
        Any: The query result set.
    """
    session = self.get_session()
    try:
        if params:
            result = session.execute(query, params)
        else:
            result = session.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "execute")
        raise
    else:
        return result

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.prepare

prepare(query: str) -> PreparedStatement

Prepare a CQL statement for repeated execution.

Parameters:

Name Type Description Default
query str

The CQL query to prepare.

required

Returns:

Name Type Description
PreparedStatement PreparedStatement

The prepared statement object.

Source code in archipy/adapters/scylladb/adapters.py
@override
def prepare(self, query: str) -> PreparedStatement:
    """Prepare a CQL statement for repeated execution.

    Args:
        query (str): The CQL query to prepare.

    Returns:
        PreparedStatement: The prepared statement object.
    """
    session = self.get_session()
    try:
        if self.config.ENABLE_PREPARED_STATEMENT_CACHE:
            # Use cached version if available - call the cached method
            cached_method: Any = getattr(self, "_prepare_cached", None)
            if cached_method is not None:
                return cached_method(query)
        # Direct prepare without cache
        prepared = session.prepare(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "prepare")
        raise
    else:
        return prepared

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.execute_prepared

execute_prepared(
    statement: PreparedStatement,
    params: dict[str, Any] | None = None,
) -> Any

Execute a prepared statement.

Parameters:

Name Type Description Default
statement PreparedStatement

The prepared statement object.

required
params dict[str, Any] | None

Parameters to bind to the statement.

None

Returns:

Name Type Description
Any Any

The query result set.

Source code in archipy/adapters/scylladb/adapters.py
@override
def execute_prepared(self, statement: PreparedStatement, params: dict[str, Any] | None = None) -> Any:
    """Execute a prepared statement.

    Args:
        statement (PreparedStatement): The prepared statement object.
        params (dict[str, Any] | None): Parameters to bind to the statement.

    Returns:
        Any: The query result set.
    """
    session = self.get_session()
    try:
        if params:
            result = session.execute(statement, params)
        else:
            result = session.execute(statement)
    except Exception as e:
        self._handle_scylladb_exception(e, "execute_prepared")
        raise
    else:
        return result

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.create_keyspace

create_keyspace(
    keyspace: str, replication_factor: int = 1
) -> None

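Create a keyspace using the configured replication strategy (SimpleStrategy unless NetworkTopologyStrategy is configured with a per-datacenter replication config).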

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to create.

required
replication_factor int

The replication factor. Defaults to 1.

1
Source code in archipy/adapters/scylladb/adapters.py
@override
def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
    """Create a keyspace with simple replication strategy.

    Args:
        keyspace (str): The name of the keyspace to create.
        replication_factor (int): The replication factor. Defaults to 1.
    """
    # Use configured replication strategy
    if self.config.REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self.config.REPLICATION_CONFIG:
        # Build replication config for NetworkTopologyStrategy
        replication_parts = ["'class': 'NetworkTopologyStrategy'"]
        for dc, rf in self.config.REPLICATION_CONFIG.items():
            replication_parts.append(f"'{dc}': {rf}")
        replication_str = ", ".join(replication_parts)
        query = f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace}
            WITH replication = {{{replication_str}}}
        """
    else:
        # Use SimpleStrategy (default)
        query = f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace}
            WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}}
        """
    try:
        self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "create_keyspace")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.drop_keyspace

drop_keyspace(keyspace: str) -> None

Drop a keyspace.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to drop.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def drop_keyspace(self, keyspace: str) -> None:
    """Drop a keyspace.

    Args:
        keyspace (str): The name of the keyspace to drop.
    """
    query = f"DROP KEYSPACE IF EXISTS {keyspace}"
    try:
        self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "drop_keyspace")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.use_keyspace

use_keyspace(keyspace: str) -> None

Switch to a different keyspace context.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to use.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def use_keyspace(self, keyspace: str) -> None:
    """Switch to a different keyspace context.

    Args:
        keyspace (str): The name of the keyspace to use.
    """
    session = self.get_session()
    try:
        session.set_keyspace(keyspace)
    except Exception as e:
        self._handle_scylladb_exception(e, "use_keyspace")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.create_table

create_table(table_schema: str) -> None

Create a table using raw CQL DDL.

Parameters:

Name Type Description Default
table_schema str

The complete CREATE TABLE CQL statement.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def create_table(self, table_schema: str) -> None:
    """Create a table using raw CQL DDL.

    Args:
        table_schema (str): The complete CREATE TABLE CQL statement.
    """
    try:
        self.execute(table_schema)
    except Exception as e:
        self._handle_scylladb_exception(e, "create_table")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.drop_table

drop_table(table: str) -> None

Drop a table.

Parameters:

Name Type Description Default
table str

The name of the table to drop.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def drop_table(self, table: str) -> None:
    """Drop a table.

    Args:
        table (str): The name of the table to drop.
    """
    query = f"DROP TABLE IF EXISTS {table}"
    try:
        self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "drop_table")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.insert

insert(
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None

Insert data into a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs representing column names and values.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
if_not_exists bool

If True, use a lightweight transaction (INSERT ... IF NOT EXISTS). This prevents an existing row with the same primary key from being overwritten, but it is slower than a plain insert.

False
Source code in archipy/adapters/scylladb/adapters.py
@override
def insert(self, table: str, data: dict[str, Any], ttl: int | None = None, if_not_exists: bool = False) -> None:
    """Insert data into a table.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs representing column names and values.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                          This prevents errors on duplicate primary keys but is slow
    """
    columns = ", ".join(data.keys())
    placeholders = ", ".join(["%s" for _ in data])
    query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    if if_not_exists:
        query += " IF NOT EXISTS"

    if ttl is not None:
        query += f" USING TTL {ttl}"

    try:
        self.execute(query, tuple(data.values()))
    except Exception as e:
        self._handle_scylladb_exception(e, "insert")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.select

select(
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]

Select data from a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
columns list[str] | None

List of columns to select. If None, selects all (*).

None
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Type Description
list[Any]

list[Any]: List of result rows.

Source code in archipy/adapters/scylladb/adapters.py
@override
def select(
    self,
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]:
    """Select data from a table.

    Args:
        table (str): The name of the table.
        columns (list[str] | None): List of columns to select. If None, selects all (*).
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        list[Any]: List of result rows.
    """
    cols = ", ".join(columns) if columns else "*"
    query = f"SELECT {cols} FROM {table}"

    params = None
    if conditions:
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query += f" WHERE {where_clause}"
        params = tuple(conditions.values())

    try:
        result = self.execute(query, params)
        return list(result)
    except Exception as e:
        self._handle_scylladb_exception(e, "select")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.update

update(
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None

Update data in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs for SET clause.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
Source code in archipy/adapters/scylladb/adapters.py
@override
def update(self, table: str, data: dict[str, Any], conditions: dict[str, Any], ttl: int | None = None) -> None:
    """Update data in a table.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs for SET clause.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
    """
    set_clause = ", ".join([f"{key} = %s" for key in data])
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"UPDATE {table}"

    if ttl is not None:
        query += f" USING TTL {ttl}"

    query += f" SET {set_clause} WHERE {where_clause}"

    # Combine params: SET values first, then WHERE values
    params = tuple(data.values()) + tuple(conditions.values())

    try:
        self.execute(query, params)
    except Exception as e:
        self._handle_scylladb_exception(e, "update")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.delete

delete(table: str, conditions: dict[str, Any]) -> None

Delete data from a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def delete(self, table: str, conditions: dict[str, Any]) -> None:
    """Delete data from a table.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
    """
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"DELETE FROM {table} WHERE {where_clause}"

    try:
        self.execute(query, tuple(conditions.values()))
    except Exception as e:
        self._handle_scylladb_exception(e, "delete")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.batch_execute

batch_execute(statements: list[str]) -> None

Execute multiple CQL statements in a batch.

Parameters:

Name Type Description Default
statements list[str]

List of CQL statements to execute in batch.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
def batch_execute(self, statements: list[str]) -> None:
    """Run several CQL statements as one batch.

    Each string is wrapped in a ``SimpleStatement`` and added to a
    ``BatchStatement`` created with the adapter's configured consistency
    level; the batch is then executed on the current session.

    Args:
        statements (list[str]): CQL statements to send together.

    Raises:
        Exception: Any error while building or executing the batch; it is
            passed to ``self._handle_scylladb_exception`` and re-raised.
    """
    active_session = self.get_session()
    pending_batch = BatchStatement(consistency_level=self._get_consistency_level())

    try:
        for cql in statements:
            pending_batch.add(SimpleStatement(cql))
        active_session.execute(pending_batch)
    except Exception as exc:
        self._handle_scylladb_exception(exc, "batch_execute")
        raise

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.get_session

get_session() -> Any

Get the current session object.

Returns:

Name Type Description
Any Any

The active session object.

Source code in archipy/adapters/scylladb/adapters.py
@override
def get_session(self) -> Any:
    """Get the current session object.

    Returns:
        Any: The active session object.
    """
    return self._session

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.is_connected

is_connected() -> bool

Check if the adapter is connected to ScyllaDB cluster.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

Source code in archipy/adapters/scylladb/adapters.py
@override
def is_connected(self) -> bool:
    """Check if the adapter is connected to ScyllaDB cluster.

    Returns:
        bool: True if connected, False otherwise.
    """
    return self._session is not None and not self._session.is_shutdown

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.close

close() -> None

Close the connection and clean up resources.

This method should be called when the adapter is no longer needed to properly release resources.

Source code in archipy/adapters/scylladb/adapters.py
def close(self) -> None:
    """Shut down the session and the cluster, if present.

    The session is shut down first, then the cluster. Attributes that are
    missing or None are skipped. Any exception raised while shutting down is
    logged at debug level and not propagated.
    """
    try:
        session = getattr(self, "_session", None)
        if session is not None:
            session.shutdown()

        cluster = getattr(self, "_cluster", None)
        if cluster is not None:
            cluster.shutdown()
    except Exception as e:
        # Best-effort cleanup: record the failure but never raise from close().
        logger.debug(f"Error during ScyllaDB adapter cleanup: {e}")

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.health_check

health_check() -> dict[str, Any]

Perform a health check on the ScyllaDB connection.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Health check result with status, latency_ms, and optional error.

Source code in archipy/adapters/scylladb/adapters.py
@override
def health_check(self) -> dict[str, Any]:
    """Perform a health check on the ScyllaDB connection.

    Returns:
        dict[str, Any]: Health check result with status, latency_ms, and optional error.
    """
    if not self.is_connected():
        return {
            "status": "unhealthy",
            "latency_ms": 0.0,
            "error": "Not connected to cluster",
        }

    try:
        start_time = time.time()
        session = self.get_session()
        original_timeout = session.default_timeout
        session.default_timeout = self.config.HEALTH_CHECK_TIMEOUT
        try:
            session.execute("SELECT now() FROM system.local")
        finally:
            session.default_timeout = original_timeout
        latency_ms = (time.time() - start_time) * 1000
    except Exception as e:
        return {
            "status": "unhealthy",
            "latency_ms": 0.0,
            "error": str(e),
        }
    else:
        return {
            "status": "healthy",
            "latency_ms": latency_ms,
            "error": None,
        }

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.count

count(
    table: str, conditions: dict[str, Any] | None = None
) -> int

Count rows in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Name Type Description
int int

The number of rows matching the conditions.

Source code in archipy/adapters/scylladb/adapters.py
@override
def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
    """Count rows in a table.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        int: The number of rows matching the conditions.
    """
    query = f"SELECT COUNT(*) FROM {table}"

    params = None
    if conditions:
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query += f" WHERE {where_clause} ALLOW FILTERING"
        params = tuple(conditions.values())

    try:
        result = self.execute(query, params)
        row = result.one()
    except Exception as e:
        self._handle_scylladb_exception(e, "count")
        raise
    else:
        return row.count if row else 0

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.exists

exists(table: str, conditions: dict[str, Any]) -> bool

Check if a row exists in a table.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Returns:

Name Type Description
bool bool

True if at least one row exists, False otherwise.

Source code in archipy/adapters/scylladb/adapters.py
@override
def exists(self, table: str, conditions: dict[str, Any]) -> bool:
    """Check if a row exists in a table.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

    Returns:
        bool: True if at least one row exists, False otherwise.
    """
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"SELECT COUNT(*) FROM {table} WHERE {where_clause} LIMIT 1 ALLOW FILTERING"

    try:
        result = self.execute(query, tuple(conditions.values()))
        row = result.one()
    except Exception as e:
        self._handle_scylladb_exception(e, "exists")
        raise
    else:
        return row.count > 0 if row else False

archipy.adapters.scylladb.adapters.ScyllaDBAdapter.get_pool_stats

get_pool_stats() -> dict[str, Any]

Get connection pool statistics.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.

Source code in archipy/adapters/scylladb/adapters.py
@override
def get_pool_stats(self) -> dict[str, Any]:
    """Get connection pool statistics.

    Returns:
        dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.
    """
    if not self.config.ENABLE_CONNECTION_POOL_MONITORING:
        return {
            "monitoring_enabled": False,
            "message": "Connection pool monitoring is disabled",
        }

    stats: dict[str, Any] = {"monitoring_enabled": True}

    try:
        session = self.get_session()
        cluster = self._cluster

        # Get pool state for each host
        hosts_stats = []
        for host in cluster.metadata.all_hosts():
            host_pool = session.get_pool_state(host)
            if host_pool:
                hosts_stats.append(
                    {
                        "host": str(host),
                        "open_connections": host_pool.get("open_count", 0),
                        "in_flight_queries": host_pool.get("in_flight", 0),
                    },
                )

        stats["hosts"] = hosts_stats
        stats["total_hosts"] = len(hosts_stats)
        stats["total_open_connections"] = sum(h.get("open_connections", 0) for h in hosts_stats)
        stats["total_in_flight_queries"] = sum(h.get("in_flight_queries", 0) for h in hosts_stats)

    except Exception as e:
        stats["error"] = str(e)

    return stats

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter

Bases: AsyncScyllaDBPort, ScyllaDBExceptionHandlerMixin

Asynchronous adapter for ScyllaDB operations.

This adapter implements the AsyncScyllaDBPort interface to provide async database operations using the ScyllaDB driver's async capabilities.

Parameters:

Name Type Description Default
config ScyllaDBConfig | None

Configuration settings for ScyllaDB. If None, retrieves from global config. Defaults to None.

None
Source code in archipy/adapters/scylladb/adapters.py
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
class AsyncScyllaDBAdapter(AsyncScyllaDBPort, ScyllaDBExceptionHandlerMixin):
    """Asynchronous adapter for ScyllaDB operations.

    This adapter implements the AsyncScyllaDBPort interface to provide async
    database operations using the ScyllaDB driver's async capabilities.

    Args:
        config (ScyllaDBConfig | None): Configuration settings for ScyllaDB.
            If None, retrieves from global config. Defaults to None.
    """

    def __init__(self, config: ScyllaDBConfig | None = None) -> None:
        """Set up configuration, the statement cache hook, and the connection.

        The configuration is resolved first (explicit argument, else the
        global config's ``SCYLLADB`` attribute, else a default
        ``ScyllaDBConfig``). ``__post_init__`` then runs, after which the
        cluster is created and a session is connected. The session's default
        timeout is set from ``REQUEST_TIMEOUT`` and, when ``KEYSPACE`` is
        truthy, the session is switched to that keyspace.

        Args:
            config (ScyllaDBConfig | None): Configuration settings for ScyllaDB.
                If None, retrieves from global config. Defaults to None.

        Raises:
            Exception: Any error while creating the cluster or connecting; it
                is passed to ``self._handle_scylladb_exception`` and re-raised.
        """
        if config is None:
            try:
                config = BaseConfig.global_config().SCYLLADB
            except AttributeError:
                # The global config has no SCYLLADB section: use defaults.
                config = ScyllaDBConfig()
        self.config = config

        self.__post_init__()

        try:
            self._cluster = self._create_cluster()
            self._session = self._cluster.connect()
            self._session.default_timeout = self.config.REQUEST_TIMEOUT
            keyspace = self.config.KEYSPACE
            if keyspace:
                self._session.set_keyspace(keyspace)
        except Exception as exc:
            self._handle_scylladb_exception(exc, "connect")
            raise

    def _get_consistency_level(self) -> int:
        """Translate the configured consistency name into a driver constant.

        The lookup is case-insensitive (the configured string is upper-cased).

        Returns:
            int: The matching ``ConsistencyLevel`` value, or
            ``ConsistencyLevel.ONE`` when the configured name is not one of
            the names listed below.
        """
        levels_by_name: dict[str, int] = {
            "ONE": ConsistencyLevel.ONE,
            "TWO": ConsistencyLevel.TWO,
            "THREE": ConsistencyLevel.THREE,
            "QUORUM": ConsistencyLevel.QUORUM,
            "ALL": ConsistencyLevel.ALL,
            "LOCAL_QUORUM": ConsistencyLevel.LOCAL_QUORUM,
            "EACH_QUORUM": ConsistencyLevel.EACH_QUORUM,
            "LOCAL_ONE": ConsistencyLevel.LOCAL_ONE,
            "ANY": ConsistencyLevel.ANY,
        }
        configured_name = self.config.CONSISTENCY_LEVEL.upper()
        # Unknown names fall back to ONE.
        return levels_by_name.get(configured_name, ConsistencyLevel.ONE)

    def _create_cluster(self) -> Any:
        """Build a ``Cluster`` from the adapter configuration.

        Returns:
            Cluster: The configured (not yet connected) cluster instance.
        """
        cfg = self.config

        # Credentials are attached only when both username and password are set.
        auth_provider = (
            PlainTextAuthProvider(
                username=cfg.USERNAME,
                password=cfg.PASSWORD.get_secret_value(),
            )
            if cfg.USERNAME and cfg.PASSWORD
            else None
        )

        # Token-aware routing on top of a DC-aware policy when LOCAL_DC is set,
        # otherwise on top of plain round-robin.
        child_policy = DCAwareRoundRobinPolicy(local_dc=cfg.LOCAL_DC) if cfg.LOCAL_DC else RoundRobinPolicy()
        load_balancing_policy = TokenAwarePolicy(child_policy)

        # Anything other than "FALLTHROUGH" selects exponential backoff.
        if cfg.RETRY_POLICY == "FALLTHROUGH":
            retry_policy = FallthroughRetryPolicy()
        else:
            retry_policy = ExponentialBackoffRetryPolicy(
                max_num_retries=cfg.RETRY_MAX_NUM_RETRIES,
                min_interval=cfg.RETRY_MIN_INTERVAL,
                max_interval=cfg.RETRY_MAX_INTERVAL,
            )

        # Shard awareness can be switched off (Docker/NAT environments).
        shard_aware_options = {"disable": True} if cfg.DISABLE_SHARD_AWARENESS else None

        # Address translation for Docker/NAT environments where gossip-discovered
        # internal container IPs are unreachable from the host; every address is
        # mapped to the first contact point.
        address_translator = (
            _FixedAddressTranslator(cfg.CONTACT_POINTS[0]) if cfg.ADDRESS_TRANSLATION_ENABLED else None
        )

        cluster = Cluster(
            contact_points=cfg.CONTACT_POINTS,
            port=cfg.PORT,
            auth_provider=auth_provider,
            protocol_version=cfg.PROTOCOL_VERSION,
            compression=bool(cfg.COMPRESSION),
            connect_timeout=cfg.CONNECT_TIMEOUT,
            load_balancing_policy=load_balancing_policy,
            default_retry_policy=retry_policy,
            shard_aware_options=shard_aware_options,
            address_translator=address_translator,
        )

        # Apply request timeout and per-host pool limits on the default profile.
        if cluster.profile_manager is not None:
            default_profile = cluster.profile_manager.default
            default_profile.request_timeout = cfg.REQUEST_TIMEOUT
            default_profile.max_connections_per_host = cfg.MAX_CONNECTIONS_PER_HOST
            default_profile.min_connections_per_host = cfg.MIN_CONNECTIONS_PER_HOST
            default_profile.core_connections_per_host = cfg.CORE_CONNECTIONS_PER_HOST

        # This attribute is set on the connection class itself, not on an instance.
        cluster.connection_class.max_requests_per_connection = cfg.MAX_REQUESTS_PER_CONNECTION

        return cluster

    async def _await_future(self, future: Any) -> Any:
        """Convert ResponseFuture to awaitable.

        Args:
            future (ResponseFuture): The response future from async execution.

        Returns:
            Any: The result from the future.
        """
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, future.result)

    @override
    async def execute(self, query: str, params: dict[str, Any] | tuple | list | None = None) -> Any:
        """Execute a CQL query asynchronously.

        Args:
            query (str): The CQL query to execute.
            params (dict[str, Any] | tuple | list | None): Query parameters for parameterized queries.

        Returns:
            Any: The query result set.
        """
        session = await self.get_session()
        try:
            if params:
                future = session.execute_async(query, params)
            else:
                future = session.execute_async(query)
            result = await self._await_future(future)
        except Exception as e:
            self._handle_scylladb_exception(e, "execute")
            raise
        else:
            return result

    @override
    async def prepare(self, query: str) -> PreparedStatement:
        """Prepare a CQL statement asynchronously.

        Args:
            query (str): The CQL query to prepare.

        Returns:
            PreparedStatement: The prepared statement object.
        """
        session = await self.get_session()
        try:
            if self.config.ENABLE_PREPARED_STATEMENT_CACHE:
                # Use cached version if available - call the cached method
                cached_method: Any = getattr(self, "_prepare_cached", None)
                if cached_method is not None:
                    return await cached_method(query)
            # Direct prepare without cache
            prepared = session.prepare(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "prepare")
            raise
        else:
            return prepared

    def __post_init__(self) -> None:
        """Install the cached prepare coroutine when the cache is enabled.

        Does nothing when ``config.ENABLE_PREPARED_STATEMENT_CACHE`` is falsy.
        Otherwise an inner coroutine that prepares a statement on the session
        is wrapped with ``alru_cache`` (TTL and size taken from the config)
        and stored on the instance as ``_prepare_cached``.
        """
        if not self.config.ENABLE_PREPARED_STATEMENT_CACHE:
            return

        async def _prepare_internal(query: str) -> PreparedStatement:
            """Prepare ``query`` on the current session (wrapped by the cache)."""
            session = await self.get_session()
            try:
                return session.prepare(query)
            except Exception as exc:
                self._handle_scylladb_exception(exc, "prepare")
                raise

        cache_decorator = alru_cache(
            ttl=self.config.PREPARED_STATEMENT_CACHE_TTL_SECONDS,
            maxsize=self.config.PREPARED_STATEMENT_CACHE_SIZE,
        )
        # Stored per instance so prepare() can discover it with getattr().
        self._prepare_cached = cache_decorator(_prepare_internal)

    @override
    async def execute_prepared(self, statement: PreparedStatement, params: dict[str, Any] | None = None) -> Any:
        """Execute a prepared statement asynchronously.

        Args:
            statement (PreparedStatement): The prepared statement object.
            params (dict[str, Any] | None): Parameters to bind to the statement.

        Returns:
            Any: The query result set.
        """
        session = await self.get_session()
        try:
            if params:
                future = session.execute_async(statement, params)
            else:
                future = session.execute_async(statement)
            result = await self._await_future(future)
        except Exception as e:
            self._handle_scylladb_exception(e, "execute_prepared")
            raise
        else:
            return result

    @override
    async def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
        """Create a keyspace asynchronously.

        Args:
            keyspace (str): The name of the keyspace to create.
            replication_factor (int): The replication factor. Defaults to 1.
        """
        # Use configured replication strategy
        if self.config.REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self.config.REPLICATION_CONFIG:
            # Build replication config for NetworkTopologyStrategy
            replication_parts = ["'class': 'NetworkTopologyStrategy'"]
            for dc, rf in self.config.REPLICATION_CONFIG.items():
                replication_parts.append(f"'{dc}': {rf}")
            replication_str = ", ".join(replication_parts)
            query = f"""
                CREATE KEYSPACE IF NOT EXISTS {keyspace}
                WITH replication = {{{replication_str}}}
            """
        else:
            # Use SimpleStrategy (default)
            query = f"""
                CREATE KEYSPACE IF NOT EXISTS {keyspace}
                WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}}
            """
        try:
            await self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "create_keyspace")
            raise

    @override
    async def drop_keyspace(self, keyspace: str) -> None:
        """Drop a keyspace asynchronously.

        Args:
            keyspace (str): The name of the keyspace to drop.
        """
        query = f"DROP KEYSPACE IF EXISTS {keyspace}"
        try:
            await self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "drop_keyspace")
            raise

    @override
    async def use_keyspace(self, keyspace: str) -> None:
        """Switch to a different keyspace context asynchronously.

        Args:
            keyspace (str): The name of the keyspace to use.
        """
        session = await self.get_session()
        try:
            session.set_keyspace(keyspace)
        except Exception as e:
            self._handle_scylladb_exception(e, "use_keyspace")
            raise

    @override
    async def create_table(self, table_schema: str) -> None:
        """Create a table asynchronously.

        Args:
            table_schema (str): The complete CREATE TABLE CQL statement.
        """
        try:
            await self.execute(table_schema)
        except Exception as e:
            self._handle_scylladb_exception(e, "create_table")
            raise

    @override
    async def drop_table(self, table: str) -> None:
        """Drop a table asynchronously.

        Args:
            table (str): The name of the table to drop.
        """
        query = f"DROP TABLE IF EXISTS {table}"
        try:
            await self.execute(query)
        except Exception as e:
            self._handle_scylladb_exception(e, "drop_table")
            raise

    @override
    async def insert(
        self,
        table: str,
        data: dict[str, Any],
        ttl: int | None = None,
        if_not_exists: bool = False,
    ) -> None:
        """Insert data into a table asynchronously.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs representing column names and values.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
            if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                              This prevents errors on duplicate primary keys but is slow
        """
        columns = ", ".join(data.keys())
        placeholders = ", ".join(["%s" for _ in data])
        query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

        if if_not_exists:
            query += " IF NOT EXISTS"

        if ttl is not None:
            query += f" USING TTL {ttl}"

        try:
            await self.execute(query, tuple(data.values()))
        except Exception as e:
            self._handle_scylladb_exception(e, "insert")
            raise

    @override
    async def select(
        self,
        table: str,
        columns: list[str] | None = None,
        conditions: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Select data from a table asynchronously.

        Args:
            table (str): The name of the table.
            columns (list[str] | None): List of columns to select. If None, selects all (*).
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            list[Any]: List of result rows.
        """
        cols = ", ".join(columns) if columns else "*"
        query = f"SELECT {cols} FROM {table}"

        params = None
        if conditions:
            where_clause = " AND ".join([f"{key} = %s" for key in conditions])
            query += f" WHERE {where_clause}"
            params = tuple(conditions.values())

        try:
            result = await self.execute(query, params)
            return list(result)
        except Exception as e:
            self._handle_scylladb_exception(e, "select")
            raise

    @override
    async def update(
        self,
        table: str,
        data: dict[str, Any],
        conditions: dict[str, Any],
        ttl: int | None = None,
    ) -> None:
        """Update data in a table asynchronously.

        Args:
            table (str): The name of the table.
            data (dict[str, Any]): Key-value pairs for SET clause.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
            ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        """
        set_clause = ", ".join([f"{key} = %s" for key in data])
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"UPDATE {table}"

        if ttl is not None:
            query += f" USING TTL {ttl}"

        query += f" SET {set_clause} WHERE {where_clause}"

        # Combine params: SET values first, then WHERE values
        params = tuple(data.values()) + tuple(conditions.values())

        try:
            await self.execute(query, params)
        except Exception as e:
            self._handle_scylladb_exception(e, "update")
            raise

    @override
    async def delete(self, table: str, conditions: dict[str, Any]) -> None:
        """Delete data from a table asynchronously.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        """
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"DELETE FROM {table} WHERE {where_clause}"

        try:
            await self.execute(query, tuple(conditions.values()))
        except Exception as e:
            self._handle_scylladb_exception(e, "delete")
            raise

    @override
    async def batch_execute(self, statements: list[str]) -> None:
        """Run several CQL statements as one batch and await completion.

        Each string is wrapped in a ``SimpleStatement`` and added to a
        ``BatchStatement`` created with the adapter's configured consistency
        level; the batch is submitted with ``execute_async`` and awaited.

        Args:
            statements (list[str]): CQL statements to send together.

        Raises:
            Exception: Any error while building, submitting or awaiting the
                batch; it is passed to ``self._handle_scylladb_exception``
                and re-raised.
        """
        active_session = await self.get_session()
        pending_batch = BatchStatement(consistency_level=self._get_consistency_level())

        try:
            for cql in statements:
                pending_batch.add(SimpleStatement(cql))
            response_future = active_session.execute_async(pending_batch)
            await self._await_future(response_future)
        except Exception as exc:
            self._handle_scylladb_exception(exc, "batch_execute")
            raise

    @override
    async def get_session(self) -> Any:
        """Get the current session object asynchronously.

        Returns:
            Any: The active session object.
        """
        # Session is from cassandra.cluster, properly typed
        return self._session

    @override
    async def is_connected(self) -> bool:
        """Check if the adapter is connected to ScyllaDB cluster.

        Returns:
            bool: True if connected, False otherwise.
        """
        return self._session is not None and not self._session.is_shutdown

    async def close(self) -> None:
        """Close the connection and clean up resources asynchronously.

        Shuts down the session first and then the cluster. Each shutdown is
        guarded on its own, so an error while shutting down the session no
        longer prevents the cluster from being shut down. Errors are logged at
        debug level and never propagated.
        """
        for attr_name in ("_session", "_cluster"):
            # getattr with a default covers adapters whose __init__ failed
            # before the attribute was assigned.
            resource = getattr(self, attr_name, None)
            if resource is None:
                continue
            try:
                resource.shutdown()
            except Exception as e:
                # Ignore errors during cleanup, but log them
                logger.debug(f"Error during async ScyllaDB adapter cleanup: {e}")

    def __del__(self) -> None:
        """Destructor to ensure resources are cleaned up."""
        try:
            if hasattr(self, "_session") and self._session is not None:
                self._session.shutdown()
            if hasattr(self, "_cluster") and self._cluster is not None:
                self._cluster.shutdown()
        except Exception as e:
            # Ignore errors during destructor cleanup
            logger.debug(f"Error in async ScyllaDB adapter destructor: {e}")

    @override
    async def health_check(self) -> dict[str, Any]:
        """Perform a health check on the ScyllaDB connection.

        Returns:
            dict[str, Any]: Health check result with status, latency_ms, and optional error.
        """
        if not await self.is_connected():
            return {
                "status": "unhealthy",
                "latency_ms": 0.0,
                "error": "Not connected to cluster",
            }

        try:
            start_time = time.time()
            session = await self.get_session()
            original_timeout = session.default_timeout
            session.default_timeout = self.config.HEALTH_CHECK_TIMEOUT
            try:
                future = session.execute_async("SELECT now() FROM system.local")
                await self._await_future(future)
            finally:
                session.default_timeout = original_timeout
            latency_ms = (time.time() - start_time) * 1000
        except Exception as e:
            return {
                "status": "unhealthy",
                "latency_ms": 0.0,
                "error": str(e),
            }
        else:
            return {
                "status": "healthy",
                "latency_ms": latency_ms,
                "error": None,
            }

    @override
    async def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
        """Count rows in a table asynchronously.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

        Returns:
            int: The number of rows matching the conditions.
        """
        query = f"SELECT COUNT(*) FROM {table}"

        params = None
        if conditions:
            where_clause = " AND ".join([f"{key} = %s" for key in conditions])
            query += f" WHERE {where_clause} ALLOW FILTERING"
            params = tuple(conditions.values())

        try:
            result = await self.execute(query, params)
            row = result.one()
        except Exception as e:
            self._handle_scylladb_exception(e, "count")
            raise
        else:
            return row.count if row else 0

    @override
    async def exists(self, table: str, conditions: dict[str, Any]) -> bool:
        """Check if a row exists in a table asynchronously.

        Args:
            table (str): The name of the table.
            conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

        Returns:
            bool: True if at least one row exists, False otherwise.
        """
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query = f"SELECT COUNT(*) FROM {table} WHERE {where_clause} LIMIT 1 ALLOW FILTERING"

        try:
            result = await self.execute(query, tuple(conditions.values()))
            row = result.one()
        except Exception as e:
            self._handle_scylladb_exception(e, "exists")
            raise
        else:
            return row.count > 0 if row else False

    @override
    async def get_pool_stats(self) -> dict[str, Any]:
        """Get connection pool statistics asynchronously.

        Returns:
            dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.
        """
        if not self.config.ENABLE_CONNECTION_POOL_MONITORING:
            return {
                "monitoring_enabled": False,
                "message": "Connection pool monitoring is disabled",
            }

        stats: dict[str, Any] = {"monitoring_enabled": True}

        try:
            session = await self.get_session()
            cluster = self._cluster

            # Get pool state for each host
            hosts_stats = []
            for host in cluster.metadata.all_hosts():
                host_pool = session.get_pool_state(host)
                if host_pool:
                    hosts_stats.append(
                        {
                            "host": str(host),
                            "open_connections": host_pool.get("open_count", 0),
                            "in_flight_queries": host_pool.get("in_flight", 0),
                        },
                    )

            stats["hosts"] = hosts_stats
            stats["total_hosts"] = len(hosts_stats)
            stats["total_open_connections"] = sum(h.get("open_connections", 0) for h in hosts_stats)
            stats["total_in_flight_queries"] = sum(h.get("in_flight_queries", 0) for h in hosts_stats)

        except Exception as e:
            stats["error"] = str(e)

        return stats

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.config instance-attribute

config = SCYLLADB

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.execute async

execute(
    query: str,
    params: dict[str, Any] | tuple | list | None = None,
) -> Any

Execute a CQL query asynchronously.

Parameters:

Name Type Description Default
query str

The CQL query to execute.

required
params dict[str, Any] | tuple | list | None

Query parameters for parameterized queries.

None

Returns:

Name Type Description
Any Any

The query result set.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def execute(self, query: str, params: dict[str, Any] | tuple | list | None = None) -> Any:
    """Execute a CQL query asynchronously.

    Args:
        query (str): The CQL query to execute.
        params (dict[str, Any] | tuple | list | None): Query parameters for parameterized queries.

    Returns:
        Any: The query result set.
    """
    session = await self.get_session()
    try:
        if params:
            future = session.execute_async(query, params)
        else:
            future = session.execute_async(query)
        result = await self._await_future(future)
    except Exception as e:
        self._handle_scylladb_exception(e, "execute")
        raise
    else:
        return result

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.prepare async

prepare(query: str) -> PreparedStatement

Prepare a CQL statement asynchronously.

Parameters:

Name Type Description Default
query str

The CQL query to prepare.

required

Returns:

Name Type Description
PreparedStatement PreparedStatement

The prepared statement object.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def prepare(self, query: str) -> PreparedStatement:
    """Prepare a CQL statement asynchronously.

    When ``ENABLE_PREPARED_STATEMENT_CACHE`` is set and the adapter has a
    ``_prepare_cached`` coroutine, that is awaited. Otherwise the session's
    synchronous ``prepare`` call is run in a worker thread so it does not
    block the event loop while it runs.

    Args:
        query (str): The CQL query to prepare.

    Returns:
        PreparedStatement: The prepared statement object.

    Raises:
        Exception: Any error raised while preparing is handed to
            ``_handle_scylladb_exception`` and then re-raised.
    """
    import asyncio  # local import keeps this block self-contained

    session = await self.get_session()
    try:
        if self.config.ENABLE_PREPARED_STATEMENT_CACHE:
            # Use cached version if available - call the cached method
            cached_method: Any = getattr(self, "_prepare_cached", None)
            if cached_method is not None:
                return await cached_method(query)
        # Direct prepare without cache, off the event-loop thread
        prepared = await asyncio.to_thread(session.prepare, query)
    except Exception as e:
        self._handle_scylladb_exception(e, "prepare")
        raise
    else:
        return prepared

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.execute_prepared async

execute_prepared(
    statement: PreparedStatement,
    params: dict[str, Any] | None = None,
) -> Any

Execute a prepared statement asynchronously.

Parameters:

Name Type Description Default
statement PreparedStatement

The prepared statement object.

required
params dict[str, Any] | None

Parameters to bind to the statement.

None

Returns:

Name Type Description
Any Any

The query result set.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def execute_prepared(self, statement: PreparedStatement, params: dict[str, Any] | None = None) -> Any:
    """Run a prepared statement on the current session and await its result.

    Args:
        statement (PreparedStatement): The prepared statement object.
        params (dict[str, Any] | None): Values to bind. None or an empty dict
            means the statement is sent without a parameters argument.

    Returns:
        Any: Whatever ``_await_future`` yields for the driver future.

    Raises:
        Exception: Any error raised while submitting or awaiting the statement
            is handed to ``_handle_scylladb_exception`` and then re-raised.
    """
    session = await self.get_session()
    # Only forward params when there is something to bind.
    call_args = (statement, params) if params else (statement,)
    try:
        return await self._await_future(session.execute_async(*call_args))
    except Exception as e:
        self._handle_scylladb_exception(e, "execute_prepared")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.create_keyspace async

create_keyspace(
    keyspace: str, replication_factor: int = 1
) -> None

Create a keyspace asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to create.

required
replication_factor int

The replication factor. Defaults to 1.

1
Source code in archipy/adapters/scylladb/adapters.py
@override
async def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
    """Create a keyspace asynchronously.

    Args:
        keyspace (str): The name of the keyspace to create.
        replication_factor (int): The replication factor. Defaults to 1.
    """
    # Use configured replication strategy
    if self.config.REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self.config.REPLICATION_CONFIG:
        # Build replication config for NetworkTopologyStrategy
        replication_parts = ["'class': 'NetworkTopologyStrategy'"]
        for dc, rf in self.config.REPLICATION_CONFIG.items():
            replication_parts.append(f"'{dc}': {rf}")
        replication_str = ", ".join(replication_parts)
        query = f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace}
            WITH replication = {{{replication_str}}}
        """
    else:
        # Use SimpleStrategy (default)
        query = f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace}
            WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}}
        """
    try:
        await self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "create_keyspace")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.drop_keyspace async

drop_keyspace(keyspace: str) -> None

Drop a keyspace asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to drop.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def drop_keyspace(self, keyspace: str) -> None:
    """Drop a keyspace asynchronously.

    Args:
        keyspace (str): The name of the keyspace to drop.
    """
    query = f"DROP KEYSPACE IF EXISTS {keyspace}"
    try:
        await self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "drop_keyspace")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.use_keyspace async

use_keyspace(keyspace: str) -> None

Switch to a different keyspace context asynchronously.

Parameters:

Name Type Description Default
keyspace str

The name of the keyspace to use.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def use_keyspace(self, keyspace: str) -> None:
    """Switch to a different keyspace context asynchronously.

    Args:
        keyspace (str): The name of the keyspace to use.
    """
    session = await self.get_session()
    try:
        session.set_keyspace(keyspace)
    except Exception as e:
        self._handle_scylladb_exception(e, "use_keyspace")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.create_table async

create_table(table_schema: str) -> None

Create a table asynchronously.

Parameters:

Name Type Description Default
table_schema str

The complete CREATE TABLE CQL statement.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def create_table(self, table_schema: str) -> None:
    """Create a table asynchronously.

    Args:
        table_schema (str): The complete CREATE TABLE CQL statement.
    """
    try:
        await self.execute(table_schema)
    except Exception as e:
        self._handle_scylladb_exception(e, "create_table")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.drop_table async

drop_table(table: str) -> None

Drop a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table to drop.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def drop_table(self, table: str) -> None:
    """Drop a table asynchronously.

    Args:
        table (str): The name of the table to drop.
    """
    query = f"DROP TABLE IF EXISTS {table}"
    try:
        await self.execute(query)
    except Exception as e:
        self._handle_scylladb_exception(e, "drop_table")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.insert async

insert(
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None

Insert data into a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs representing column names and values.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
if_not_exists bool

If True, use a lightweight transaction (INSERT ... IF NOT EXISTS). This prevents errors on duplicate primary keys but is slow.

False
Source code in archipy/adapters/scylladb/adapters.py
@override
async def insert(
    self,
    table: str,
    data: dict[str, Any],
    ttl: int | None = None,
    if_not_exists: bool = False,
) -> None:
    """Insert data into a table asynchronously.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs representing column names and values.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
        if_not_exists (bool): If True, use lightweight transaction (INSERT ... IF NOT EXISTS).
                          This prevents errors on duplicate primary keys but is slow
    """
    columns = ", ".join(data.keys())
    placeholders = ", ".join(["%s" for _ in data])
    query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    if if_not_exists:
        query += " IF NOT EXISTS"

    if ttl is not None:
        query += f" USING TTL {ttl}"

    try:
        await self.execute(query, tuple(data.values()))
    except Exception as e:
        self._handle_scylladb_exception(e, "insert")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.select async

select(
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]

Select data from a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
columns list[str] | None

List of columns to select. If None, selects all (*).

None
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Type Description
list[Any]

list[Any]: List of result rows.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def select(
    self,
    table: str,
    columns: list[str] | None = None,
    conditions: dict[str, Any] | None = None,
) -> list[Any]:
    """Select data from a table asynchronously.

    Args:
        table (str): The name of the table.
        columns (list[str] | None): List of columns to select. If None, selects all (*).
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        list[Any]: List of result rows.
    """
    cols = ", ".join(columns) if columns else "*"
    query = f"SELECT {cols} FROM {table}"

    params = None
    if conditions:
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query += f" WHERE {where_clause}"
        params = tuple(conditions.values())

    try:
        result = await self.execute(query, params)
        return list(result)
    except Exception as e:
        self._handle_scylladb_exception(e, "select")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.update async

update(
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None

Update data in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
data dict[str, Any]

Key-value pairs for SET clause.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
ttl int | None

Time to live in seconds. If None, data persists indefinitely.

None
Source code in archipy/adapters/scylladb/adapters.py
@override
async def update(
    self,
    table: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
    ttl: int | None = None,
) -> None:
    """Update data in a table asynchronously.

    Args:
        table (str): The name of the table.
        data (dict[str, Any]): Key-value pairs for SET clause.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
        ttl (int | None): Time to live in seconds. If None, data persists indefinitely.
    """
    set_clause = ", ".join([f"{key} = %s" for key in data])
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"UPDATE {table}"

    if ttl is not None:
        query += f" USING TTL {ttl}"

    query += f" SET {set_clause} WHERE {where_clause}"

    # Combine params: SET values first, then WHERE values
    params = tuple(data.values()) + tuple(conditions.values())

    try:
        await self.execute(query, params)
    except Exception as e:
        self._handle_scylladb_exception(e, "update")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.delete async

delete(table: str, conditions: dict[str, Any]) -> None

Delete data from a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def delete(self, table: str, conditions: dict[str, Any]) -> None:
    """Delete data from a table asynchronously.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.
    """
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"DELETE FROM {table} WHERE {where_clause}"

    try:
        await self.execute(query, tuple(conditions.values()))
    except Exception as e:
        self._handle_scylladb_exception(e, "delete")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.batch_execute async

batch_execute(statements: list[str]) -> None

Execute multiple CQL statements in a batch asynchronously.

Parameters:

Name Type Description Default
statements list[str]

List of CQL statements to execute in batch.

required
Source code in archipy/adapters/scylladb/adapters.py
@override
async def batch_execute(self, statements: list[str]) -> None:
    """Send several CQL statements to the cluster as one batch.

    Each string is wrapped in a ``SimpleStatement`` and added to a single
    ``BatchStatement`` created with the adapter's consistency level.

    Args:
        statements (list[str]): CQL statements to include in the batch.

    Raises:
        Exception: Any error raised while building or executing the batch is
            handed to ``_handle_scylladb_exception`` and then re-raised.
    """
    session = await self.get_session()
    batch = BatchStatement(consistency_level=self._get_consistency_level())

    try:
        for cql in statements:
            batch.add(SimpleStatement(cql))

        await self._await_future(session.execute_async(batch))
    except Exception as e:
        self._handle_scylladb_exception(e, "batch_execute")
        raise

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.get_session async

get_session() -> Any

Get the current session object asynchronously.

Returns:

Name Type Description
Any Any

The active session object.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def get_session(self) -> Any:
    """Get the current session object asynchronously.

    Returns:
        Any: The active session object.
    """
    # Session is from cassandra.cluster, properly typed
    return self._session

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.is_connected async

is_connected() -> bool

Check if the adapter is connected to ScyllaDB cluster.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def is_connected(self) -> bool:
    """Check if the adapter is connected to ScyllaDB cluster.

    Returns:
        bool: True if connected, False otherwise.
    """
    return self._session is not None and not self._session.is_shutdown

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.close async

close() -> None

Close the connection and clean up resources asynchronously.

This method should be called when the adapter is no longer needed to properly release resources.

Source code in archipy/adapters/scylladb/adapters.py
async def close(self) -> None:
    """Close the connection and clean up resources asynchronously.

    Shuts down the session first and then the cluster. Each shutdown is
    guarded on its own, so an error while shutting down the session no longer
    prevents the cluster from being shut down. Errors are logged at debug
    level and never propagated.
    """
    for attr_name in ("_session", "_cluster"):
        # getattr with a default covers adapters whose __init__ failed before
        # the attribute was assigned.
        resource = getattr(self, attr_name, None)
        if resource is None:
            continue
        try:
            resource.shutdown()
        except Exception as e:
            # Ignore errors during cleanup, but log them
            logger.debug(f"Error during async ScyllaDB adapter cleanup: {e}")

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.health_check async

health_check() -> dict[str, Any]

Perform a health check on the ScyllaDB connection.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Health check result with status, latency_ms, and optional error.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def health_check(self) -> dict[str, Any]:
    """Perform a health check on the ScyllaDB connection.

    Returns:
        dict[str, Any]: Health check result with status, latency_ms, and optional error.
    """
    if not await self.is_connected():
        return {
            "status": "unhealthy",
            "latency_ms": 0.0,
            "error": "Not connected to cluster",
        }

    try:
        start_time = time.time()
        session = await self.get_session()
        original_timeout = session.default_timeout
        session.default_timeout = self.config.HEALTH_CHECK_TIMEOUT
        try:
            future = session.execute_async("SELECT now() FROM system.local")
            await self._await_future(future)
        finally:
            session.default_timeout = original_timeout
        latency_ms = (time.time() - start_time) * 1000
    except Exception as e:
        return {
            "status": "unhealthy",
            "latency_ms": 0.0,
            "error": str(e),
        }
    else:
        return {
            "status": "healthy",
            "latency_ms": latency_ms,
            "error": None,
        }

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.count async

count(
    table: str, conditions: dict[str, Any] | None = None
) -> int

Count rows in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any] | None

WHERE clause conditions as key-value pairs.

None

Returns:

Name Type Description
int int

The number of rows matching the conditions.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def count(self, table: str, conditions: dict[str, Any] | None = None) -> int:
    """Count rows in a table asynchronously.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any] | None): WHERE clause conditions as key-value pairs.

    Returns:
        int: The number of rows matching the conditions.
    """
    query = f"SELECT COUNT(*) FROM {table}"

    params = None
    if conditions:
        where_clause = " AND ".join([f"{key} = %s" for key in conditions])
        query += f" WHERE {where_clause} ALLOW FILTERING"
        params = tuple(conditions.values())

    try:
        result = await self.execute(query, params)
        row = result.one()
    except Exception as e:
        self._handle_scylladb_exception(e, "count")
        raise
    else:
        return row.count if row else 0

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.exists async

exists(table: str, conditions: dict[str, Any]) -> bool

Check if a row exists in a table asynchronously.

Parameters:

Name Type Description Default
table str

The name of the table.

required
conditions dict[str, Any]

WHERE clause conditions as key-value pairs.

required

Returns:

Name Type Description
bool bool

True if at least one row exists, False otherwise.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def exists(self, table: str, conditions: dict[str, Any]) -> bool:
    """Check if a row exists in a table asynchronously.

    Args:
        table (str): The name of the table.
        conditions (dict[str, Any]): WHERE clause conditions as key-value pairs.

    Returns:
        bool: True if at least one row exists, False otherwise.
    """
    where_clause = " AND ".join([f"{key} = %s" for key in conditions])
    query = f"SELECT COUNT(*) FROM {table} WHERE {where_clause} LIMIT 1 ALLOW FILTERING"

    try:
        result = await self.execute(query, tuple(conditions.values()))
        row = result.one()
    except Exception as e:
        self._handle_scylladb_exception(e, "exists")
        raise
    else:
        return row.count > 0 if row else False

archipy.adapters.scylladb.adapters.AsyncScyllaDBAdapter.get_pool_stats async

get_pool_stats() -> dict[str, Any]

Get connection pool statistics asynchronously.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.

Source code in archipy/adapters/scylladb/adapters.py
@override
async def get_pool_stats(self) -> dict[str, Any]:
    """Get connection pool statistics asynchronously.

    Returns:
        dict[str, Any]: Pool statistics including open connections, in-flight requests, etc.
    """
    if not self.config.ENABLE_CONNECTION_POOL_MONITORING:
        return {
            "monitoring_enabled": False,
            "message": "Connection pool monitoring is disabled",
        }

    stats: dict[str, Any] = {"monitoring_enabled": True}

    try:
        session = await self.get_session()
        cluster = self._cluster

        # Get pool state for each host
        hosts_stats = []
        for host in cluster.metadata.all_hosts():
            host_pool = session.get_pool_state(host)
            if host_pool:
                hosts_stats.append(
                    {
                        "host": str(host),
                        "open_connections": host_pool.get("open_count", 0),
                        "in_flight_queries": host_pool.get("in_flight", 0),
                    },
                )

        stats["hosts"] = hosts_stats
        stats["total_hosts"] = len(hosts_stats)
        stats["total_open_connections"] = sum(h.get("open_connections", 0) for h in hosts_stats)
        stats["total_in_flight_queries"] = sum(h.get("in_flight_queries", 0) for h in hosts_stats)

    except Exception as e:
        stats["error"] = str(e)

    return stats

options: show_root_toc_entry: false heading_level: 3