StarRocks Adapter¶
This example demonstrates how to use the StarRocks adapter for analytical database operations with proper exception handling and Python 3.13 type hints.
Basic Usage¶
import logging
from datetime import datetime
from archipy.adapters.starrocks.sqlalchemy.adapters import StarrocksSQLAlchemyAdapter
from archipy.models.entities.sqlalchemy.base_entities import BaseEntity
from archipy.models.errors import DatabaseQueryError, DatabaseConnectionError
from sqlalchemy import Column, String, Integer, DateTime
# Configure logging
logger = logging.getLogger(__name__)
# Define a model
class User(BaseEntity):
    __tablename__ = "users"
    username = Column(String(100), unique=True)
    email = Column(String(255), unique=True)
    age = Column(Integer)
    created_at = Column(DateTime)
# Create adapter
try:
    adapter = StarrocksSQLAlchemyAdapter()
except Exception as e:
    logger.error(f"Failed to create adapter: {e}")
    raise DatabaseConnectionError() from e
else:
    logger.info("StarRocks adapter created successfully")
# Create tables
try:
    BaseEntity.metadata.create_all(adapter.session_manager.engine)
except Exception as e:
    logger.error(f"Failed to create tables: {e}")
    raise DatabaseQueryError() from e
else:
    logger.info("Database tables created")
# Basic operations
try:
    with adapter.session() as session:
        # Create
        user = User(
            username="john_doe",
            email="john@example.com",
            age=30,
            created_at=datetime.now()
        )
        session.add(user)
        session.commit()
        # Read
        user = session.query(User).filter_by(username="john_doe").first()
        logger.info(f"User email: {user.email}")  # john@example.com
        # Update
        user.age = 31
        session.commit()
        # Delete
        session.delete(user)
        session.commit()
except (DatabaseQueryError, DatabaseConnectionError) as e:
    logger.error(f"Database operation failed: {e}")
    raise
else:
    logger.info("All database operations completed successfully")
Using Transactions¶
import logging
from archipy.helpers.decorators.sqlalchemy_atomic import starrocks_sqlalchemy_atomic_decorator
from archipy.models.errors import DatabaseQueryError
# Configure logging
logger = logging.getLogger(__name__)
@starrocks_sqlalchemy_atomic_decorator
def create_user_with_profile(
    username: str,
    email: str,
    age: int,
    profile_data: dict[str, str]
) -> User:
    """Create a user and profile in a single transaction.
    Args:
        username: User's username
        email: User's email address
        age: User's age
        profile_data: Profile information dictionary
    Returns:
        Created user object
    Raises:
        DatabaseQueryError: If database operation fails
    """
    try:
        user = User(username=username, email=email, age=age)
        adapter.create(user)
        profile = Profile(user_id=user.uuid, **profile_data)
        adapter.create(profile)
    except Exception as e:
        logger.error(f"Failed to create user with profile: {e}")
        raise DatabaseQueryError() from e
    else:
        logger.info(f"User and profile created: {username}")
        return user
Async Operations¶
import asyncio
import logging
from archipy.adapters.starrocks.sqlalchemy.adapters import AsyncStarrocksSQLAlchemyAdapter
from archipy.helpers.decorators.sqlalchemy_atomic import async_starrocks_sqlalchemy_atomic_decorator
from archipy.models.errors import DatabaseQueryError, DatabaseConnectionError
# Configure logging
logger = logging.getLogger(__name__)
async def main() -> None:
    """Main async function demonstrating StarRocks async operations."""
    try:
        adapter = AsyncStarrocksSQLAlchemyAdapter()
    except Exception as e:
        logger.error(f"Failed to create async adapter: {e}")
        raise DatabaseConnectionError() from e
    else:
        logger.info("Async StarRocks adapter created")
    @async_starrocks_sqlalchemy_atomic_decorator
    async def create_user_async(username: str, email: str, age: int) -> User:
        """Create a user asynchronously.
        Args:
            username: User's username
            email: User's email address
            age: User's age
        Returns:
            Created user object
        Raises:
            DatabaseQueryError: If database operation fails
        """
        try:
            user = User(username=username, email=email, age=age)
            result = await adapter.create(user)
        except Exception as e:
            logger.error(f"Failed to create user: {e}")
            raise DatabaseQueryError() from e
        else:
            logger.info(f"User created: {username}")
            return result
    try:
        user = await create_user_async("jane_doe", "jane@example.com", 28)
    except (DatabaseQueryError, DatabaseConnectionError) as e:
        logger.error(f"Async operation failed: {e}")
        raise
    else:
        logger.info(f"Created user: {user.username}")  # jane_doe
# Run async operations
asyncio.run(main())
Error Handling¶
import logging
from uuid import UUID
from archipy.models.errors import (
    AlreadyExistsError,
    NotFoundError,
    DatabaseQueryError,
    DatabaseConnectionError
)
# Configure logging
logger = logging.getLogger(__name__)
def get_user_by_id(user_id: UUID) -> User | None:
    """Get a user by their UUID.
    Args:
        user_id: User's unique identifier
    Returns:
        User object if found, None otherwise
    Raises:
        NotFoundError: If user doesn't exist
        DatabaseQueryError: If database query fails
        DatabaseConnectionError: If database connection fails
    """
    try:
        user = adapter.get_by_uuid(User, user_id)
    except (DatabaseQueryError, DatabaseConnectionError) as e:
        logger.error(f"Failed to get user: {e}")
        raise
    else:
        if not user:
            raise NotFoundError(
                resource_type="user",
                additional_data={"user_id": str(user_id)}
            )
        logger.info(f"User retrieved: {user.username}")
        return user
Advanced Queries¶
StarRocks is optimized for analytical queries. Here are some advanced patterns:
import logging
from datetime import datetime, timedelta
from sqlalchemy import func
# Configure logging
logger = logging.getLogger(__name__)
try:
    # Complex filtering for analytics
    recent_users = adapter.query(User).filter(
        User.age > 25,
        User.created_at >= datetime.now() - timedelta(days=30)
    ).all()
except DatabaseQueryError as e:
    logger.error(f"Failed to query recent users: {e}")
    raise
else:
    logger.info(f"Found {len(recent_users)} recent users")
try:
    # Aggregation for reporting
    age_stats = adapter.query(
        func.avg(User.age).label('avg_age'),
        func.max(User.age).label('max_age'),
        func.min(User.age).label('min_age')
    ).first()
except DatabaseQueryError as e:
    logger.error(f"Failed to compute age statistics: {e}")
    raise
else:
    logger.info(f"Age statistics: avg={age_stats.avg_age}, max={age_stats.max_age}, min={age_stats.min_age}")
try:
    # Joins for complex analytics
    from sqlalchemy import join
    user_profiles = adapter.query(User).join(
        Profile,
        User.uuid == Profile.user_id
    ).all()
except DatabaseQueryError as e:
    logger.error(f"Failed to join users and profiles: {e}")
    raise
else:
    logger.info(f"Retrieved {len(user_profiles)} user profiles")
Batch Operations¶
StarRocks excels at batch operations for analytical workloads:
import logging
from archipy.models.errors import DatabaseQueryError
# Configure logging
logger = logging.getLogger(__name__)
try:
    # Batch insert
    users = [
        User(
            username=f"user{i}",
            email=f"user{i}@example.com",
            age=20 + i,
            created_at=datetime.now()
        )
        for i in range(1000)
    ]
    adapter.bulk_create(users)
except DatabaseQueryError as e:
    logger.error(f"Batch insert failed: {e}")
    raise
else:
    logger.info(f"Successfully inserted {len(users)} users")
try:
    # Batch update
    rows_updated = adapter.query(User).filter(User.age < 30).update(
        {"age": User.age + 1},
        synchronize_session=False
    )
except DatabaseQueryError as e:
    logger.error(f"Batch update failed: {e}")
    raise
else:
    logger.info(f"Updated {rows_updated} users")
Configuration¶
import logging
from archipy.configs.config_template import StarrocksConfig
from archipy.adapters.starrocks.sqlalchemy.adapters import StarrocksSQLAlchemyAdapter
from archipy.models.errors import ConfigurationError, DatabaseConnectionError
# Configure logging
logger = logging.getLogger(__name__)
# Configure StarRocks connection
try:
    config = StarrocksConfig(
        HOST="localhost",
        PORT=9030,
        USER="root",
        PASSWORD="password",
        DATABASE="analytics_db"
    )
except Exception as e:
    logger.error(f"Invalid configuration: {e}")
    raise ConfigurationError() from e
else:
    logger.info("StarRocks configuration created")
# Create adapter with custom config
try:
    adapter = StarrocksSQLAlchemyAdapter(config=config)
except Exception as e:
    logger.error(f"Failed to create adapter with config: {e}")
    raise DatabaseConnectionError() from e
else:
    logger.info("StarRocks adapter initialized with custom config")
Integration with FastAPI¶
import logging
from uuid import UUID
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from archipy.adapters.starrocks.sqlalchemy.adapters import StarrocksSQLAlchemyAdapter
from archipy.models.errors import NotFoundError, DatabaseQueryError
# Configure logging
logger = logging.getLogger(__name__)
app = FastAPI()
adapter = StarrocksSQLAlchemyAdapter()
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    age: int
class UserResponse(BaseModel):
    uuid: UUID
    username: str
    email: str
    age: int
@app.post("/users", response_model=UserResponse)
async def create_user(user_data: UserCreate) -> dict[str, str | UUID | int]:
    """Create a new user for analytics."""
    try:
        user = User(
            username=user_data.username,
            email=user_data.email,
            age=user_data.age,
            created_at=datetime.now()
        )
        created_user = adapter.create(user)
    except DatabaseQueryError as e:
        logger.error(f"Failed to create user: {e}")
        raise HTTPException(status_code=500, detail="Database error") from e
    else:
        logger.info(f"User created: {user_data.username}")
        return {
            "uuid": created_user.uuid,
            "username": created_user.username,
            "email": created_user.email,
            "age": created_user.age
        }
@app.get("/analytics/age-distribution")
async def get_age_distribution() -> dict[str, list[dict[str, int]]]:
    """Get age distribution analytics."""
    try:
        from sqlalchemy import func
        distribution = adapter.query(
            User.age,
            func.count(User.uuid).label('count')
        ).group_by(User.age).order_by(User.age).all()
    except DatabaseQueryError as e:
        logger.error(f"Analytics query failed: {e}")
        raise HTTPException(status_code=500, detail="Query failed") from e
    else:
        result = [{"age": row.age, "count": row.count} for row in distribution]
        logger.info(f"Retrieved age distribution with {len(result)} groups")
        return {"distribution": result}
See Also¶
- Error Handling - Exception handling patterns with proper chaining
 - Configuration Management - StarRocks configuration setup
 - BDD Testing - Testing database operations
 - PostgreSQL Adapter - Similar patterns for PostgreSQL
 - SQLAlchemy Decorators - Transaction decorator usage
 - API Reference - Full StarRocks adapter API documentation