
Helpers

Overview

The helpers module provides utility functions, decorators, interceptors, and metaclasses to support common development tasks and patterns.

Installation

This module is included in the base ArchiPy installation:

# Add ArchiPy to your project
poetry add archipy

For development features:

# Add ArchiPy with development extras
poetry add archipy -E dev

Source Code

📁 Location: archipy/helpers/

🔗 Browse Source

API Stability

Component Status Notes
Decorators 🟢 Stable Production-ready
Utils 🟢 Stable Production-ready
Interceptors 🟡 Beta API may change
Metaclasses 🟢 Stable Production-ready

Submodules

Utils

See the Utils documentation for full details.

General utility functions for common operations:

  • String manipulation
  • Date and time handling
  • Error utilities
  • File operations
  • Password utilities
  • JWT token handling
  • TOTP generation

Decorators

See the Decorators documentation for full details.

Function and class decorators for:

  • Method deprecation
  • Class deprecation
  • Timing operations
  • Retry logic

Interceptors

See the Interceptors documentation for full details.

Classes for cross-cutting concerns:

  • Logging
  • Performance monitoring
  • Request/response tracking

See the Helpers examples for practical usage of the utilities, decorators, and interceptors described below, including retry logic, rate limiting, and tracing.

Decorators

Tip: See the Decorators examples for practical usage of each decorator.

Retry Decorator

The retry decorator automatically retries failed operations, with a configurable number of attempts and a configurable delay between them.

from archipy.helpers.decorators.retry import retry_decorator

@retry_decorator(max_retries=3, delay=1)
def risky_operation():
    # Operation that might fail
    result = some_unreliable_function()
    return result

# Retries up to 3 times, waiting 1 second between attempts
result = risky_operation()

archipy.helpers.decorators.retry.retry_decorator(max_retries=3, delay=1, retry_on=None, ignore=None, resource_type=None, lang=LanguageType.FA)

A decorator that retries a function when it raises an exception.

Parameters:

Name Type Description Default
max_retries int The maximum number of retry attempts. 3
delay float The delay (in seconds) between retries. 1
retry_on Optional[Tuple[Type[Exception], ...]] Errors to retry on. If None, retries on all errors. None
ignore Optional[Tuple[Type[Exception], ...]] Errors to ignore (not retry on). If None, no errors are ignored. None
resource_type Optional[str] The type of resource being exhausted. None
lang LanguageType The language for the error message. LanguageType.FA

Returns:

Name Type Description
Callable Callable[[F], F] The decorated function with retry logic.

Example

To use this decorator, apply it to a function:

@retry_decorator(max_retries=3, delay=1, retry_on=(ValueError,), ignore=(TypeError,), resource_type="API")
def unreliable_function():
    if random.random() < 0.5:
        raise ValueError("Temporary failure")
    return "Success"

result = unreliable_function()

Output:

2023-10-10 12:00:00,000 - WARNING - Attempt 1 failed: Temporary failure
2023-10-10 12:00:01,000 - INFO - Attempt 2 succeeded.
Success

Source code in archipy/helpers/decorators/retry.py
def retry_decorator(
    max_retries: int = 3,
    delay: float = 1,
    retry_on: tuple[type[Exception], ...] | None = None,
    ignore: tuple[type[Exception], ...] | None = None,
    resource_type: str | None = None,
    lang: LanguageType = LanguageType.FA,
) -> Callable[[F], F]:
    """A decorator that retries a function when it raises an exception.

    Args:
        max_retries (int): The maximum number of retry attempts. Defaults to 3.
        delay (float): The delay (in seconds) between retries. Defaults to 1.
        retry_on (Optional[Tuple[Type[Exception], ...]]): A tuple of errors to retry on.
            If None, retries on all errors. Defaults to None.
        ignore (Optional[Tuple[Type[Exception], ...]]): A tuple of errors to ignore (not retry on).
            If None, no errors are ignored. Defaults to None.
        resource_type (Optional[str]): The type of resource being exhausted. Defaults to None.
        lang (str): The language for the error message (default: "fa").

    Returns:
        Callable: The decorated function with retry logic.

    Example:
        To use this decorator, apply it to a function:

        ```python
        @retry_decorator(max_retries=3, delay=1, retry_on=(ValueError,), ignore=(TypeError,), resource_type="API")
        def unreliable_function():
            if random.random() < 0.5:
                raise ValueError("Temporary failure")
            return "Success"

        result = unreliable_function()
        ```

        Output:
        ```
        2023-10-10 12:00:00,000 - WARNING - Attempt 1 failed: Temporary failure
        2023-10-10 12:00:01,000 - INFO - Attempt 2 succeeded.
        Success
        ```
    """

    def decorator(func: F) -> F:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            retries = 0
            while retries < max_retries:
                try:
                    result = func(*args, **kwargs)
                    if retries > 0:
                        logging.info("Attempt %d succeeded.", retries + 1)
                except Exception as e:
                    retries += 1
                    # Check if the exception should be ignored
                    if ignore and isinstance(e, ignore):
                        raise
                    # Check if the exception should be retried
                    if retry_on and not isinstance(e, retry_on):
                        raise
                    logging.warning("Attempt %d failed: %s", retries, e)
                    if retries < max_retries:
                        time.sleep(delay)
                    continue
                return result
            raise ResourceExhaustedError(resource_type=resource_type, lang=lang)

        return cast(F, wrapper)

    return decorator


Singleton Decorator

The singleton decorator ensures that a class has only one instance throughout the application lifecycle.

from archipy.helpers.decorators.singleton import singleton_decorator

@singleton_decorator()
class DatabaseConnection:
    def __init__(self):
        self.connection = create_connection()

# Both names refer to the same instance
db1 = DatabaseConnection()
db2 = DatabaseConnection()
assert db1 is db2

archipy.helpers.decorators.singleton.singleton_decorator(*, thread_safe=True)

A decorator to create thread-safe Singleton classes.

This decorator ensures that only one instance of a class is created. It supports an optional thread_safe parameter to control whether thread-safety mechanisms (e.g., locks) should be used.

Parameters:

Name Type Description Default
thread_safe bool If True, enables thread-safety for instance creation. True

Returns:

Name Type Description
function Callable[[type[Any]], Callable[..., Any]] A decorator function that can be applied to a class.

Example

To create a Singleton class, apply the singleton decorator and optionally specify whether thread-safety should be enabled:

@singleton_decorator(thread_safe=True)
class MySingletonClass:
    def __init__(self, value):
        self.value = value

# Create instances of MySingletonClass
instance1 = MySingletonClass(10)
instance2 = MySingletonClass(20)

# Verify that both instances are the same
print(instance1.value)  # Output: 10
print(instance2.value)  # Output: 10
print(instance1 is instance2)  # Output: True
Source code in archipy/helpers/decorators/singleton.py
def singleton_decorator(*, thread_safe: bool = True) -> Callable[[type[Any]], Callable[..., Any]]:
    """A decorator to create thread-safe Singleton classes.

    This decorator ensures that only one instance of a class is created. It supports an optional
    `thread_safe` parameter to control whether thread-safety mechanisms (e.g., locks) should be used.

    Args:
        thread_safe (bool, optional): If True, enables thread-safety for instance creation.
                                      Defaults to True.

    Returns:
        function: A decorator function that can be applied to a class.

    Example:
        To create a Singleton class, apply the `singleton` decorator and optionally specify
        whether thread-safety should be enabled:

        ```python
        @singleton(thread_safe=True)
        class MySingletonClass:
            def __init__(self, value):
                self.value = value

        # Create instances of MySingletonClass
        instance1 = MySingletonClass(10)
        instance2 = MySingletonClass(20)

        # Verify that both instances are the same
        print(instance1.value)  # Output: 10
        print(instance2.value)  # Output: 10
        print(instance1 is instance2)  # Output: True
        ```
    """

    def decorator(cls: type[Any]) -> Callable[..., Any]:
        """The inner decorator function that implements the Singleton pattern.

        Args:
            cls: The class to be decorated as a Singleton.

        Returns:
            function: A function that returns the Singleton instance of the class.
        """
        instances = {}  # Stores instances of Singleton classes
        lock: threading.Lock | None = (
            threading.Lock() if thread_safe else None
        )  # Lock for thread-safe instance creation

        def get_instance(*args: Any, **kwargs: Any) -> Any:
            """Create or return the Singleton instance of the class.

            If `thread_safe` is True, a lock is used to ensure that only one instance is created
            even in a multi-threaded environment. If `thread_safe` is False, no locking mechanism
            is used, which may result in multiple instances being created in a multi-threaded context.

            Args:
                *args: Positional arguments to pass to the class constructor.
                **kwargs: Keyword arguments to pass to the class constructor.

            Returns:
                object: The Singleton instance of the class.
            """
            if cls not in instances:
                if thread_safe:
                    if lock is not None:
                        with lock:
                            if cls not in instances:
                                instances[cls] = cls(*args, **kwargs)
                else:
                    instances[cls] = cls(*args, **kwargs)
            return instances[cls]

        return get_instance

    return decorator

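The double-checked locking in `get_instance` above can be exercised directly. The following self-contained sketch reimplements the same pattern without importing ArchiPy (`simple_singleton` and `Config` are illustrative names) and checks that concurrent construction yields a single instance:

```python
import threading

def simple_singleton(cls):
    """Minimal thread-safe singleton decorator using double-checked locking."""
    instances = {}
    lock = threading.Lock()

    def get_instance(*args, **kwargs):
        if cls not in instances:          # first check without the lock (fast path)
            with lock:
                if cls not in instances:  # second check under the lock
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance

@simple_singleton
class Config:
    def __init__(self):
        self.created = True

# Construct the singleton concurrently from several threads.
results = []
threads = [threading.Thread(target=lambda: results.append(Config())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert all(r is results[0] for r in results)  # every thread saw the same instance
```

The first unlocked check keeps the common case cheap; the second check under the lock prevents two threads that both passed the first check from each constructing an instance.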

SQLAlchemy Atomic Decorator

The SQLAlchemy atomic decorator provides transaction management for database operations.

from archipy.helpers.decorators.sqlalchemy_atomic import postgres_sqlalchemy_atomic_decorator

@postgres_sqlalchemy_atomic_decorator
def create_user(username: str, email: str) -> None:
    # The session comes from the configured session manager registry.
    user = User(username=username, email=email)
    session.add(user)
    # No explicit commit: the decorator commits on success
    # and rolls back the entire transaction on any exception.

SQLAlchemy atomic transaction decorators.

This module provides decorators for managing SQLAlchemy transactions with automatic commit/rollback and support for different database types (PostgreSQL, SQLite, StarRocks).

archipy.helpers.decorators.sqlalchemy_atomic.sqlalchemy_atomic_decorator(db_type, is_async=False, function=None)

Factory for creating SQLAlchemy atomic transaction decorators.

This decorator ensures that a function runs within a database transaction for the specified database type. If the function succeeds, the transaction is committed; otherwise, it is rolled back. Supports both synchronous and asynchronous functions.

Parameters:

Name Type Description Default
db_type str The database type ("postgres", "sqlite", or "starrocks"). required
is_async bool Whether the function is asynchronous. False
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial[Callable[..., Any]] The wrapped function or a partial function for later use.

Raises:

Type Description
ValueError If an invalid db_type is provided.
DatabaseSerializationError If a serialization failure is detected.
DatabaseDeadlockError If an operational error occurs due to a deadlock.
DatabaseTransactionError If a transaction-related error occurs.
DatabaseQueryError If a query-related error occurs.
DatabaseConnectionError If a connection-related error occurs.
DatabaseConstraintError If a constraint violation is detected.
DatabaseIntegrityError If an integrity violation is detected.
DatabaseTimeoutError If a database operation times out.
DatabaseConfigurationError If there's an error in the database configuration.

Example

# Synchronous PostgreSQL example
@sqlalchemy_atomic_decorator(db_type="postgres")
def update_user(id: int, name: str) -> None:
    # Database operations
    pass

# Asynchronous SQLite example
@sqlalchemy_atomic_decorator(db_type="sqlite", is_async=True)
async def update_record(id: int, data: str) -> None:
    # Async database operations
    pass

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def sqlalchemy_atomic_decorator(
    db_type: str,
    is_async: bool = False,
    function: Callable[..., Any] | None = None,
) -> Callable[..., Any] | partial[Callable[..., Any]]:
    """Factory for creating SQLAlchemy atomic transaction decorators.

    This decorator ensures that a function runs within a database transaction for the specified
    database type. If the function succeeds, the transaction is committed; otherwise, it is rolled back.
    Supports both synchronous and asynchronous functions.

    Args:
        db_type (str): The database type ("postgres", "sqlite", or "starrocks").
        is_async (bool): Whether the function is asynchronous. Defaults to False.
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.

    Raises:
        ValueError: If an invalid db_type is provided.
        DatabaseSerializationError: If a serialization failure is detected.
        DatabaseDeadlockError: If an operational error occurs due to a deadlock.
        DatabaseTransactionError: If a transaction-related error occurs.
        DatabaseQueryError: If a query-related error occurs.
        DatabaseConnectionError: If a connection-related error occurs.
        DatabaseConstraintError: If a constraint violation is detected.
        DatabaseIntegrityError: If an integrity violation is detected.
        DatabaseTimeoutError: If a database operation times out.
        DatabaseConfigurationError: If there's an error in the database configuration.

    Example:
        # Synchronous PostgreSQL example
        @sqlalchemy_atomic_decorator(db_type="postgres")
        def update_user(id: int, name: str) -> None:
            # Database operations
            pass

        # Asynchronous SQLite example
        @sqlalchemy_atomic_decorator(db_type="sqlite", is_async=True)
        async def update_record(id: int, data: str) -> None:
            # Async database operations
            pass
    """
    if db_type not in ATOMIC_BLOCK_CONFIGS:
        raise ValueError(f"Invalid db_type: {db_type}. Must be one of {list(ATOMIC_BLOCK_CONFIGS.keys())}")

    atomic_flag = ATOMIC_BLOCK_CONFIGS[db_type]["flag"]

    # Dynamically import the registry class
    def get_registry() -> type[SessionManagerRegistry]:
        """Get the session manager registry for the specified database type.

        Returns:
            type[SessionManagerRegistry]: The session manager registry class.

        Raises:
            DatabaseConfigurationError: If the registry cannot be loaded.
        """
        try:
            import importlib

            module_path, class_name = ATOMIC_BLOCK_CONFIGS[db_type]["registry"].rsplit(".", 1)
            module = importlib.import_module(module_path)
            return getattr(module, class_name)
        except (ImportError, AttributeError) as e:
            raise DatabaseConfigurationError(
                database=db_type,
                additional_data={"registry_path": ATOMIC_BLOCK_CONFIGS[db_type]["registry"]},
            ) from e

    def decorator(func: Callable[..., R]) -> Callable[..., R]:
        """Create a transaction-aware wrapper for the given function.

        Args:
            func (Callable[..., R]): The function to wrap with transaction management.

        Returns:
            Callable[..., R]: The wrapped function that manages transactions.
        """
        if is_async:

            @wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> R:
                """Async wrapper for managing database transactions.

                Args:
                    *args: Positional arguments to pass to the wrapped function.
                    **kwargs: Keyword arguments to pass to the wrapped function.

                Returns:
                    R: The result of the wrapped function.

                Raises:
                    DatabaseSerializationError: If a serialization failure is detected.
                    DatabaseDeadlockError: If an operational error occurs due to a deadlock.
                    DatabaseTransactionError: If a transaction-related error occurs.
                    DatabaseQueryError: If a query-related error occurs.
                    DatabaseConnectionError: If a connection-related error occurs.
                    DatabaseConstraintError: If a constraint violation is detected.
                    DatabaseIntegrityError: If an integrity violation is detected.
                    DatabaseTimeoutError: If a database operation times out.
                """
                registry = get_registry()
                session_manager: AsyncSessionManagerPort = registry.get_async_manager()
                session = session_manager.get_session()
                is_nested = session.info.get(atomic_flag, False)
                if not is_nested:
                    session.info[atomic_flag] = True

                try:
                    if session.in_transaction():
                        result = await func(*args, **kwargs)
                        if not is_nested:
                            await session.commit()
                        return result
                    async with session.begin():
                        return await func(*args, **kwargs)
                except Exception as exception:
                    await session.rollback()
                    _handle_db_exception(exception, db_type, func.__name__)
                finally:
                    if not session.in_transaction():
                        await session.close()
                        await session_manager.remove_session()

            return async_wrapper
        else:

            @wraps(func)
            def sync_wrapper(*args: Any, **kwargs: Any) -> R:
                """Synchronous wrapper for managing database transactions.

                Args:
                    *args: Positional arguments to pass to the wrapped function.
                    **kwargs: Keyword arguments to pass to the wrapped function.

                Returns:
                    R: The result of the wrapped function.

                Raises:
                    DatabaseSerializationError: If a serialization failure is detected.
                    DatabaseDeadlockError: If an operational error occurs due to a deadlock.
                    DatabaseTransactionError: If a transaction-related error occurs.
                    DatabaseQueryError: If a query-related error occurs.
                    DatabaseConnectionError: If a connection-related error occurs.
                    DatabaseConstraintError: If a constraint violation is detected.
                    DatabaseIntegrityError: If an integrity violation is detected.
                    DatabaseTimeoutError: If a database operation times out.
                """
                registry = get_registry()
                session_manager: SessionManagerPort = registry.get_sync_manager()
                session = session_manager.get_session()
                is_nested = session.info.get(atomic_flag, False)
                if not is_nested:
                    session.info[atomic_flag] = True

                try:
                    if session.in_transaction():
                        result = func(*args, **kwargs)
                        if not is_nested:
                            session.commit()
                        return result
                    with session.begin():
                        return func(*args, **kwargs)
                except Exception as exception:
                    session.rollback()
                    _handle_db_exception(exception, db_type, func.__name__)
                finally:
                    if not session.in_transaction():
                        session.close()
                        session_manager.remove_session()

            return sync_wrapper

    return decorator(function) if function else partial(sqlalchemy_atomic_decorator, db_type=db_type, is_async=is_async)
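The final `return decorator(function) if function else partial(...)` line is what lets the factory be applied directly or pre-bound into the convenience wrappers documented below. A self-contained sketch of this dual-use decorator pattern (`tagged`, `info_tagged`, and `greet` are illustrative names, not ArchiPy APIs):

```python
from functools import partial, wraps

def tagged(label, function=None):
    """Factory usable as @tagged("x") or via pre-bound convenience wrappers."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return f"[{label}] {func(*args, **kwargs)}"
        return wrapper
    # If no function was supplied, return a partial with the label pre-bound,
    # so the result can itself be used as a decorator later.
    return decorator(function) if function else partial(tagged, label)

# Convenience wrapper, analogous to postgres_sqlalchemy_atomic_decorator
# pre-binding db_type="postgres":
info_tagged = tagged("info")

@info_tagged
def greet(name):
    return f"hello {name}"

print(greet("world"))  # [info] hello world
```

Calling the factory without a function returns a `partial` that carries the configuration, which is exactly how the database-specific wrappers below pre-bind `db_type` and `is_async`.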

archipy.helpers.decorators.sqlalchemy_atomic.postgres_sqlalchemy_atomic_decorator(function=None)

Decorator for PostgreSQL atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def postgres_sqlalchemy_atomic_decorator(function: Callable[..., Any] | None = None) -> Callable[..., Any] | partial:
    """Decorator for PostgreSQL atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="postgres", function=function)

archipy.helpers.decorators.sqlalchemy_atomic.async_postgres_sqlalchemy_atomic_decorator(function=None)

Decorator for asynchronous PostgreSQL atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def async_postgres_sqlalchemy_atomic_decorator(
    function: Callable[..., Any] | None = None,
) -> Callable[..., Any] | partial:
    """Decorator for asynchronous PostgreSQL atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="postgres", is_async=True, function=function)

archipy.helpers.decorators.sqlalchemy_atomic.sqlite_sqlalchemy_atomic_decorator(function=None)

Decorator for SQLite atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def sqlite_sqlalchemy_atomic_decorator(function: Callable[..., Any] | None = None) -> Callable[..., Any] | partial:
    """Decorator for SQLite atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="sqlite", function=function)

archipy.helpers.decorators.sqlalchemy_atomic.async_sqlite_sqlalchemy_atomic_decorator(function=None)

Decorator for asynchronous SQLite atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def async_sqlite_sqlalchemy_atomic_decorator(
    function: Callable[..., Any] | None = None,
) -> Callable[..., Any] | partial:
    """Decorator for asynchronous SQLite atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="sqlite", is_async=True, function=function)

archipy.helpers.decorators.sqlalchemy_atomic.starrocks_sqlalchemy_atomic_decorator(function=None)

Decorator for StarRocks atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def starrocks_sqlalchemy_atomic_decorator(
    function: Callable[..., Any] | None = None,
) -> Callable[..., Any] | partial:
    """Decorator for StarRocks atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="starrocks", function=function)

archipy.helpers.decorators.sqlalchemy_atomic.async_starrocks_sqlalchemy_atomic_decorator(function=None)

Decorator for asynchronous StarRocks atomic transactions.

Parameters:

Name Type Description Default
function Callable | None The function to wrap. If None, returns a partial function. None

Returns:

Type Description
Callable[..., Any] | partial The wrapped function or a partial function for later use.

Source code in archipy/helpers/decorators/sqlalchemy_atomic.py
def async_starrocks_sqlalchemy_atomic_decorator(
    function: Callable[..., Any] | None = None,
) -> Callable[..., Any] | partial:
    """Decorator for asynchronous StarRocks atomic transactions.

    Args:
        function (Callable | None): The function to wrap. If None, returns a partial function.

    Returns:
        Callable | partial: The wrapped function or a partial function for later use.
    """
    return sqlalchemy_atomic_decorator(db_type="starrocks", is_async=True, function=function)


Interceptors

FastAPI Interceptors

FastAPI Rest Rate Limit Handler

Provides rate limiting functionality for FastAPI endpoints.

from fastapi import Depends, FastAPI

from archipy.helpers.interceptors.fastapi.rate_limit import FastAPIRestRateLimitHandler

app = FastAPI()

# Allow 100 requests per minute per client
rate_limit_handler = FastAPIRestRateLimitHandler(calls_count=100, minutes=1)

@app.get("/api/data", dependencies=[Depends(rate_limit_handler)])
async def get_data():
    return {"data": "protected by rate limit"}

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler

A rate-limiting handler for FastAPI REST endpoints using Redis for tracking.

This class provides rate-limiting functionality by tracking the number of requests made to a specific endpoint within a defined time window. If the request limit is exceeded, it raises an HTTP 429 Too Many Requests error.

Parameters:

Name Type Description Default
calls_count StrictInt The maximum number of allowed requests within the time window. 1
milliseconds StrictInt The time window in milliseconds. 0
seconds StrictInt The time window in seconds. 0
minutes StrictInt The time window in minutes. 0
hours StrictInt The time window in hours. 0
days StrictInt The time window in days. 0
query_params set[StrictStr] | None Query parameter names used when building the rate-limit key. None
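All of the time-unit arguments are collapsed into a single window expressed in milliseconds. The conversion, mirroring the arithmetic in `__init__` below (`window_ms` is an illustrative standalone name, not an ArchiPy API):

```python
def window_ms(milliseconds=0, seconds=0, minutes=0, hours=0, days=0):
    """Combine time-unit arguments into one window in milliseconds,
    as the handler does internally when it sets the Redis key TTL."""
    return (
        milliseconds
        + 1000 * seconds
        + 60 * 1000 * minutes
        + 60 * 60 * 1000 * hours
        + 24 * 60 * 60 * 1000 * days
    )

print(window_ms(minutes=1))              # 60000  -> a "per minute" window
print(window_ms(seconds=30, minutes=1))  # 90000  -> units are additive
```

Because the units are additive, at least one of them must be greater than 0 for the handler to have a valid window.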
Source code in archipy/helpers/interceptors/fastapi/rate_limit/fastapi_rest_rate_limit_handler.py
class FastAPIRestRateLimitHandler:
    """A rate-limiting handler for FastAPI REST endpoints using Redis for tracking.

    This class provides rate-limiting functionality by tracking the number of requests
    made to a specific endpoint within a defined time window. If the request limit is
    exceeded, it raises an HTTP 429 Too Many Requests error.

    Args:
        calls_count (StrictInt): The maximum number of allowed requests within the time window.
        milliseconds (StrictInt): The time window in milliseconds.
        seconds (StrictInt): The time window in seconds.
        minutes (StrictInt): The time window in minutes.
        hours (StrictInt): The time window in hours.
        days (StrictInt): The time window in days.
        query_params (set(StrictStr)): request query parameters for rate-limiting based on query params.
    """

    def __init__(
        self,
        calls_count: StrictInt = 1,
        milliseconds: StrictInt = 0,
        seconds: StrictInt = 0,
        minutes: StrictInt = 0,
        hours: StrictInt = 0,
        days: StrictInt = 0,
        query_params: set[StrictStr] | None = None,
    ) -> None:
        """Initialize the rate limit handler with specified time window and request limits.

        The time window is calculated by combining all time unit parameters into milliseconds.
        At least one time unit parameter should be greater than 0 to create a valid window.

        Args:
            calls_count (StrictInt, optional): Maximum number of allowed requests within the time window.
                Defaults to 1.
            milliseconds (StrictInt, optional): Number of milliseconds in the time window.
                Defaults to 0.
            seconds (StrictInt, optional): Number of seconds in the time window.
                Defaults to 0.
            minutes (StrictInt, optional): Number of minutes in the time window.
                Defaults to 0.
            hours (StrictInt, optional): Number of hours in the time window.
                Defaults to 0.
            days (StrictInt, optional): Number of days in the time window.
                Defaults to 0.
            query_params (set[StrictStr] | None, optional): Set of query parameter names to include
                in rate limit key generation. If None, no query parameters will be used.
                Defaults to None.

        Example:
            >>> # Allow 100 requests per minute
            >>> handler = FastAPIRestRateLimitHandler(calls_count=100, minutes=1)
            >>>
            >>> # Allow 1000 requests per day with specific query params
            >>> handler = FastAPIRestRateLimitHandler(
            ...     calls_count=1000,
            ...     days=1,
            ...     query_params={'user_id', 'action'}
            ... )
        """
        self.query_params = query_params or set()
        self.calls_count = calls_count
        # Calculate total time in milliseconds directly for better readability
        self.milliseconds = (
            milliseconds + 1000 * seconds + 60 * 1000 * minutes + 60 * 60 * 1000 * hours + 24 * 60 * 60 * 1000 * days
        )
        self.redis_client = AsyncRedisAdapter()

    async def _check(self, key: str) -> RedisResponseType:
        """Checks if the request count for the given key exceeds the allowed limit.

        Args:
            key (str): The Redis key used to track the request count.

        Returns:
            int: The remaining time-to-live (TTL) in milliseconds if the limit is exceeded, otherwise 0.
        """
        # Use await for getting value from Redis as it's asynchronous
        current_request = await self.redis_client.get(key)
        if current_request is None:
            await self.redis_client.set(key, 1, px=self.milliseconds)
            return 0

        current_request = int(current_request)  # type:ignore[arg-type]
        if current_request < self.calls_count:
            await self.redis_client.incrby(key)
            return 0

        ttl = await self.redis_client.pttl(key)
        if ttl == -1:
            await self.redis_client.delete(key)
        return ttl

    async def __call__(self, request: Request) -> None:
        """Handles the rate-limiting logic for incoming requests.

        Args:
            request (Request): The incoming FastAPI request.

        Raises:
            HTTPException: If the rate limit is exceeded, an HTTP 429 Too Many Requests error is raised.
        """
        rate_key = await self._get_identifier(request)
        key = f"RateLimitHandler:{rate_key}:{request.scope['path']}:{request.method}"
        pexpire = await self._check(key)  # Awaiting the function since it is an async call
        if pexpire != 0:
            await self._create_callback(pexpire)  # type:ignore[arg-type]

    @staticmethod
    async def _create_callback(pexpire: int) -> None:
        """Raises an HTTP 429 Too Many Requests error with the appropriate headers.

        Args:
            pexpire (int): The remaining time-to-live (TTL) in milliseconds before the rate limit resets.

        Raises:
            HTTPException: An HTTP 429 Too Many Requests error with the `Retry-After` header.
        """
        expire = ceil(pexpire / 1000)
        raise HTTPException(
            status_code=HTTP_429_TOO_MANY_REQUESTS,
            detail="Too Many Requests",
            headers={"Retry-After": str(expire)},
        )

    async def _get_identifier(self, request: Request) -> str:
        """Extracts a unique identifier from the request, typically an IP address and endpoint.

        Args:
            request (Request): The FastAPI request object containing headers and client info.

        Returns:
            str: A Redis key generated using the extracted identifier and request information.

        Note:
            - Validates IP addresses for proper formatting
            - Handles forwarded IPs with comma-separated values
            - Filters out private, loopback, link-local, and multicast IPs in X-Forwarded-For
            - Falls back to client.host if no valid IP is found
        """
        base_identifier = await self._extract_client_ip(request)
        return self._generate_redis_key(request=request, base_identifier=base_identifier)

    async def _extract_client_ip(self, request: Request) -> str:
        """Extracts and validates client IP from request headers.

        Args:
            request (Request): The FastAPI request object.

        Returns:
            str: Validated IP address or client host.
        """
        try:
            # Check X-Real-IP header first
            if real_ip := self._validate_ip_from_header(request.headers.get("X-Real-IP")):
                return real_ip

            # Then check X-Forwarded-For header
            if forwarded_for := self._validate_forwarded_for_header(request.headers.get("X-Forwarded-For")):
                return forwarded_for
            # Fallback to client host
        except ValueError:
            return request.client.host  # type:ignore[union-attr]
        else:
            return request.client.host  # type:ignore[union-attr]

    def _validate_ip_from_header(self, header_value: str | None) -> str | None:
        """Validates IP address from header value.

        Args:
            header_value (Optional[str]): IP address from header.

        Returns:
            Optional[str]: First valid IP address or None.
        """
        if not header_value:
            return None
        try:
            ip_str = header_value.split(",")[0].strip()
            ip = ip_address(ip_str)  # Validate IP format
            if not (ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast):
                return ip_str
        except ValueError:
            return None
        else:
            return None

    def _validate_forwarded_for_header(self, forwarded_for: str | None) -> str | None:
        """Validates IP from X-Forwarded-For header.

        Args:
            forwarded_for (Optional[str]): X-Forwarded-For header value.

        Returns:
            Optional[str]: The first valid non-private IP, the original header value, or None.
        """
        if not forwarded_for:
            return None

        try:
            ip_str = forwarded_for.split(",")[0].strip()
            ip = ip_address(ip_str)

            if not (ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast):
                return ip_str

        except ValueError:
            pass
        return forwarded_for

    def _generate_redis_key(self, request: Request, base_identifier: str) -> str:
        """Generates a Redis key for rate limiting based on the request and base identifier.

        Args:
            request (Request): The FastAPI request object containing path and query parameters.
            base_identifier (str): The base identifier (typically an IP address) for the request.

        Returns:
            str: A Redis key string with optional query parameters.
        """
        path_key = f"{base_identifier}:{request.scope['path']}"

        if not self.query_params:
            return path_key

        return self._append_query_params(path_key, request.query_params)

    def _append_query_params(self, base_key: str, query_params: QueryParams) -> str:
        """Appends sorted query parameters to the Redis key.

        Args:
            base_key (str): Base Redis key without query parameters.
            query_params (dict[str, str]): Request query parameters.

        Returns:
            str: Redis key with appended query parameters.
        """
        filtered_params = {k: v for k, v in query_params.items() if k in self.query_params and v is not None}

        if not filtered_params:
            return base_key

        sorted_params = sorted(filtered_params.items())
        query_string = "&".join(f"{k}={v}" for k, v in sorted_params)
        return f"{base_key}?{query_string}"
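The `_check` method above implements a fixed-window counter on top of Redis. Its core logic can be sketched without Redis using an in-memory store; the `InMemoryFixedWindow` class below is an illustrative stand-in, not part of ArchiPy:

```python
import time


class InMemoryFixedWindow:
    """Illustrative in-memory stand-in for the Redis-backed counter in _check."""

    def __init__(self, calls_count: int, window_ms: int) -> None:
        self.calls_count = calls_count
        self.window_ms = window_ms
        self._store: dict[str, tuple[int, float]] = {}  # key -> (count, expiry in ms)

    def check(self, key: str) -> int:
        """Return 0 if the request is allowed, otherwise the remaining TTL in ms."""
        now = time.monotonic() * 1000
        entry = self._store.get(key)
        if entry is None or entry[1] <= now:
            # First request in a new window: start counting and set the expiry.
            self._store[key] = (1, now + self.window_ms)
            return 0
        count, expiry = entry
        if count < self.calls_count:
            # Still under the limit: increment without touching the expiry.
            self._store[key] = (count + 1, expiry)
            return 0
        # Limit reached: report how long the caller must wait.
        return int(expiry - now)


window = InMemoryFixedWindow(calls_count=2, window_ms=60_000)
assert window.check("client-a") == 0  # first call allowed
assert window.check("client-a") == 0  # second call allowed
assert window.check("client-a") > 0   # third call rejected with a TTL
```

As in the Redis version, the window starts on the first request and all further requests share its expiry, so the counter resets as a whole when the window elapses.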

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.__init__(calls_count=1, milliseconds=0, seconds=0, minutes=0, hours=0, days=0, query_params=None)

Initialize the rate limit handler with specified time window and request limits.

The time window is calculated by combining all time unit parameters into milliseconds. At least one time unit parameter should be greater than 0 to create a valid window.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `calls_count` | `StrictInt` | Maximum number of allowed requests within the time window. | `1` |
| `milliseconds` | `StrictInt` | Number of milliseconds in the time window. | `0` |
| `seconds` | `StrictInt` | Number of seconds in the time window. | `0` |
| `minutes` | `StrictInt` | Number of minutes in the time window. | `0` |
| `hours` | `StrictInt` | Number of hours in the time window. | `0` |
| `days` | `StrictInt` | Number of days in the time window. | `0` |
| `query_params` | `set[StrictStr] \| None` | Set of query parameter names to include in rate limit key generation. If None, no query parameters will be used. | `None` |

Example:

# Allow 100 requests per minute
handler = FastAPIRestRateLimitHandler(calls_count=100, minutes=1)

# Allow 1000 requests per day with specific query params
handler = FastAPIRestRateLimitHandler(
    calls_count=1000,
    days=1,
    query_params={'user_id', 'action'}
)
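The constructor collapses all time-unit arguments into a single window length in milliseconds. The same arithmetic in isolation (the `window_ms` helper name is illustrative, not part of ArchiPy):

```python
def window_ms(milliseconds=0, seconds=0, minutes=0, hours=0, days=0) -> int:
    """Combine time units into one window length in milliseconds (mirrors __init__)."""
    return (
        milliseconds
        + 1000 * seconds
        + 60 * 1000 * minutes
        + 60 * 60 * 1000 * hours
        + 24 * 60 * 60 * 1000 * days
    )


assert window_ms(minutes=1) == 60_000           # the "100 requests per minute" example
assert window_ms(days=1) == 86_400_000          # the "1000 requests per day" example
assert window_ms(seconds=1, milliseconds=500) == 1_500  # units combine additively
```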

Source code in archipy/helpers/interceptors/fastapi/rate_limit/fastapi_rest_rate_limit_handler.py
def __init__(
    self,
    calls_count: StrictInt = 1,
    milliseconds: StrictInt = 0,
    seconds: StrictInt = 0,
    minutes: StrictInt = 0,
    hours: StrictInt = 0,
    days: StrictInt = 0,
    query_params: set[StrictStr] | None = None,
) -> None:
    """Initialize the rate limit handler with specified time window and request limits.

    The time window is calculated by combining all time unit parameters into milliseconds.
    At least one time unit parameter should be greater than 0 to create a valid window.

    Args:
        calls_count (StrictInt, optional): Maximum number of allowed requests within the time window.
            Defaults to 1.
        milliseconds (StrictInt, optional): Number of milliseconds in the time window.
            Defaults to 0.
        seconds (StrictInt, optional): Number of seconds in the time window.
            Defaults to 0.
        minutes (StrictInt, optional): Number of minutes in the time window.
            Defaults to 0.
        hours (StrictInt, optional): Number of hours in the time window.
            Defaults to 0.
        days (StrictInt, optional): Number of days in the time window.
            Defaults to 0.
        query_params (set[StrictStr] | None, optional): Set of query parameter names to include
            in rate limit key generation. If None, no query parameters will be used.
            Defaults to None.

    Example:
        >>> # Allow 100 requests per minute
        >>> handler = FastAPIRestRateLimitHandler(calls_count=100, minutes=1)
        >>>
        >>> # Allow 1000 requests per day with specific query params
        >>> handler = FastAPIRestRateLimitHandler(
        ...     calls_count=1000,
        ...     days=1,
        ...     query_params={'user_id', 'action'}
        ... )
    """
    self.query_params = query_params or set()
    self.calls_count = calls_count
    # Calculate total time in milliseconds directly for better readability
    self.milliseconds = (
        milliseconds + 1000 * seconds + 60 * 1000 * minutes + 60 * 60 * 1000 * hours + 24 * 60 * 60 * 1000 * days
    )
    self.redis_client = AsyncRedisAdapter()

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.__call__(request) async

Handles the rate-limiting logic for incoming requests.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `request` | `Request` | The incoming FastAPI request. | *required* |

Raises:

| Type | Description |
|------|-------------|
| `HTTPException` | If the rate limit is exceeded, an HTTP 429 Too Many Requests error is raised. |

Source code in archipy/helpers/interceptors/fastapi/rate_limit/fastapi_rest_rate_limit_handler.py
async def __call__(self, request: Request) -> None:
    """Handles the rate-limiting logic for incoming requests.

    Args:
        request (Request): The incoming FastAPI request.

    Raises:
        HTTPException: If the rate limit is exceeded, an HTTP 429 Too Many Requests error is raised.
    """
    rate_key = await self._get_identifier(request)
    key = f"RateLimitHandler:{rate_key}:{request.scope['path']}:{request.method}"
    pexpire = await self._check(key)  # Awaiting the function since it is an async call
    if pexpire != 0:
        await self._create_callback(pexpire)  # type:ignore[arg-type]

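When `_check` returns a non-zero TTL, `_create_callback` converts the millisecond value into a whole-second `Retry-After` header using `ceil`. Rounding up means a client that honors the header never retries before the window has actually reset. The conversion in isolation:

```python
from math import ceil


def retry_after_seconds(pexpire_ms: int) -> int:
    """Round a millisecond TTL up to whole seconds, as _create_callback does."""
    return ceil(pexpire_ms / 1000)


assert retry_after_seconds(1) == 1      # even 1 ms of TTL waits a full second
assert retry_after_seconds(1000) == 1
assert retry_after_seconds(1001) == 2   # partial seconds round up, never down
```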

gRPC Interceptors

gRPC interceptors for tracing and monitoring:

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor

Bases: BaseGrpcClientInterceptor

A gRPC client interceptor for tracing requests using Elastic APM and Sentry APM.

This interceptor injects the Elastic APM trace parent header into gRPC client requests to enable distributed tracing across services. It also creates Sentry transactions to monitor the performance of gRPC calls.

Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
class GrpcClientTraceInterceptor(BaseGrpcClientInterceptor):
    """A gRPC client interceptor for tracing requests using Elastic APM and Sentry APM.

    This interceptor injects the Elastic APM trace parent header into gRPC client requests
    to enable distributed tracing across services. It also creates Sentry transactions
    to monitor the performance of gRPC calls.
    """

    def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails):
        """Intercepts a gRPC client call to inject the Elastic APM trace parent header and monitor performance with Sentry.

        Args:
            method (Callable): The gRPC method being intercepted.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.ClientCallDetails): Details of the gRPC call.

        Returns:
            Any: The result of the intercepted gRPC method.

        Notes:
            - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
            - If no trace parent header is available, the interceptor does nothing and passes the call through.
        """
        # Skip tracing if Elastic APM is disabled
        if not BaseConfig.global_config().ELASTIC_APM.IS_ENABLED:
            return method(request_or_iterator, call_details)

        # Skip tracing if no trace parent header is available
        if not (trace_parent_id := elasticapm.get_trace_parent_header()):
            return method(request_or_iterator, call_details)

        # Inject the trace parent header into the call details
        new_details = ClientCallDetails(
            method=call_details.method,
            timeout=call_details.timeout,
            metadata=[(TRACEPARENT_HEADER_NAME, f"{trace_parent_id}")],
            credentials=call_details.credentials,
            wait_for_ready=call_details.wait_for_ready,
            compression=call_details.compression,
        )

        # Execute the gRPC method with the updated call details
        return method(request_or_iterator, new_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept(method, request_or_iterator, call_details)

Intercepts a gRPC client call to inject the Elastic APM trace parent header and monitor performance with Sentry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `method` | `Callable` | The gRPC method being intercepted. | *required* |
| `request_or_iterator` | `Any` | The request or request iterator. | *required* |
| `call_details` | `grpc.ClientCallDetails` | Details of the gRPC call. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | The result of the intercepted gRPC method. |

Notes:

- If Elastic APM is disabled, the interceptor does nothing and passes the call through.
- If no trace parent header is available, the interceptor does nothing and passes the call through.
Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails):
    """Intercepts a gRPC client call to inject the Elastic APM trace parent header and monitor performance with Sentry.

    Args:
        method (Callable): The gRPC method being intercepted.
        request_or_iterator (Any): The request or request iterator.
        call_details (grpc.ClientCallDetails): Details of the gRPC call.

    Returns:
        Any: The result of the intercepted gRPC method.

    Notes:
        - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
        - If no trace parent header is available, the interceptor does nothing and passes the call through.
    """
    # Skip tracing if Elastic APM is disabled
    if not BaseConfig.global_config().ELASTIC_APM.IS_ENABLED:
        return method(request_or_iterator, call_details)

    # Skip tracing if no trace parent header is available
    if not (trace_parent_id := elasticapm.get_trace_parent_header()):
        return method(request_or_iterator, call_details)

    # Inject the trace parent header into the call details
    new_details = ClientCallDetails(
        method=call_details.method,
        timeout=call_details.timeout,
        metadata=[(TRACEPARENT_HEADER_NAME, f"{trace_parent_id}")],
        credentials=call_details.credentials,
        wait_for_ready=call_details.wait_for_ready,
        compression=call_details.compression,
    )

    # Execute the gRPC method with the updated call details
    return method(request_or_iterator, new_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor

Bases: BaseAsyncGrpcClientInterceptor

An asynchronous gRPC client interceptor for tracing requests using Elastic APM.

This interceptor injects the Elastic APM trace parent header into asynchronous gRPC client requests to enable distributed tracing across services.

Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
class AsyncGrpcClientTraceInterceptor(BaseAsyncGrpcClientInterceptor):
    """An asynchronous gRPC client interceptor for tracing requests using Elastic APM.

    This interceptor injects the Elastic APM trace parent header into asynchronous gRPC client requests
    to enable distributed tracing across services.
    """

    async def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.aio.ClientCallDetails):
        """Intercepts an asynchronous gRPC client call to inject the Elastic APM trace parent header.

        Args:
            method (Callable): The asynchronous gRPC method being intercepted.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.aio.ClientCallDetails): Details of the gRPC call.

        Returns:
            Any: The result of the intercepted gRPC method.

        Notes:
            - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
            - If no trace parent header is available, the interceptor does nothing and passes the call through.
        """
        # Skip tracing if Elastic APM is disabled
        if not BaseConfig.global_config().ELASTIC_APM.IS_ENABLED:
            return await method(request_or_iterator, call_details)

        # Skip tracing if no trace parent header is available
        if not (trace_parent_id := elasticapm.get_trace_parent_header()):
            return await method(request_or_iterator, call_details)

        # Inject the trace parent header into the call details
        new_details = AsyncClientCallDetails(
            method=call_details.method,
            timeout=call_details.timeout,
            metadata=[(TRACEPARENT_HEADER_NAME, f"{trace_parent_id}")],
            credentials=call_details.credentials,
            wait_for_ready=call_details.wait_for_ready,
        )

        # Execute the gRPC method with the updated call details
        return await method(request_or_iterator, new_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept(method, request_or_iterator, call_details) async

Intercepts an asynchronous gRPC client call to inject the Elastic APM trace parent header.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `method` | `Callable` | The asynchronous gRPC method being intercepted. | *required* |
| `request_or_iterator` | `Any` | The request or request iterator. | *required* |
| `call_details` | `grpc.aio.ClientCallDetails` | Details of the gRPC call. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | The result of the intercepted gRPC method. |

Notes:

- If Elastic APM is disabled, the interceptor does nothing and passes the call through.
- If no trace parent header is available, the interceptor does nothing and passes the call through.
Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
async def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.aio.ClientCallDetails):
    """Intercepts an asynchronous gRPC client call to inject the Elastic APM trace parent header.

    Args:
        method (Callable): The asynchronous gRPC method being intercepted.
        request_or_iterator (Any): The request or request iterator.
        call_details (grpc.aio.ClientCallDetails): Details of the gRPC call.

    Returns:
        Any: The result of the intercepted gRPC method.

    Notes:
        - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
        - If no trace parent header is available, the interceptor does nothing and passes the call through.
    """
    # Skip tracing if Elastic APM is disabled
    if not BaseConfig.global_config().ELASTIC_APM.IS_ENABLED:
        return await method(request_or_iterator, call_details)

    # Skip tracing if no trace parent header is available
    if not (trace_parent_id := elasticapm.get_trace_parent_header()):
        return await method(request_or_iterator, call_details)

    # Inject the trace parent header into the call details
    new_details = AsyncClientCallDetails(
        method=call_details.method,
        timeout=call_details.timeout,
        metadata=[(TRACEPARENT_HEADER_NAME, f"{trace_parent_id}")],
        credentials=call_details.credentials,
        wait_for_ready=call_details.wait_for_ready,
    )

    # Execute the gRPC method with the updated call details
    return await method(request_or_iterator, new_details)


archipy.helpers.interceptors.grpc.trace.server_interceptor.GrpcServerTraceInterceptor

Bases: BaseGrpcServerInterceptor

A gRPC server interceptor for tracing requests using Elastic APM.

This interceptor captures and traces gRPC server requests, enabling distributed tracing across services. It integrates with Elastic APM to monitor and log transactions.

Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
class GrpcServerTraceInterceptor(BaseGrpcServerInterceptor):
    """A gRPC server interceptor for tracing requests using Elastic APM.

    This interceptor captures and traces gRPC server requests, enabling distributed tracing
    across services. It integrates with Elastic APM to monitor and log transactions.
    """

    def intercept(self, method: Callable, request: Any, context: grpc.ServicerContext):
        """Intercepts a gRPC server call to trace the request using Elastic APM.

        Args:
            method (Callable): The gRPC method being intercepted.
            request (Any): The request object passed to the method.
            context (grpc.ServicerContext): The context of the gRPC call.

        Returns:
            Any: The result of the intercepted gRPC method.

        Raises:
            Exception: If an exception occurs during the method execution, it is captured and logged.

        Notes:
            - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
            - If a trace parent header is present in the metadata, it is used to link the transaction
              to the distributed trace.
            - If no trace parent header is present, a new transaction is started.
        """
        try:
            # Skip tracing if Elastic APM is disabled
            config = BaseConfig.global_config()
            if not config.ELASTIC_APM.IS_ENABLED:
                return method(request, context)

            # Get the Elastic APM client
            client = elasticapm.Client(config.ELASTIC_APM.model_dump())

            # Extract method name details from the context
            method_name_model = context.method_name_model

            # Convert metadata to a dictionary for easier access
            metadata_dict = dict(context.invocation_metadata())

            # Check if a trace parent header is present in the metadata
            if parent := elasticapm.trace_parent_from_headers(metadata_dict):
                # Start a transaction linked to the distributed trace
                client.begin_transaction(transaction_type="request", trace_parent=parent)
                try:
                    # Execute the gRPC method
                    result = method(request, context)

                    # End the transaction with a success status
                    client.end_transaction(name=method_name_model.full_name, result="success")
                    return result
                except Exception as e:
                    # End the transaction with a failure status if an exception occurs
                    client.end_transaction(name=method_name_model.full_name, result="failure")
                    raise e
            else:
                # Start a new transaction if no trace parent header is present
                client.begin_transaction(transaction_type="request")
                try:
                    # Execute the gRPC method
                    result = method(request, context)

                    # End the transaction with a success status
                    client.end_transaction(name=method_name_model.full_name, result="success")
                    return result
                except Exception as e:
                    # End the transaction with a failure status if an exception occurs
                    client.end_transaction(name=method_name_model.full_name, result="failure")
                    raise e

        except Exception as exception:
            BaseUtils.capture_exception(exception)

archipy.helpers.interceptors.grpc.trace.server_interceptor.GrpcServerTraceInterceptor.intercept(method, request, context)

Intercepts a gRPC server call to trace the request using Elastic APM.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `method` | `Callable` | The gRPC method being intercepted. | *required* |
| `request` | `Any` | The request object passed to the method. | *required* |
| `context` | `grpc.ServicerContext` | The context of the gRPC call. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | The result of the intercepted gRPC method. |

Raises:

| Type | Description |
|------|-------------|
| `Exception` | If an exception occurs during the method execution, it is captured and logged. |

Notes:

- If Elastic APM is disabled, the interceptor does nothing and passes the call through.
- If a trace parent header is present in the metadata, it is used to link the transaction to the distributed trace.
- If no trace parent header is present, a new transaction is started.
Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
def intercept(self, method: Callable, request: Any, context: grpc.ServicerContext):
    """Intercepts a gRPC server call to trace the request using Elastic APM.

    Args:
        method (Callable): The gRPC method being intercepted.
        request (Any): The request object passed to the method.
        context (grpc.ServicerContext): The context of the gRPC call.

    Returns:
        Any: The result of the intercepted gRPC method.

    Raises:
        Exception: If an exception occurs during the method execution, it is captured and logged.

    Notes:
        - If Elastic APM is disabled, the interceptor does nothing and passes the call through.
        - If a trace parent header is present in the metadata, it is used to link the transaction
          to the distributed trace.
        - If no trace parent header is present, a new transaction is started.
    """
    try:
        # Skip tracing if Elastic APM is disabled
        config = BaseConfig.global_config()
        if not config.ELASTIC_APM.IS_ENABLED:
            return method(request, context)

        # Get the Elastic APM client
        client = elasticapm.Client(config.ELASTIC_APM.model_dump())

        # Extract method name details from the context
        method_name_model = context.method_name_model

        # Convert metadata to a dictionary for easier access
        metadata_dict = dict(context.invocation_metadata())

        # Check if a trace parent header is present in the metadata
        if parent := elasticapm.trace_parent_from_headers(metadata_dict):
            # Start a transaction linked to the distributed trace
            client.begin_transaction(transaction_type="request", trace_parent=parent)
            try:
                # Execute the gRPC method
                result = method(request, context)

                # End the transaction with a success status
                client.end_transaction(name=method_name_model.full_name, result="success")
                return result
            except Exception as e:
                # End the transaction with a failure status if an exception occurs
                client.end_transaction(name=method_name_model.full_name, result="failure")
                raise e
        else:
            # Start a new transaction if no trace parent header is present
            client.begin_transaction(transaction_type="request")
            try:
                # Execute the gRPC method
                result = method(request, context)

                # End the transaction with a success status
                client.end_transaction(name=method_name_model.full_name, result="success")
                return result
            except Exception as e:
                # End the transaction with a failure status if an exception occurs
                client.end_transaction(name=method_name_model.full_name, result="failure")
                raise e

    except Exception as exception:
        BaseUtils.capture_exception(exception)
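The server interceptor wraps each call in an APM transaction that is closed with a "success" or "failure" result. The control flow, stripped of the APM specifics (the `run_in_transaction` helper and the `log` list below are illustrative, not ArchiPy API):

```python
def run_in_transaction(method, request, log: list) -> object:
    """Mirror the begin/end-transaction bookkeeping around a gRPC handler."""
    log.append("begin")
    try:
        result = method(request)
    except Exception:
        # End the transaction with a failure result, then re-raise to the caller.
        log.append("end:failure")
        raise
    log.append("end:success")
    return result


log = []
assert run_in_transaction(lambda r: r + 1, 41, log) == 42
assert log == ["begin", "end:success"]

log.clear()
try:
    run_in_transaction(lambda r: 1 / 0, 0, log)
except ZeroDivisionError:
    pass
assert log == ["begin", "end:failure"]
```

The real interceptor repeats this pattern in two branches: one that links the transaction to an incoming traceparent and one that starts a fresh transaction.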


Metaclasses

Singleton Metaclass

A metaclass implementation of the singleton pattern.

from archipy.helpers.metaclasses.singleton import Singleton

class DatabaseConnection(metaclass=Singleton):
    def __init__(self):
        self.connection = create_connection()

# Both instances will be the same
db1 = DatabaseConnection()
db2 = DatabaseConnection()
assert db1 is db2

archipy.helpers.metaclasses.singleton.Singleton

Bases: type

A thread-safe Singleton metaclass that ensures only one instance of a class is created.

This metaclass can be used to create Singleton classes. It supports an optional thread_safe parameter to control whether thread-safety mechanisms (e.g., locks) should be used.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `_instances` | `dict` | A dictionary to store instances of Singleton classes. |
| `_lock` | `threading.Lock` | A lock to ensure thread-safe instance creation. |

Example

To create a Singleton class, use the Singleton metaclass and optionally specify whether thread-safety should be enabled:

class MySingletonClass(metaclass=Singleton, thread_safe=True):
    def __init__(self, value):
        self.value = value

# Create instances of MySingletonClass
instance1 = MySingletonClass(10)
instance2 = MySingletonClass(20)

# Verify that both instances are the same
print(instance1.value)  # Output: 10
print(instance2.value)  # Output: 10
print(instance1 is instance2)  # Output: True
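The essential mechanism can be condensed into a few lines. This is a simplified, always-thread-safe sketch of the pattern, not the full ArchiPy implementation (which also supports the `thread_safe` toggle shown above):

```python
import threading


class MiniSingleton(type):
    """Condensed sketch of a thread-safe Singleton metaclass."""

    _instances: dict = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with cls._lock:
                # Double-checked locking: re-test after acquiring the lock so
                # two racing threads cannot both construct an instance.
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Config(metaclass=MiniSingleton):
    def __init__(self, value):
        self.value = value


a = Config(10)
b = Config(20)  # constructor arguments of later calls are ignored
assert a is b
assert b.value == 10
```

Overriding `__call__` on the metaclass intercepts instantiation itself, which is why later constructor arguments are silently ignored once an instance exists.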
Source code in archipy/helpers/metaclasses/singleton.py

````python
class Singleton(type):
    """A thread-safe Singleton metaclass that ensures only one instance of a class is created.

    This metaclass can be used to create Singleton classes. It supports an optional `thread_safe`
    parameter to control whether thread-safety mechanisms (e.g., locks) should be used.

    Attributes:
        _instances (dict): A dictionary to store instances of Singleton classes.
        _lock (threading.Lock): A lock to ensure thread-safe instance creation.

    Example:
        To create a Singleton class, use the `Singleton` metaclass and optionally specify
        whether thread-safety should be enabled:

        ```python
        class MySingletonClass(metaclass=Singleton, thread_safe=True):
            def __init__(self, value):
                self.value = value

        # Create instances of MySingletonClass
        instance1 = MySingletonClass(10)
        instance2 = MySingletonClass(20)

        # Verify that both instances are the same
        print(instance1.value)  # Output: 10
        print(instance2.value)  # Output: 10
        print(instance1 is instance2)  # Output: True
        ```
    """

    _instances: ClassVar[dict[type, SingletonInstance]] = {}  # Stores instances of Singleton classes
    _lock: ClassVar[threading.Lock] = threading.Lock()  # Lock for thread-safe instance creation

    def __new__(
        cls,
        name: str,
        bases: ClassBases,
        dct: ClassDict,
        **kwargs: object,
    ) -> type:
        """Create a new Singleton metaclass instance.

        Args:
            name (str): The name of the class.
            bases (tuple): The base classes of the class.
            dct (dict): The namespace containing the class attributes.
            **kwargs: Additional keyword arguments, including `thread_safe`.

        Returns:
            type: A new metaclass instance.
        """
        # Extract the `thread_safe` parameter from kwargs
        thread_safe = kwargs.pop("thread_safe", True)
        # Create the new class
        new_class = super().__new__(cls, name, bases, dct, **kwargs)
        # Store thread_safe as an attribute using setattr to avoid linting errors
        setattr(new_class, "__thread_safe", thread_safe)
        return new_class

    def __call__(cls, *args: object, **kwargs: object) -> SingletonInstance:
        """Create or return the Singleton instance of the class.

        If `thread_safe` is True, a lock is used to ensure that only one instance is created
        even in a multi-threaded environment. If `thread_safe` is False, no locking mechanism
        is used, which may result in multiple instances being created in a multi-threaded context.

        Args:
            *args: Positional arguments to pass to the class constructor.
            **kwargs: Keyword arguments to pass to the class constructor.

        Returns:
            object: The Singleton instance of the class.
        """
        if cls not in cls._instances:
            if getattr(cls, "__thread_safe", True):
                with cls._lock:
                    if cls not in cls._instances:
                        cls._instances[cls] = super().__call__(*args, **kwargs)
            else:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
````
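As a quick sanity check of the thread-safe path, the sketch below exercises a trimmed-down standalone copy of the metaclass (reproduced so it runs without ArchiPy installed; the source above is authoritative) from several threads at once and confirms they all see one instance:

```python
import threading
from typing import Any, ClassVar


class Singleton(type):
    """Minimal standalone copy of the Singleton metaclass, for illustration only."""

    _instances: ClassVar[dict[type, Any]] = {}
    _lock: ClassVar[threading.Lock] = threading.Lock()

    def __new__(cls, name, bases, dct, **kwargs):
        thread_safe = kwargs.pop("thread_safe", True)
        new_class = super().__new__(cls, name, bases, dct, **kwargs)
        setattr(new_class, "__thread_safe", thread_safe)
        return new_class

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            if getattr(cls, "__thread_safe", True):
                # Double-checked locking: re-test inside the lock before creating
                with cls._lock:
                    if cls not in cls._instances:
                        cls._instances[cls] = super().__call__(*args, **kwargs)
            else:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Config(metaclass=Singleton):
    def __init__(self, env="prod"):
        self.env = env


instances = []

def build():
    instances.append(Config("prod"))

threads = [threading.Thread(target=build) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every thread observed the exact same object.
print(len({id(i) for i in instances}))  # 1
```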

archipy.helpers.metaclasses.singleton.Singleton.__new__(name, bases, dct, **kwargs)

Create a new Singleton metaclass instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the class. | *required* |
| `bases` | `tuple` | The base classes of the class. | *required* |
| `dct` | `dict` | The namespace containing the class attributes. | *required* |
| `**kwargs` | `object` | Additional keyword arguments, including `thread_safe`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `type` | A new metaclass instance. |

Source code in archipy/helpers/metaclasses/singleton.py

```python
def __new__(
    cls,
    name: str,
    bases: ClassBases,
    dct: ClassDict,
    **kwargs: object,
) -> type:
    """Create a new Singleton metaclass instance.

    Args:
        name (str): The name of the class.
        bases (tuple): The base classes of the class.
        dct (dict): The namespace containing the class attributes.
        **kwargs: Additional keyword arguments, including `thread_safe`.

    Returns:
        type: A new metaclass instance.
    """
    # Extract the `thread_safe` parameter from kwargs
    thread_safe = kwargs.pop("thread_safe", True)
    # Create the new class
    new_class = super().__new__(cls, name, bases, dct, **kwargs)
    # Store thread_safe as an attribute using setattr to avoid linting errors
    setattr(new_class, "__thread_safe", thread_safe)
    return new_class
```

archipy.helpers.metaclasses.singleton.Singleton.__call__(*args, **kwargs)

Create or return the Singleton instance of the class.

If thread_safe is True, a lock is used to ensure that only one instance is created even in a multi-threaded environment. If thread_safe is False, no locking mechanism is used, which may result in multiple instances being created in a multi-threaded context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*args` | `object` | Positional arguments to pass to the class constructor. | `()` |
| `**kwargs` | `object` | Keyword arguments to pass to the class constructor. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `SingletonInstance` | The Singleton instance of the class. |

Source code in archipy/helpers/metaclasses/singleton.py

```python
def __call__(cls, *args: object, **kwargs: object) -> SingletonInstance:
    """Create or return the Singleton instance of the class.

    If `thread_safe` is True, a lock is used to ensure that only one instance is created
    even in a multi-threaded environment. If `thread_safe` is False, no locking mechanism
    is used, which may result in multiple instances being created in a multi-threaded context.

    Args:
        *args: Positional arguments to pass to the class constructor.
        **kwargs: Keyword arguments to pass to the class constructor.

    Returns:
        object: The Singleton instance of the class.
    """
    if cls not in cls._instances:
        if getattr(cls, "__thread_safe", True):
            with cls._lock:
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        else:
            cls._instances[cls] = super().__call__(*args, **kwargs)
    return cls._instances[cls]
```


Key Components

Retry Decorator

Function: archipy.helpers.decorators.retry.retry

A decorator that retries a function call when it fails, with configurable:

  • Maximum number of attempts
  • Delay between attempts
  • Backoff strategy
  • Exception types to catch
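To make those options concrete, here is a minimal standalone sketch of such a decorator. The parameter names mirror the usage example earlier in this page; the actual archipy implementation may differ in details (e.g., jitter, logging):

```python
import functools
import time


def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Sketch: retry a call up to max_attempts times with exponential backoff.

    Waits `delay` seconds after the first failure, multiplying the wait
    by `backoff` after each subsequent failure; only the listed exception
    types are caught, and the last failure is re-raised.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the failure
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator


calls = {"n": 0}

@retry(max_attempts=3, delay=0.01, backoff=2, exceptions=(ValueError,))
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


print(flaky())      # ok   (succeeds on the third attempt)
print(calls["n"])   # 3
```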

Singleton

Class: archipy.helpers.metaclasses.singleton.Singleton

A metaclass that ensures a class has only one instance. Features:

  • Thread-safe implementation
  • Lazy initialization
  • Support for inheritance
  • Clear instance access

FastAPIRestRateLimitHandler

Class: archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler

A rate limiting handler for FastAPI applications that provides:

  • Redis-based rate limiting
  • Configurable rate limits and periods
  • Customizable response handling
  • Support for multiple rate limit rules
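For the handler's exact interface, consult its source linked above. As an illustration of the underlying idea only, here is a minimal in-memory fixed-window limiter; the real handler is Redis-backed and FastAPI-specific, and its API differs:

```python
import time
from collections import defaultdict


class FixedWindowRateLimiter:
    """Sketch: allow at most `limit` calls per `window_seconds` per key.

    Each (key, window index) bucket holds a counter; a request is allowed
    while the counter is at or below the limit. This is NOT the archipy
    handler's API, just the general technique.
    """

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.counters = defaultdict(int)

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        bucket = (key, int(now // self.window))  # window index for this instant
        self.counters[bucket] += 1
        return self.counters[bucket] <= self.limit


limiter = FixedWindowRateLimiter(limit=3, window_seconds=60)
# Five requests from the same client in one window: first three pass.
results = [limiter.allow("client-1", now=100.0) for _ in range(5)]
print(results)  # [True, True, True, False, False]
```

A Redis-backed version replaces the in-process dictionary with `INCR` on a per-window key plus an `EXPIRE`, so the count is shared across application instances.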