StarRocks Adapter¶
This example demonstrates how to use the StarRocks adapter for database operations.
Basic Usage¶
from archipy.adapters.starrocks.sqlalchemy.adapters import StarrocksSQLAlchemyAdapter
from archipy.models.entities.sqlalchemy.base_entities import BaseEntity
from sqlalchemy import Column, String, Integer, DateTime
# Define a model
class User(BaseEntity):
__tablename__ = "users"
username = Column(String(100), unique=True)
email = Column(String(255), unique=True)
age = Column(Integer)
created_at = Column(DateTime)
# Create adapter
adapter = StarrocksSQLAlchemyAdapter()
# Create tables
BaseEntity.metadata.create_all(adapter.session_manager.engine)
# Basic operations
with adapter.session() as session:
# Create
user = User(
username="john_doe",
email="john@example.com",
age=30,
created_at=datetime.now()
)
session.add(user)
session.commit()
# Read
user = session.query(User).filter_by(username="john_doe").first()
print(user.email) # john@example.com
# Update
user.age = 31
session.commit()
# Delete
session.delete(user)
session.commit()
Using Transactions¶
from archipy.helpers.decorators.sqlalchemy_atomic import starrocks_sqlalchemy_atomic_decorator
@starrocks_sqlalchemy_atomic_decorator
def create_user_with_profile(username, email, age, profile_data):
user = User(username=username, email=email, age=age)
adapter.create(user)
profile = Profile(user_id=user.test_uuid, **profile_data)
adapter.create(profile)
return user
Async Operations¶
from archipy.adapters.starrocks.sqlalchemy.adapters import AsyncStarrocksSQLAlchemyAdapter
from archipy.helpers.decorators.sqlalchemy_atomic import async_starrocks_sqlalchemy_atomic_decorator
async def main():
adapter = AsyncStarrocksSQLAlchemyAdapter()
@async_starrocks_sqlalchemy_atomic_decorator
async def create_user_async(username, email, age):
user = User(username=username, email=email, age=age)
return await adapter.create(user)
user = await create_user_async("jane_doe", "jane@example.com", 28)
print(user.username) # jane_doe
Error Handling¶
from archipy.models.errors import (
AlreadyExistsError,
NotFoundError,
InternalError
)
try:
user = adapter.get_by_id(User, user_id)
if not user:
raise NotFoundError(resource_type="user")
except AlreadyExistsError as e:
print(f"User already exists: {e.message}")
except InternalError as e:
print(f"Database error: {e.message}")
Advanced Queries¶
# Complex filtering
users = adapter.query(User).filter(
User.age > 25,
User.created_at >= datetime.now() - timedelta(days=30)
).all()
# Aggregation
from sqlalchemy import func
age_stats = adapter.query(
func.avg(User.age).label('avg_age'),
func.max(User.age).label('max_age'),
func.min(User.age).label('min_age')
).first()
# Joins
from sqlalchemy import join
user_profiles = adapter.query(User).join(
Profile,
User.test_uuid == Profile.user_id
).all()
Batch Operations¶
# Batch insert
users = [
User(username=f"user{i}", email=f"user{i}@example.com", age=20+i)
for i in range(100)
]
adapter.bulk_create(users)
# Batch update
adapter.query(User).filter(User.age < 30).update(
{"age": User.age + 1},
synchronize_session=False
)