Skip to content

Utils

The utils module provides helper classes with static methods for common operations across the application.

datetime_utils

Utilities for date and time operations.

from archipy.helpers.utils.datetime_utils import DatetimeUtils

# Get the current timezone-aware UTC time
now = DatetimeUtils.get_datetime_utc_now()

# Format the datetime with an explicit strftime pattern
formatted = DatetimeUtils.get_string_datetime_from_datetime(now, format_="%Y-%m-%d %H:%M:%S")

archipy.helpers.utils.datetime_utils.DatetimeUtils

A utility class for handling date and time operations, including conversions, caching, and API integrations.

This class provides methods for working with both Gregorian and Jalali (Persian) calendars, as well as utility functions for timezone-aware datetime objects, date ranges, and string formatting.

Source code in archipy/helpers/utils/datetime_utils.py
class DatetimeUtils:
    """A utility class for handling date and time operations, including conversions, caching, and API integrations.

    This class provides methods for working with both Gregorian and Jalali (Persian) calendars, as well as
    utility functions for timezone-aware datetime objects, date ranges, and string formatting.
    """

    """A class-level cache for storing holiday statuses to avoid redundant API calls."""
    _holiday_cache: ClassVar[dict[str, tuple[bool, datetime]]] = {}

    @staticmethod
    def convert_to_jalali(target_date: date) -> jdatetime.date:
        """Converts a Gregorian date to a Jalali (Persian) date.

        Args:
            target_date (date): The Gregorian date to convert.

        Returns:
            jdatetime.date: The corresponding Jalali date.
        """
        return jdatetime.date.fromgregorian(date=target_date)

    @classmethod
    def is_holiday_in_iran(cls, target_date: date) -> bool:
        """Determines if the target date is a holiday in Iran.

        This method leverages caching and an external API to check if the given date is a holiday.

        Args:
            target_date (date): The date to check for holiday status.

        Returns:
            bool: True if the date is a holiday, False otherwise.
        """
        # Convert to Jalali date first
        jalali_date = cls.convert_to_jalali(target_date)
        date_str = target_date.strftime("%Y-%m-%d")
        current_time = cls.get_datetime_utc_now()

        # Check cache first
        is_cached, is_holiday = cls._check_cache(date_str, current_time)
        if is_cached:
            return is_holiday

        # Fetch holiday status and cache it
        return cls._fetch_and_cache_holiday_status(jalali_date, date_str, current_time)

    @classmethod
    def _check_cache(cls, date_str: str, current_time: datetime) -> tuple[bool, bool]:
        """Checks the cache for holiday status to avoid redundant API calls.

        Args:
            date_str (str): The date string to check in the cache.
            current_time (datetime): The current time to compare against cache expiration.

        Returns:
            tuple[bool, bool]: A tuple where the first element indicates if the cache was hit,
                               and the second element is the cached holiday status.
        """
        cached_data = cls._holiday_cache.get(date_str)
        if cached_data:
            is_holiday, expiry_time = cached_data
            if current_time < expiry_time:
                return True, is_holiday

            # Remove expired cache entry
            del cls._holiday_cache[date_str]

        return False, False

    @classmethod
    def _fetch_and_cache_holiday_status(
        cls,
        jalali_date: jdatetime.date,
        date_str: str,
        current_time: datetime,
    ) -> bool:
        """Fetches holiday status from the API and caches the result.

        This method calls an external API to determine if the given Jalali date is a holiday.
        If the API call is successful, the result is cached with an expiration time to avoid
        redundant API calls. If the API call fails, an `UnknownException` is raised.

        Args:
            jalali_date (jdatetime.date): The Jalali date to check for holiday status.
            date_str (str): The date string to use as a cache key.
            current_time (datetime): The current time to set cache expiration.

        Returns:
            bool: True if the date is a holiday, False otherwise.

        Raises:
            UnknownError: If the API request fails due to a network issue or other request-related errors.
        """
        try:
            config: Any = BaseConfig.global_config()
            response = cls._call_holiday_api(jalali_date)
            is_holiday = cls._parse_holiday_response(response, jalali_date)

            # Cache the result with expiration
            expiry_time = current_time + timedelta(seconds=config.DATETIME.CACHE_TTL)
            cls._holiday_cache[date_str] = (is_holiday, expiry_time)
        except requests.RequestException as exception:
            raise UnknownError from exception

        return is_holiday

    @staticmethod
    def _call_holiday_api(jalali_date: jdatetime.date) -> dict[str, Any]:
        """Calls the Time.ir API to fetch holiday data for the given Jalali date.

        Args:
            jalali_date (jdatetime.date): The Jalali date to fetch data for.

        Returns:
            Dict[str, Any]: The JSON response from the API.

        Raises:
            requests.RequestException: If the API request fails.
        """
        config: Any = BaseConfig.global_config()
        retry_strategy = Retry(
            total=config.DATETIME.MAX_RETRIES,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session = requests.Session()
        session.mount("https://", adapter)

        url = DatetimeUtils._build_api_url(jalali_date)
        headers = {"x-api-key": config.DATETIME.TIME_IR_API_KEY}
        response = session.get(url, headers=headers, timeout=config.DATETIME.REQUEST_TIMEOUT)
        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    @staticmethod
    def _build_api_url(jalali_date: jdatetime.date) -> str:
        """Builds the API URL with Jalali date parameters.

        Args:
            jalali_date (jdatetime.date): The Jalali date to include in the URL.

        Returns:
            str: The constructed API URL.
        """
        config: Any = BaseConfig.global_config()
        base_url = config.DATETIME.TIME_IR_API_ENDPOINT
        return f"{base_url}?year={jalali_date.year}&month={jalali_date.month}&day={jalali_date.day}"

    @staticmethod
    def _parse_holiday_response(response_data: dict[str, Any], jalali_date: jdatetime.date) -> bool:
        """Parses the API response to extract and return the holiday status.

        Args:
            response_data (Dict[str, Any]): The JSON response from the API.
            jalali_date (jdatetime.date): The Jalali date to check.

        Returns:
            bool: True if the date is a holiday, False otherwise.
        """
        event_list = response_data.get("data", {}).get("event_list", [])
        for event_info in event_list:
            if (
                event_info.get("jalali_year") == jalali_date.year
                and event_info.get("jalali_month") == jalali_date.month
                and event_info.get("jalali_day") == jalali_date.day
            ):
                is_holiday = event_info.get("is_holiday", False)
                return bool(is_holiday)
        return False

    @classmethod
    def ensure_timezone_aware(cls, dt: datetime) -> datetime:
        """Ensures a datetime object is timezone-aware, converting it to UTC if necessary.

        Args:
            dt (datetime): The datetime object to make timezone-aware.

        Returns:
            datetime: The timezone-aware datetime object.
        """
        if dt.tzinfo is None:
            return dt.replace(tzinfo=UTC)
        return dt

    @classmethod
    def daterange(cls, start_date: datetime, end_date: datetime) -> Generator[date, None, None]:
        """Generates a range of dates from start_date to end_date, exclusive of end_date.

        Args:
            start_date (datetime): The start date of the range.
            end_date (datetime): The end date of the range.

        Yields:
            date: Each date in the range.
        """
        for n in range((end_date - start_date).days):
            yield (start_date + timedelta(n)).date()

    @classmethod
    def get_string_datetime_from_datetime(cls, dt: datetime, format_: str | None = None) -> str:
        """Converts a datetime object to a formatted string. Default format is ISO 8601.

        Args:
            dt (datetime): The datetime object to format.
            format_ (str | None): The format string. If None, uses ISO 8601.

        Returns:
            str: The formatted datetime string.
        """
        format_ = format_ or "%Y-%m-%dT%H:%M:%S.%f"
        return dt.strftime(format_)

    @classmethod
    def standardize_string_datetime(cls, date_string: str) -> str:
        """Standardizes a datetime string to the default format.

        Args:
            date_string (str): The datetime string to standardize.

        Returns:
            str: The standardized datetime string.
        """
        datetime_ = cls.get_datetime_from_string_datetime(date_string)
        return cls.get_string_datetime_from_datetime(datetime_)

    @classmethod
    def get_datetime_from_string_datetime(cls, date_string: str, format_: str | None = None) -> datetime:
        """Parses a string to a datetime object using the given format, or ISO 8601 by default.

        Args:
            date_string (str): The datetime string to parse.
            format_ (str | None): The format string. If None, uses ISO 8601.

        Returns:
            datetime: The parsed datetime object with UTC timezone.
        """
        # Parse using a single expression and immediately make timezone-aware for both cases
        dt = (
            datetime.fromisoformat(date_string)
            if format_ is None
            else datetime.strptime(date_string, format_).replace(tzinfo=UTC)
        )

        # Handle the fromisoformat case which might already have timezone info
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=UTC)

        return dt

    @classmethod
    def get_string_datetime_now(cls) -> str:
        """Gets the current datetime as a formatted string. Default format is ISO 8601.

        Returns:
            str: The formatted datetime string.
        """
        return cls.get_string_datetime_from_datetime(cls.get_datetime_now())

    @classmethod
    def get_datetime_now(cls) -> datetime:
        """Gets the current local datetime with timezone information.

        Returns:
            datetime: The current local datetime with UTC timezone.
        """
        return datetime.now(UTC)

    @classmethod
    def get_datetime_utc_now(cls) -> datetime:
        """Gets the current UTC datetime.

        Returns:
            datetime: The current UTC datetime.
        """
        return datetime.now(UTC)

    @classmethod
    def get_epoch_time_now(cls) -> int:
        """Gets the current time in seconds since the epoch.

        Returns:
            int: The current epoch time.
        """
        return int(time.time())

    @classmethod
    def get_datetime_before_given_datetime_or_now(
        cls,
        weeks: int = 0,
        days: int = 0,
        hours: int = 0,
        minutes: int = 0,
        seconds: int = 0,
        datetime_given: datetime | None = None,
    ) -> datetime:
        """Subtracts time from a given datetime or the current datetime if not specified.

        Args:
            weeks (int): The number of weeks to subtract.
            days (int): The number of days to subtract.
            hours (int): The number of hours to subtract.
            minutes (int): The number of minutes to subtract.
            seconds (int): The number of seconds to subtract.
            datetime_given (datetime | None): The datetime to subtract from. If None, uses the current datetime.

        Returns:
            datetime: The resulting datetime after subtraction.
        """
        datetime_given = datetime_given or cls.get_datetime_now()
        return datetime_given - timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)

    @classmethod
    def get_datetime_after_given_datetime_or_now(
        cls,
        weeks: int = 0,
        days: int = 0,
        hours: int = 0,
        minutes: int = 0,
        seconds: int = 0,
        datetime_given: datetime | None = None,
    ) -> datetime:
        """Adds time to a given datetime or the current datetime if not specified.

        Args:
            weeks (int): The number of weeks to add.
            days (int): The number of days to add.
            hours (int): The number of hours to add.
            minutes (int): The number of minutes to add.
            seconds (int): The number of seconds to add.
            datetime_given (datetime | None): The datetime to add to. If None, uses the current datetime.

        Returns:
            datetime: The resulting datetime after addition.
        """
        datetime_given = datetime_given or cls.get_datetime_now()
        return datetime_given + timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)

archipy.helpers.utils.datetime_utils.DatetimeUtils.convert_to_jalali(target_date) staticmethod

Converts a Gregorian date to a Jalali (Persian) date.

Parameters:

Name Type Description Default
target_date date

The Gregorian date to convert.

required

Returns:

Type Description
date

jdatetime.date: The corresponding Jalali date.

Source code in archipy/helpers/utils/datetime_utils.py
@staticmethod
def convert_to_jalali(target_date: date) -> jdatetime.date:
    """Return the Jalali (Persian) calendar date equivalent to a Gregorian date.

    Args:
        target_date (date): The Gregorian date to convert.

    Returns:
        jdatetime.date: The corresponding Jalali date.
    """
    jalali_equivalent = jdatetime.date.fromgregorian(date=target_date)
    return jalali_equivalent

archipy.helpers.utils.datetime_utils.DatetimeUtils.is_holiday_in_iran(target_date) classmethod

Determines if the target date is a holiday in Iran.

This method leverages caching and an external API to check if the given date is a holiday.

Parameters:

Name Type Description Default
target_date date

The date to check for holiday status.

required

Returns:

Name Type Description
bool bool

True if the date is a holiday, False otherwise.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def is_holiday_in_iran(cls, target_date: date) -> bool:
    """Report whether the given date is a holiday in Iran.

    The holiday cache is consulted first; the external API is only queried
    (and its answer cached) when the cache has no usable entry.

    Args:
        target_date (date): The date to check for holiday status.

    Returns:
        bool: True if the date is a holiday, False otherwise.
    """
    # The API call uses the Jalali date; the cache key is the Gregorian date string.
    persian_date = cls.convert_to_jalali(target_date)
    cache_key = target_date.strftime("%Y-%m-%d")
    now_utc = cls.get_datetime_utc_now()

    cache_hit, cached_status = cls._check_cache(cache_key, now_utc)
    if not cache_hit:
        # Miss: ask the API and store the answer for later calls
        return cls._fetch_and_cache_holiday_status(persian_date, cache_key, now_utc)

    return cached_status

archipy.helpers.utils.datetime_utils.DatetimeUtils.ensure_timezone_aware(dt) classmethod

Ensures a datetime object is timezone-aware, converting it to UTC if necessary.

Parameters:

Name Type Description Default
dt datetime

The datetime object to make timezone-aware.

required

Returns:

Name Type Description
datetime datetime

The timezone-aware datetime object.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def ensure_timezone_aware(cls, dt: datetime) -> datetime:
    """Ensures a datetime object is timezone-aware, converting it to UTC if necessary.

    Args:
        dt (datetime): The datetime object to make timezone-aware.

    Returns:
        datetime: The timezone-aware datetime object.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=UTC)
    return dt

archipy.helpers.utils.datetime_utils.DatetimeUtils.daterange(start_date, end_date) classmethod

Generates a range of dates from start_date to end_date, exclusive of end_date.

Parameters:

Name Type Description Default
start_date datetime

The start date of the range.

required
end_date datetime

The end date of the range.

required

Yields:

Name Type Description
date date

Each date in the range.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def daterange(cls, start_date: datetime, end_date: datetime) -> Generator[date, None, None]:
    """Yield each calendar date from start_date up to, but not including, end_date.

    Args:
        start_date (datetime): The start date of the range.
        end_date (datetime): The end date of the range.

    Yields:
        date: Each date in the range.
    """
    # Only whole days separating the two datetimes are counted
    total_days = (end_date - start_date).days
    day_offset = 0
    while day_offset < total_days:
        yield (start_date + timedelta(days=day_offset)).date()
        day_offset += 1

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_string_datetime_from_datetime(dt, format_=None) classmethod

Converts a datetime object to a formatted string. Default format is ISO 8601.

Parameters:

Name Type Description Default
dt datetime

The datetime object to format.

required
format_ str | None

The format string. If None, uses ISO 8601.

None

Returns:

Name Type Description
str str

The formatted datetime string.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_string_datetime_from_datetime(cls, dt: datetime, format_: str | None = None) -> str:
    """Converts a datetime object to a formatted string. Default format is ISO 8601.

    Args:
        dt (datetime): The datetime object to format.
        format_ (str | None): The format string. If None, uses ISO 8601.

    Returns:
        str: The formatted datetime string.
    """
    format_ = format_ or "%Y-%m-%dT%H:%M:%S.%f"
    return dt.strftime(format_)

archipy.helpers.utils.datetime_utils.DatetimeUtils.standardize_string_datetime(date_string) classmethod

Standardizes a datetime string to the default format.

Parameters:

Name Type Description Default
date_string str

The datetime string to standardize.

required

Returns:

Name Type Description
str str

The standardized datetime string.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def standardize_string_datetime(cls, date_string: str) -> str:
    """Re-emit a datetime string in the default output format.

    The input is parsed with the default (ISO 8601) parser and then formatted
    with the default format.

    Args:
        date_string (str): The datetime string to standardize.

    Returns:
        str: The standardized datetime string.
    """
    return cls.get_string_datetime_from_datetime(cls.get_datetime_from_string_datetime(date_string))

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_datetime_from_string_datetime(date_string, format_=None) classmethod

Parses a string to a datetime object using the given format, or ISO 8601 by default.

Parameters:

Name Type Description Default
date_string str

The datetime string to parse.

required
format_ str | None

The format string. If None, uses ISO 8601.

None

Returns:

Name Type Description
datetime datetime

The parsed datetime object with UTC timezone.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_datetime_from_string_datetime(cls, date_string: str, format_: str | None = None) -> datetime:
    """Parses a string to a datetime object using the given format, or ISO 8601 by default.

    Args:
        date_string (str): The datetime string to parse.
        format_ (str | None): The format string. If None, uses ISO 8601.

    Returns:
        datetime: The parsed datetime object with UTC timezone.
    """
    # Parse using a single expression and immediately make timezone-aware for both cases
    dt = (
        datetime.fromisoformat(date_string)
        if format_ is None
        else datetime.strptime(date_string, format_).replace(tzinfo=UTC)
    )

    # Handle the fromisoformat case which might already have timezone info
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=UTC)

    return dt

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_string_datetime_now() classmethod

Gets the current datetime as a formatted string. Default format is ISO 8601.

Returns:

Name Type Description
str str

The formatted datetime string.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_string_datetime_now(cls) -> str:
    """Return the current datetime rendered in the default string format.

    Returns:
        str: The formatted datetime string.
    """
    current_moment = cls.get_datetime_now()
    return cls.get_string_datetime_from_datetime(current_moment)

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_datetime_now() classmethod

Gets the current timezone-aware datetime in UTC (not the machine's local time).

Returns:

Name Type Description
datetime datetime

The current datetime, timezone-aware and set to UTC.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_datetime_now(cls) -> datetime:
    """Gets the current local datetime with timezone information.

    Returns:
        datetime: The current local datetime with UTC timezone.
    """
    return datetime.now(UTC)

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_datetime_utc_now() classmethod

Gets the current UTC datetime.

Returns:

Name Type Description
datetime datetime

The current UTC datetime.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_datetime_utc_now(cls) -> datetime:
    """Gets the current UTC datetime.

    Returns:
        datetime: The current UTC datetime.
    """
    return datetime.now(UTC)

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_epoch_time_now() classmethod

Gets the current time in seconds since the epoch.

Returns:

Name Type Description
int int

The current epoch time.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_epoch_time_now(cls) -> int:
    """Return the current Unix time, truncated to whole seconds.

    Returns:
        int: The current epoch time.
    """
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_datetime_before_given_datetime_or_now(weeks=0, days=0, hours=0, minutes=0, seconds=0, datetime_given=None) classmethod

Subtracts time from a given datetime or the current datetime if not specified.

Parameters:

Name Type Description Default
weeks int

The number of weeks to subtract.

0
days int

The number of days to subtract.

0
hours int

The number of hours to subtract.

0
minutes int

The number of minutes to subtract.

0
seconds int

The number of seconds to subtract.

0
datetime_given datetime | None

The datetime to subtract from. If None, uses the current datetime.

None

Returns:

Name Type Description
datetime datetime

The resulting datetime after subtraction.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_datetime_before_given_datetime_or_now(
    cls,
    weeks: int = 0,
    days: int = 0,
    hours: int = 0,
    minutes: int = 0,
    seconds: int = 0,
    datetime_given: datetime | None = None,
) -> datetime:
    """Subtracts time from a given datetime or the current datetime if not specified.

    Args:
        weeks (int): The number of weeks to subtract.
        days (int): The number of days to subtract.
        hours (int): The number of hours to subtract.
        minutes (int): The number of minutes to subtract.
        seconds (int): The number of seconds to subtract.
        datetime_given (datetime | None): The datetime to subtract from. If None, uses the current datetime.

    Returns:
        datetime: The resulting datetime after subtraction.
    """
    datetime_given = datetime_given or cls.get_datetime_now()
    return datetime_given - timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)

archipy.helpers.utils.datetime_utils.DatetimeUtils.get_datetime_after_given_datetime_or_now(weeks=0, days=0, hours=0, minutes=0, seconds=0, datetime_given=None) classmethod

Adds time to a given datetime or the current datetime if not specified.

Parameters:

Name Type Description Default
weeks int

The number of weeks to add.

0
days int

The number of days to add.

0
hours int

The number of hours to add.

0
minutes int

The number of minutes to add.

0
seconds int

The number of seconds to add.

0
datetime_given datetime | None

The datetime to add to. If None, uses the current datetime.

None

Returns:

Name Type Description
datetime datetime

The resulting datetime after addition.

Source code in archipy/helpers/utils/datetime_utils.py
@classmethod
def get_datetime_after_given_datetime_or_now(
    cls,
    weeks: int = 0,
    days: int = 0,
    hours: int = 0,
    minutes: int = 0,
    seconds: int = 0,
    datetime_given: datetime | None = None,
) -> datetime:
    """Adds time to a given datetime or the current datetime if not specified.

    Args:
        weeks (int): The number of weeks to add.
        days (int): The number of days to add.
        hours (int): The number of hours to add.
        minutes (int): The number of minutes to add.
        seconds (int): The number of seconds to add.
        datetime_given (datetime | None): The datetime to add to. If None, uses the current datetime.

    Returns:
        datetime: The resulting datetime after addition.
    """
    datetime_given = datetime_given or cls.get_datetime_now()
    return datetime_given + timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)

options: show_root_heading: true show_source: true

file_utils

Utilities for file operations.

from archipy.helpers.utils.file_utils import FileUtils

# Create a signed link to a file that expires after 10 minutes
secure_link = FileUtils.create_secure_link("/files/report.pdf", minutes=10)

# Check the file's extension against the configured allowed extensions
is_valid = FileUtils.validate_file_name("report.pdf")

archipy.helpers.utils.file_utils.FileUtils

A utility class for handling file-related operations, such as creating secure links and validating file names.

Source code in archipy/helpers/utils/file_utils.py
class FileUtils:
    """Helpers for file-related concerns: building expiring, hash-signed links and checking file-name extensions."""

    @staticmethod
    def _create_secure_link_hash(path: str, expires_at: float, file_config: FileConfig | None = None) -> str:
        """Compute the signature used in a secure file link.

        The signature is the MD5 digest of ``"<expires_at><path> <secret>"``, encoded as
        URL-safe base64 with the trailing ``=`` padding removed.

        Args:
            path (str): The file path to generate the hash for.
            expires_at (float): The expiration timestamp for the link.
            file_config (FileConfig | None): Optional file configuration object. If not provided, uses the global config.

        Returns:
            str: A base64-encoded secure hash for the file link.

        Raises:
            InvalidArgumentError: If the `SECRET_KEY` in the configuration is `None`.
        """
        settings: FileConfig = file_config or BaseConfig.global_config().FILE
        secret_key: str | None = settings.SECRET_KEY
        if secret_key is None:
            raise InvalidArgumentError(argument_name="SECRET_KEY")

        # The exact payload layout (including the single space before the secret) is part of the signature
        payload = f"{expires_at}{path} {secret_key}".encode("utf8")
        digest = hashlib.md5(payload).digest()
        encoded = base64.urlsafe_b64encode(digest).decode("utf-8")
        return encoded.rstrip("=")

    @classmethod
    def create_secure_link(
        cls,
        path: str,
        minutes: int | None = None,
        file_config: FileConfig | None = None,
    ) -> str:
        """Build a link to ``path`` carrying a signature and an expiration timestamp.

        Args:
            path (str): The file path to create a secure link for.
            minutes (int | None): Number of minutes until link expiration. Defaults to the config's `DEFAULT_EXPIRY_MINUTES`.
            file_config (FileConfig | None): Optional file configuration object. If not provided, uses the global config.

        Returns:
            str: The link in the form ``<path>?md5=<hash>&expires_at=<epoch seconds>``.

        Raises:
            InvalidArgumentError: If the `path` is empty.
            OutOfRangeError: If the effective expiry in minutes is less than 1.
        """
        if not path:
            raise InvalidArgumentError(argument_name="path")

        settings: FileConfig = file_config or BaseConfig.global_config().FILE
        # The configured default is only read when the caller did not pass a value
        if minutes is None:
            expiry_minutes: int = settings.DEFAULT_EXPIRY_MINUTES
        else:
            expiry_minutes = minutes

        if expiry_minutes < 1:
            raise OutOfRangeError(field_name="minutes")

        expiry_moment = DatetimeUtils.get_datetime_after_given_datetime_or_now(minutes=expiry_minutes)
        expires_at = int(expiry_moment.timestamp())
        signature = cls._create_secure_link_hash(path, expires_at, file_config)

        return f"{path}?md5={signature}&expires_at={expires_at}"

    @classmethod
    def validate_file_name(
        cls,
        file_name: str,
        file_config: FileConfig | None = None,
    ) -> bool:
        """Check whether a file name ends in one of the configured allowed extensions.

        The extension is compared in lower case and without its leading dot; a name
        without an extension is never valid.

        Args:
            file_name (str): The file name to validate.
            file_config (FileConfig | None): Optional file configuration object. If not provided, uses the global config.

        Returns:
            bool: `True` if the file name has an allowed extension, `False` otherwise.

        Raises:
            InvalidArgumentError: If `file_name` is not a string or the configured
                `ALLOWED_EXTENSIONS` is empty.
        """
        settings: FileConfig = file_config or BaseConfig.global_config().FILE
        allowed_extensions: list[str] = settings.ALLOWED_EXTENSIONS

        if not isinstance(file_name, str):
            raise InvalidArgumentError(argument_name="file_name")

        if not allowed_extensions:
            raise InvalidArgumentError(argument_name="allowed_extensions")

        extension = Path(file_name).suffix[1:].lower()
        if not extension:
            return False
        return extension in allowed_extensions

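archipy.helpers.utils.file_utils.FileUtils.create_secure_link(path, minutes=None, file_config=None) classmethod

Creates a secure link with expiration for file access.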

Parameters:

Name Type Description Default
path str

The file path to create a secure link for.

required
minutes int | None

Number of minutes until link expiration. Defaults to the config's DEFAULT_EXPIRY_MINUTES.

None
file_config FileConfig | None

Optional file configuration object. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

A secure link with a hash and expiration timestamp.

Raises:

Type Description
InvalidArgumentError

If the path is empty.

OutOfRangeError

If minutes is less than 1.

Source code in archipy/helpers/utils/file_utils.py
@classmethod
def create_secure_link(
    cls,
    path: str,
    minutes: int | None = None,
    file_config: FileConfig | None = None,
) -> str:
    """Build a link to ``path`` carrying a signature and an expiration timestamp.

    Args:
        path (str): The file path to create a secure link for.
        minutes (int | None): Number of minutes until link expiration. Defaults to the config's `DEFAULT_EXPIRY_MINUTES`.
        file_config (FileConfig | None): Optional file configuration object. If not provided, uses the global config.

    Returns:
        str: The link in the form ``<path>?md5=<hash>&expires_at=<epoch seconds>``.

    Raises:
        InvalidArgumentError: If the `path` is empty.
        OutOfRangeError: If the effective expiry in minutes is less than 1.
    """
    if not path:
        raise InvalidArgumentError(argument_name="path")

    settings: FileConfig = file_config or BaseConfig.global_config().FILE
    # The configured default is only read when the caller did not pass a value
    if minutes is None:
        expiry_minutes: int = settings.DEFAULT_EXPIRY_MINUTES
    else:
        expiry_minutes = minutes

    if expiry_minutes < 1:
        raise OutOfRangeError(field_name="minutes")

    expiry_moment = DatetimeUtils.get_datetime_after_given_datetime_or_now(minutes=expiry_minutes)
    expires_at = int(expiry_moment.timestamp())
    signature = cls._create_secure_link_hash(path, expires_at, file_config)

    return f"{path}?md5={signature}&expires_at={expires_at}"

archipy.helpers.utils.file_utils.FileUtils.validate_file_name(file_name, file_config=None) classmethod

Validates a file name based on allowed extensions.

Parameters:

Name Type Description Default
file_name str

The file name to validate.

required
file_config FileConfig | None

Optional file configuration object. If not provided, uses the global config.

None

Returns:

Name Type Description
bool bool

True if the file name has an allowed extension, False otherwise.

Raises:

Type Description
InvalidArgumentError

If file_name is not a string or the configured ALLOWED_EXTENSIONS is empty.

Source code in archipy/helpers/utils/file_utils.py
@classmethod
def validate_file_name(
    cls,
    file_name: str,
    file_config: FileConfig | None = None,
) -> bool:
    """Validates a file name based on allowed extensions.

    The extension is the final suffix of `file_name` without its leading dot. It is compared
    case-insensitively against the configured `ALLOWED_EXTENSIONS`, so `photo.JPG` is accepted
    whether the configuration lists `jpg` or `JPG`.

    Args:
        file_name (str): The file name to validate.
        file_config (FileConfig | None): Optional file configuration object. If not provided, uses the global config.

    Returns:
        bool: `True` if the file name has an allowed extension, `False` otherwise
            (including when the file name has no extension at all).

    Raises:
        InvalidArgumentError: If `file_name` is not a string or the configured `ALLOWED_EXTENSIONS` is empty.
    """
    configs: FileConfig = file_config or BaseConfig.global_config().FILE
    allowed_extensions: list[str] = configs.ALLOWED_EXTENSIONS

    if not isinstance(file_name, str):
        raise InvalidArgumentError(argument_name="file_name")

    if not allowed_extensions:
        raise InvalidArgumentError(argument_name="allowed_extensions")

    ext = Path(file_name).suffix[1:].lower()
    if not ext:
        # A name without a suffix (e.g. "README") can never match an allowed extension.
        return False

    # Normalise the configured values as well: lower-casing only the file's extension would
    # silently reject every extension that is configured with upper-case letters.
    return ext in {allowed.lower() for allowed in allowed_extensions}

options: show_root_heading: true show_source: true

jwt_utils

Utilities for JWT (JSON Web Token) operations.

from uuid import uuid4

from archipy.helpers.utils.jwt_utils import JWTUtils

# Issue an access token and a refresh token for a user
# (issuer, audience, secret key and lifetimes come from the global AUTH config)
user_uuid = uuid4()
access_token = JWTUtils.create_access_token(user_uuid)
refresh_token = JWTUtils.create_refresh_token(user_uuid)

# Verify the access token; raises TokenExpiredError / InvalidTokenError on failure
payload = JWTUtils.verify_access_token(access_token)

# Read the user UUID back from the verified payload
token_user_uuid = JWTUtils.extract_user_uuid(payload)

# Get the expiry timestamp (in seconds) of a token
expires_at = JWTUtils.get_token_expiry(access_token)

Utility module for JWT token operations with enhanced security and datetime handling.

This module provides a robust JWT handling implementation with support for access and refresh tokens, cryptographic security, token validation, and comprehensive error handling.

archipy.helpers.utils.jwt_utils.JWTUtils

Utility class for JWT token operations with enhanced security and datetime handling.

Source code in archipy/helpers/utils/jwt_utils.py
class JWTUtils:
    """Utility class for JWT token operations with enhanced security and datetime handling."""

    @classmethod
    def create_token(
        cls,
        data: dict[str, Any],
        expires_in: int,
        additional_claims: dict[str, Any] | None = None,
        auth_config: AuthConfig | None = None,
    ) -> str:
        """Builds and signs a JWT from the given claims.

        The registered claims `iss`, `aud`, `exp`, `iat` and `nbf` are layered on top of `data`, a random
        `jti` is added when `ENABLE_JTI_CLAIM` is set, and `additional_claims` are applied last, so they
        take precedence over every claim set before them.

        Args:
            data (dict[str, Any]): Base claims data to include in the token.
            expires_in (int): Token expiration time in seconds.
            additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            str: The encoded JWT token.

        Raises:
            InvalidArgumentError: If `data` is empty, `expires_in` is not positive, or the configured
                `SECRET_KEY` is `None`.
        """
        import jwt

        settings = auth_config or BaseConfig.global_config().AUTH
        issued_at = DatetimeUtils.get_datetime_utc_now()

        # Argument names reported by the validation errors below
        data_argument = "data"
        expires_in_argument = "expires_in"

        if not data:
            raise InvalidArgumentError(data_argument)
        if expires_in <= 0:
            raise InvalidArgumentError(expires_in_argument)

        # Expiry is derived from the same instant used for `iat` / `nbf`
        expires_at = DatetimeUtils.get_datetime_after_given_datetime_or_now(
            seconds=expires_in,
            datetime_given=issued_at,
        )

        # Caller data first, then the registered claims (RFC 7519) on top of it
        claims = {
            **data,
            "iss": settings.JWT_ISSUER,
            "aud": settings.JWT_AUDIENCE,
            "exp": expires_at,
            "iat": issued_at,
            "nbf": issued_at,
        }

        # Optional unique token identifier
        if settings.ENABLE_JTI_CLAIM:
            claims["jti"] = str(uuid4())

        # Applied last, so these override any claim set above
        if additional_claims:
            claims.update(additional_claims)

        # A signing key is mandatory
        secret_key_argument = "SECRET_KEY"
        signing_key = settings.SECRET_KEY
        if signing_key is None:
            raise InvalidArgumentError(secret_key_argument)
        return jwt.encode(claims, signing_key.get_secret_value(), algorithm=settings.HASH_ALGORITHM)

    @classmethod
    def create_access_token(
        cls,
        user_uuid: UUID,
        additional_claims: dict[str, Any] | None = None,
        auth_config: AuthConfig | None = None,
    ) -> str:
        """Issues an access token for a user.

        The token carries the user's UUID as `sub`, the type `"access"` and the configured
        `TOKEN_VERSION`, and lives for `ACCESS_TOKEN_EXPIRES_IN` seconds.

        Args:
            user_uuid (UUID): The user's UUID to include in the token.
            additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            str: The encoded access token.
        """
        settings = auth_config or BaseConfig.global_config().AUTH
        access_claims: dict[str, Any] = {
            "sub": str(user_uuid),
            "type": "access",
            "token_version": settings.TOKEN_VERSION,
        }

        # Hand over the resolved config so the token is built with the same settings
        return cls.create_token(
            data=access_claims,
            expires_in=settings.ACCESS_TOKEN_EXPIRES_IN,
            additional_claims=additional_claims,
            auth_config=settings,
        )

    @classmethod
    def create_refresh_token(
        cls,
        user_uuid: UUID,
        additional_claims: dict[str, Any] | None = None,
        auth_config: AuthConfig | None = None,
    ) -> str:
        """Issues a refresh token for a user.

        The token carries the user's UUID as `sub`, the type `"refresh"` and the configured
        `TOKEN_VERSION`, and lives for `REFRESH_TOKEN_EXPIRES_IN` seconds.

        Args:
            user_uuid (UUID): The user's UUID to include in the token.
            additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            str: The encoded refresh token.
        """
        settings = auth_config or BaseConfig.global_config().AUTH
        refresh_claims: dict[str, Any] = {
            "sub": str(user_uuid),
            "type": "refresh",
            "token_version": settings.TOKEN_VERSION,
        }

        # Hand over the resolved config so the token is built with the same settings
        return cls.create_token(
            data=refresh_claims,
            expires_in=settings.REFRESH_TOKEN_EXPIRES_IN,
            additional_claims=additional_claims,
            auth_config=settings,
        )

    @classmethod
    def decode_token(
        cls,
        token: str,
        verify_type: str | None = None,
        auth_config: AuthConfig | None = None,
    ) -> dict[str, Any]:
        """Decodes a JWT and verifies its signature, registered claims, type and version.

        Args:
            token (str): The JWT token to decode.
            verify_type (str | None): Optional token type to verify (e.g., "access" or "refresh").
                The type is not checked when this is omitted.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            dict[str, Any]: The decoded token payload.

        Raises:
            TokenExpiredError: If the token has expired.
            InvalidTokenError: If the token is invalid (e.g., invalid signature, audience, issuer, type,
                or token version).
            InvalidArgumentError: If the configured `SECRET_KEY` is `None`.
        """
        import jwt
        from jwt.exceptions import ExpiredSignatureError, InvalidTokenError as JWTInvalidTokenError

        settings = auth_config or BaseConfig.global_config().AUTH
        mandatory_claims = ["exp", "iat", "nbf", "aud", "iss", "sub", "type", "token_version"]
        if settings.ENABLE_JTI_CLAIM:
            mandatory_claims.append("jti")

        # A verification key is mandatory
        secret_key_argument = "SECRET_KEY"
        signing_key = settings.SECRET_KEY
        if signing_key is None:
            raise InvalidArgumentError(secret_key_argument)

        try:
            claims = jwt.decode(
                token,
                signing_key.get_secret_value(),
                algorithms=[settings.HASH_ALGORITHM],
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_nbf": True,
                    "verify_iat": True,
                    "verify_aud": True,
                    "verify_iss": True,
                    "require": mandatory_claims,
                },
                audience=settings.JWT_AUDIENCE,
                issuer=settings.JWT_ISSUER,
            )
        except ExpiredSignatureError as exception:
            # Must come first: ExpiredSignatureError is itself a PyJWT InvalidTokenError
            raise TokenExpiredError from exception
        except JWTInvalidTokenError as exception:
            # Covers signature, audience, issuer and every other PyJWT validation failure
            raise InvalidTokenError from exception

        # Token type is only checked when the caller asks for a specific one
        if verify_type and claims.get("type") != verify_type:
            raise InvalidTokenError

        # Tokens issued under a different token version are rejected
        if claims.get("token_version") != settings.TOKEN_VERSION:
            raise InvalidTokenError

        # Return a plain dict, as declared
        return dict(claims)

    @classmethod
    def verify_access_token(cls, token: str, auth_config: AuthConfig | None = None) -> dict[str, Any]:
        """Decodes a token and requires it to be of type "access".

        Args:
            token (str): The access token to verify.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            dict[str, Any]: The decoded access token payload.

        Raises:
            InvalidTokenError: If the token is invalid or not an access token.
            TokenExpiredError: If the token has expired.
        """
        settings = auth_config or BaseConfig.global_config().AUTH
        access_claims = cls.decode_token(token, verify_type="access", auth_config=settings)
        return access_claims

    @classmethod
    def verify_refresh_token(cls, token: str, auth_config: AuthConfig | None = None) -> dict[str, Any]:
        """Decodes a token and requires it to be of type "refresh".

        Args:
            token (str): The refresh token to verify.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            dict[str, Any]: The decoded refresh token payload.

        Raises:
            InvalidTokenError: If the token is invalid or not a refresh token.
            TokenExpiredError: If the token has expired.
        """
        settings = auth_config or BaseConfig.global_config().AUTH
        refresh_claims = cls.decode_token(token, verify_type="refresh", auth_config=settings)
        return refresh_claims

    @staticmethod
    def extract_user_uuid(payload: dict[str, Any]) -> UUID:
        """Extracts the user UUID from the `sub` claim of a token payload.

        Args:
            payload (dict[str, Any]): The decoded token payload.

        Returns:
            UUID: The user's UUID.

        Raises:
            InvalidTokenError: If the `sub` claim is missing, is not a string, or is not a valid UUID.
        """
        try:
            return UUID(payload["sub"])
        except (KeyError, ValueError, TypeError, AttributeError) as exception:
            # KeyError: no `sub` claim; ValueError: malformed UUID string.
            # UUID() raises TypeError for `None` and AttributeError for other non-string values;
            # both are reported as an invalid token instead of leaking to the caller.
            raise InvalidTokenError from exception

    @classmethod
    def get_token_expiry(cls, token: str, auth_config: AuthConfig | None = None) -> int:
        """Returns the `exp` claim of a token after fully decoding and verifying it.

        Args:
            token (str): The JWT token.
            auth_config (AuthConfig | None): Optional auth configuration override.
                If not provided, uses the global config.

        Returns:
            int: The token expiry timestamp in seconds.

        Raises:
            InvalidTokenError: If the token is invalid.
            TokenExpiredError: If the token has expired.
        """
        claims = cls.decode_token(token, auth_config=auth_config)
        expiry = claims["exp"]
        return int(expiry)

archipy.helpers.utils.jwt_utils.JWTUtils.create_token(data, expires_in, additional_claims=None, auth_config=None) classmethod

Creates a JWT token with enhanced security features.

Parameters:

Name Type Description Default
data dict[str, Any]

Base claims data to include in the token.

required
expires_in int

Token expiration time in seconds.

required
additional_claims dict[str, Any] | None

Optional additional claims to include in the token.

None
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

The encoded JWT token.

Raises:

Type Description
InvalidArgumentError

If data is empty, expires_in is not positive, or the configured SECRET_KEY is missing.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def create_token(
    cls,
    data: dict[str, Any],
    expires_in: int,
    additional_claims: dict[str, Any] | None = None,
    auth_config: AuthConfig | None = None,
) -> str:
    """Builds and signs a JWT from the given claims.

    The registered claims `iss`, `aud`, `exp`, `iat` and `nbf` are layered on top of `data`, a random
    `jti` is added when `ENABLE_JTI_CLAIM` is set, and `additional_claims` are applied last, so they
    take precedence over every claim set before them.

    Args:
        data (dict[str, Any]): Base claims data to include in the token.
        expires_in (int): Token expiration time in seconds.
        additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        str: The encoded JWT token.

    Raises:
        InvalidArgumentError: If `data` is empty, `expires_in` is not positive, or the configured
            `SECRET_KEY` is `None`.
    """
    import jwt

    settings = auth_config or BaseConfig.global_config().AUTH
    issued_at = DatetimeUtils.get_datetime_utc_now()

    # Argument names reported by the validation errors below
    data_argument = "data"
    expires_in_argument = "expires_in"

    if not data:
        raise InvalidArgumentError(data_argument)
    if expires_in <= 0:
        raise InvalidArgumentError(expires_in_argument)

    # Expiry is derived from the same instant used for `iat` / `nbf`
    expires_at = DatetimeUtils.get_datetime_after_given_datetime_or_now(
        seconds=expires_in,
        datetime_given=issued_at,
    )

    # Caller data first, then the registered claims (RFC 7519) on top of it
    claims = {
        **data,
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "exp": expires_at,
        "iat": issued_at,
        "nbf": issued_at,
    }

    # Optional unique token identifier
    if settings.ENABLE_JTI_CLAIM:
        claims["jti"] = str(uuid4())

    # Applied last, so these override any claim set above
    if additional_claims:
        claims.update(additional_claims)

    # A signing key is mandatory
    secret_key_argument = "SECRET_KEY"
    signing_key = settings.SECRET_KEY
    if signing_key is None:
        raise InvalidArgumentError(secret_key_argument)
    return jwt.encode(claims, signing_key.get_secret_value(), algorithm=settings.HASH_ALGORITHM)

archipy.helpers.utils.jwt_utils.JWTUtils.create_access_token(user_uuid, additional_claims=None, auth_config=None) classmethod

Creates an access token for a user.

Parameters:

Name Type Description Default
user_uuid UUID

The user's UUID to include in the token.

required
additional_claims dict[str, Any] | None

Optional additional claims to include in the token.

None
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

The encoded access token.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def create_access_token(
    cls,
    user_uuid: UUID,
    additional_claims: dict[str, Any] | None = None,
    auth_config: AuthConfig | None = None,
) -> str:
    """Issues an access token for a user.

    The token carries the user's UUID as `sub`, the type `"access"` and the configured
    `TOKEN_VERSION`, and lives for `ACCESS_TOKEN_EXPIRES_IN` seconds.

    Args:
        user_uuid (UUID): The user's UUID to include in the token.
        additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        str: The encoded access token.
    """
    settings = auth_config or BaseConfig.global_config().AUTH
    access_claims: dict[str, Any] = {
        "sub": str(user_uuid),
        "type": "access",
        "token_version": settings.TOKEN_VERSION,
    }

    # Hand over the resolved config so the token is built with the same settings
    return cls.create_token(
        data=access_claims,
        expires_in=settings.ACCESS_TOKEN_EXPIRES_IN,
        additional_claims=additional_claims,
        auth_config=settings,
    )

archipy.helpers.utils.jwt_utils.JWTUtils.create_refresh_token(user_uuid, additional_claims=None, auth_config=None) classmethod

Creates a refresh token for a user.

Parameters:

Name Type Description Default
user_uuid UUID

The user's UUID to include in the token.

required
additional_claims dict[str, Any] | None

Optional additional claims to include in the token.

None
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

The encoded refresh token.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def create_refresh_token(
    cls,
    user_uuid: UUID,
    additional_claims: dict[str, Any] | None = None,
    auth_config: AuthConfig | None = None,
) -> str:
    """Issues a refresh token for a user.

    The token carries the user's UUID as `sub`, the type `"refresh"` and the configured
    `TOKEN_VERSION`, and lives for `REFRESH_TOKEN_EXPIRES_IN` seconds.

    Args:
        user_uuid (UUID): The user's UUID to include in the token.
        additional_claims (dict[str, Any] | None): Optional additional claims to include in the token.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        str: The encoded refresh token.
    """
    settings = auth_config or BaseConfig.global_config().AUTH
    refresh_claims: dict[str, Any] = {
        "sub": str(user_uuid),
        "type": "refresh",
        "token_version": settings.TOKEN_VERSION,
    }

    # Hand over the resolved config so the token is built with the same settings
    return cls.create_token(
        data=refresh_claims,
        expires_in=settings.REFRESH_TOKEN_EXPIRES_IN,
        additional_claims=additional_claims,
        auth_config=settings,
    )

archipy.helpers.utils.jwt_utils.JWTUtils.decode_token(token, verify_type=None, auth_config=None) classmethod

Decodes and verifies a JWT token with enhanced security checks.

Parameters:

Name Type Description Default
token str

The JWT token to decode.

required
verify_type str | None

Optional token type to verify (e.g., "access" or "refresh").

None
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The decoded token payload.

Raises:

Type Description
TokenExpiredError

If the token has expired.

InvalidTokenError

If the token is invalid (e.g., invalid signature, audience, issuer, or type).

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def decode_token(
    cls,
    token: str,
    verify_type: str | None = None,
    auth_config: AuthConfig | None = None,
) -> dict[str, Any]:
    """Decodes a JWT and verifies its signature, registered claims, type and version.

    Args:
        token (str): The JWT token to decode.
        verify_type (str | None): Optional token type to verify (e.g., "access" or "refresh").
            The type is not checked when this is omitted.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        dict[str, Any]: The decoded token payload.

    Raises:
        TokenExpiredError: If the token has expired.
        InvalidTokenError: If the token is invalid (e.g., invalid signature, audience, issuer, type,
            or token version).
        InvalidArgumentError: If the configured `SECRET_KEY` is `None`.
    """
    import jwt
    from jwt.exceptions import ExpiredSignatureError, InvalidTokenError as JWTInvalidTokenError

    settings = auth_config or BaseConfig.global_config().AUTH
    mandatory_claims = ["exp", "iat", "nbf", "aud", "iss", "sub", "type", "token_version"]
    if settings.ENABLE_JTI_CLAIM:
        mandatory_claims.append("jti")

    # A verification key is mandatory
    secret_key_argument = "SECRET_KEY"
    signing_key = settings.SECRET_KEY
    if signing_key is None:
        raise InvalidArgumentError(secret_key_argument)

    try:
        claims = jwt.decode(
            token,
            signing_key.get_secret_value(),
            algorithms=[settings.HASH_ALGORITHM],
            options={
                "verify_signature": True,
                "verify_exp": True,
                "verify_nbf": True,
                "verify_iat": True,
                "verify_aud": True,
                "verify_iss": True,
                "require": mandatory_claims,
            },
            audience=settings.JWT_AUDIENCE,
            issuer=settings.JWT_ISSUER,
        )
    except ExpiredSignatureError as exception:
        # Must come first: ExpiredSignatureError is itself a PyJWT InvalidTokenError
        raise TokenExpiredError from exception
    except JWTInvalidTokenError as exception:
        # Covers signature, audience, issuer and every other PyJWT validation failure
        raise InvalidTokenError from exception

    # Token type is only checked when the caller asks for a specific one
    if verify_type and claims.get("type") != verify_type:
        raise InvalidTokenError

    # Tokens issued under a different token version are rejected
    if claims.get("token_version") != settings.TOKEN_VERSION:
        raise InvalidTokenError

    # Return a plain dict, as declared
    return dict(claims)

archipy.helpers.utils.jwt_utils.JWTUtils.verify_access_token(token, auth_config=None) classmethod

Verifies an access token.

Parameters:

Name Type Description Default
token str

The access token to verify.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The decoded access token payload.

Raises:

Type Description
InvalidTokenError

If the token is invalid or not an access token.

TokenExpiredError

If the token has expired.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def verify_access_token(cls, token: str, auth_config: AuthConfig | None = None) -> dict[str, Any]:
    """Decodes a token and requires it to be of type "access".

    Args:
        token (str): The access token to verify.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        dict[str, Any]: The decoded access token payload.

    Raises:
        InvalidTokenError: If the token is invalid or not an access token.
        TokenExpiredError: If the token has expired.
    """
    settings = auth_config or BaseConfig.global_config().AUTH
    access_claims = cls.decode_token(token, verify_type="access", auth_config=settings)
    return access_claims

archipy.helpers.utils.jwt_utils.JWTUtils.verify_refresh_token(token, auth_config=None) classmethod

Verifies a refresh token.

Parameters:

Name Type Description Default
token str

The refresh token to verify.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The decoded refresh token payload.

Raises:

Type Description
InvalidTokenError

If the token is invalid or not a refresh token.

TokenExpiredError

If the token has expired.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def verify_refresh_token(cls, token: str, auth_config: AuthConfig | None = None) -> dict[str, Any]:
    """Decodes a token and requires it to be of type "refresh".

    Args:
        token (str): The refresh token to verify.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        dict[str, Any]: The decoded refresh token payload.

    Raises:
        InvalidTokenError: If the token is invalid or not a refresh token.
        TokenExpiredError: If the token has expired.
    """
    settings = auth_config or BaseConfig.global_config().AUTH
    refresh_claims = cls.decode_token(token, verify_type="refresh", auth_config=settings)
    return refresh_claims

archipy.helpers.utils.jwt_utils.JWTUtils.extract_user_uuid(payload) staticmethod

Extracts the user UUID from the token payload.

Parameters:

Name Type Description Default
payload dict[str, Any]

The decoded token payload.

required

Returns:

Name Type Description
UUID UUID

The user's UUID.

Raises:

Type Description
InvalidTokenError

If the user identifier is invalid or missing.

Source code in archipy/helpers/utils/jwt_utils.py
@staticmethod
def extract_user_uuid(payload: dict[str, Any]) -> UUID:
    """Extracts the user UUID from the `sub` claim of a token payload.

    Args:
        payload (dict[str, Any]): The decoded token payload.

    Returns:
        UUID: The user's UUID.

    Raises:
        InvalidTokenError: If the `sub` claim is missing, is not a string, or is not a valid UUID.
    """
    try:
        return UUID(payload["sub"])
    except (KeyError, ValueError, TypeError, AttributeError) as exception:
        # KeyError: no `sub` claim; ValueError: malformed UUID string.
        # UUID() raises TypeError for `None` and AttributeError for other non-string values;
        # both are reported as an invalid token instead of leaking to the caller.
        raise InvalidTokenError from exception

archipy.helpers.utils.jwt_utils.JWTUtils.get_token_expiry(token, auth_config=None) classmethod

Gets the token expiry timestamp.

Parameters:

Name Type Description Default
token str

The JWT token.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
int int

The token expiry timestamp in seconds.

Raises:

Type Description
InvalidTokenError

If the token is invalid.

Source code in archipy/helpers/utils/jwt_utils.py
@classmethod
def get_token_expiry(cls, token: str, auth_config: AuthConfig | None = None) -> int:
    """Returns the `exp` claim of a token after fully decoding and verifying it.

    Args:
        token (str): The JWT token.
        auth_config (AuthConfig | None): Optional auth configuration override.
            If not provided, uses the global config.

    Returns:
        int: The token expiry timestamp in seconds.

    Raises:
        InvalidTokenError: If the token is invalid.
        TokenExpiredError: If the token has expired.
    """
    claims = cls.decode_token(token, auth_config=auth_config)
    expiry = claims["exp"]
    return int(expiry)

options: show_root_heading: true show_source: true

password_utils

Utilities for password operations.

from archipy.helpers.utils.password_utils import PasswordUtils

# Hash a password; the result is a base64 string that embeds a random salt
hashed = PasswordUtils.hash_password("my-password")

# Verify a plain-text password against the stored hash
is_valid = PasswordUtils.verify_password("my-password", hashed)

# Generate secure password
password = PasswordUtils.generate_password(length=12)

# Validate password strength
is_strong = PasswordUtils.validate_password_strength("my-password")

archipy.helpers.utils.password_utils.PasswordUtils

A utility class for handling password-related operations, such as hashing, verification, and validation.

Source code in archipy/helpers/utils/password_utils.py
class PasswordUtils:
    """A utility class for handling password-related operations, such as hashing, verification, and validation."""

    @staticmethod
    def hash_password(password: str, auth_config: AuthConfig | None = None) -> str:
        """Hashes a password using PBKDF2 with SHA256.

        Args:
            password (str): The password to hash.
            auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

        Returns:
            str: Base64 text encoding the raw salt bytes immediately followed by the derived hash bytes
                (plain concatenation, no separator).
        """
        configs = auth_config or BaseConfig.global_config().AUTH
        salt = os.urandom(configs.SALT_LENGTH)
        pw_hash = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, configs.HASH_ITERATIONS)

        # Salt is stored as a fixed-length prefix so verify_password can slice it back out.
        return b64encode(salt + pw_hash).decode("utf-8")

    @staticmethod
    def verify_password(password: str, stored_password: str, auth_config: AuthConfig | None = None) -> bool:
        """Verifies a password against a stored hash.

        Args:
            password (str): The password to verify.
            stored_password (str): The stored value produced by `hash_password` (base64 of salt + hash).
            auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

        Returns:
            bool: True if the password matches the stored hash, False otherwise (including when the
                stored value cannot be decoded).
        """
        try:
            configs = auth_config or BaseConfig.global_config().AUTH

            # The first SALT_LENGTH bytes are the salt; the remainder is the derived hash.
            decoded = b64decode(stored_password.encode("utf-8"))
            salt = decoded[: configs.SALT_LENGTH]
            stored_hash = decoded[configs.SALT_LENGTH :]

            # Hash the provided password with the same salt
            pw_hash = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, configs.HASH_ITERATIONS)

            # Compare in constant time to prevent timing attacks
            return hmac.compare_digest(pw_hash, stored_hash)
        except (ValueError, TypeError, IndexError):
            # Malformed stored values are treated as a failed verification rather than an error.
            return False

    @staticmethod
    def validate_password(
        password: str,
        auth_config: AuthConfig | None = None,
        lang: LanguageType = LanguageType.FA,
    ) -> None:
        """Validates a password against the password policy.

        Every violated rule is collected so the caller receives the full list at once.

        Args:
            password (str): The password to validate.
            auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.
            lang (LanguageType): The language passed to the raised error. Defaults to `LanguageType.FA`.

        Raises:
            InvalidPasswordError: If the password does not meet the policy requirements.
        """
        configs = auth_config or BaseConfig.global_config().AUTH
        errors = []

        if len(password) < configs.MIN_LENGTH:
            errors.append(f"Password must be at least {configs.MIN_LENGTH} characters long.")

        if configs.REQUIRE_DIGIT and not any(char.isdigit() for char in password):
            errors.append("Password must contain at least one digit.")

        if configs.REQUIRE_LOWERCASE and not any(char.islower() for char in password):
            errors.append("Password must contain at least one lowercase letter.")

        if configs.REQUIRE_UPPERCASE and not any(char.isupper() for char in password):
            errors.append("Password must contain at least one uppercase letter.")

        if configs.REQUIRE_SPECIAL and not any(char in configs.SPECIAL_CHARACTERS for char in password):
            errors.append(f"Password must contain at least one special character: {configs.SPECIAL_CHARACTERS}")

        if errors:
            raise InvalidPasswordError(requirements=errors, lang=lang)

    @staticmethod
    def generate_password(auth_config: AuthConfig | None = None) -> str:
        """Generates a random password that meets the policy requirements.

        One character is drawn from each required class, the rest is padded up to
        `MIN_LENGTH` from the combined alphabet, and the result is shuffled.

        Args:
            auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

        Returns:
            str: A randomly generated password that meets the policy requirements.
        """
        configs = auth_config or BaseConfig.global_config().AUTH

        lowercase_chars = string.ascii_lowercase
        uppercase_chars = string.ascii_uppercase
        digit_chars = string.digits
        special_chars = "".join(configs.SPECIAL_CHARACTERS)

        # Initialize with required characters
        password_chars = []
        if configs.REQUIRE_LOWERCASE:
            password_chars.append(secrets.choice(lowercase_chars))
        if configs.REQUIRE_UPPERCASE:
            password_chars.append(secrets.choice(uppercase_chars))
        if configs.REQUIRE_DIGIT:
            password_chars.append(secrets.choice(digit_chars))
        if configs.REQUIRE_SPECIAL:
            password_chars.append(secrets.choice(special_chars))

        # Calculate remaining length
        remaining_length = max(0, configs.MIN_LENGTH - len(password_chars))

        # Add random characters to meet minimum length
        all_chars = lowercase_chars + uppercase_chars + digit_chars + special_chars
        password_chars.extend(secrets.choice(all_chars) for _ in range(remaining_length))

        # Shuffle so the required characters do not always occupy the leading positions
        shuffled = list(password_chars)
        secrets.SystemRandom().shuffle(shuffled)

        return "".join(shuffled)

    @classmethod
    def validate_password_history(
        cls,
        new_password: str,
        password_history: list[str],
        auth_config: AuthConfig | None = None,
        lang: LanguageType = LanguageType.FA,
    ) -> None:
        """Validates a new password against the password history.

        Args:
            new_password (str): The new password to validate.
            password_history (list[str]): A list of previous password hashes; the last
                `PASSWORD_HISTORY_SIZE` entries are the ones checked.
            auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.
            lang (LanguageType): The language passed to the raised error. Defaults to `LanguageType.FA`.

        Raises:
            InvalidPasswordError: If the new password has been used recently or does not meet the policy requirements.
        """
        configs = auth_config or BaseConfig.global_config().AUTH

        # First validate against password policy
        cls.validate_password(new_password, configs, lang)

        # `history[-0:]` is `history[0:]` (the whole list), so a non-positive size must be
        # handled explicitly to mean "check no previous passwords".
        history_size = configs.PASSWORD_HISTORY_SIZE
        recent_passwords = password_history[-history_size:] if history_size > 0 else []

        if any(cls.verify_password(new_password, old_password, configs) for old_password in recent_passwords):
            raise InvalidPasswordError(requirements=["Password has been used recently"], lang=lang)

archipy.helpers.utils.password_utils.PasswordUtils.hash_password(password, auth_config=None) staticmethod

Hashes a password using PBKDF2 with SHA256.

Parameters:

Name Type Description Default
password str

The password to hash.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

A base64-encoded string containing the salt bytes immediately followed by the hash bytes (concatenated, with no separator).

Source code in archipy/helpers/utils/password_utils.py
@staticmethod
def hash_password(password: str, auth_config: AuthConfig | None = None) -> str:
    """Derives a salted PBKDF2-HMAC-SHA256 hash of a password.

    Args:
        password (str): The password to hash.
        auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

    Returns:
        str: Base64 text encoding the raw salt bytes immediately followed by the derived key
            (plain concatenation, no separator).
    """
    settings = auth_config or BaseConfig.global_config().AUTH
    random_salt = os.urandom(settings.SALT_LENGTH)
    derived_key = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        random_salt,
        settings.HASH_ITERATIONS,
    )

    # The salt travels as a fixed-length prefix of the stored value.
    encoded = b64encode(random_salt + derived_key)
    return encoded.decode("utf-8")

archipy.helpers.utils.password_utils.PasswordUtils.verify_password(password, stored_password, auth_config=None) staticmethod

Verifies a password against a stored hash.

Parameters:

Name Type Description Default
password str

The password to verify.

required
stored_password str

The stored password hash to compare against.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
bool bool

True if the password matches the stored hash, False otherwise.

Source code in archipy/helpers/utils/password_utils.py
@staticmethod
def verify_password(password: str, stored_password: str, auth_config: AuthConfig | None = None) -> bool:
    """Checks a plain-text password against a stored salted hash.

    Args:
        password (str): The password to verify.
        stored_password (str): The stored base64 value (salt bytes followed by hash bytes).
        auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

    Returns:
        bool: True if the password matches the stored hash, False otherwise, including when
            decoding or comparison raises ValueError, TypeError or IndexError.
    """
    try:
        settings = auth_config or BaseConfig.global_config().AUTH

        raw = b64decode(stored_password.encode("utf-8"))
        # Leading SALT_LENGTH bytes are the salt, everything after is the expected hash.
        salt_length = settings.SALT_LENGTH
        salt, expected_hash = raw[:salt_length], raw[salt_length:]

        candidate_hash = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode(),
            salt,
            settings.HASH_ITERATIONS,
        )

        # Constant-time comparison to prevent timing attacks.
        matches = hmac.compare_digest(candidate_hash, expected_hash)
    except (ValueError, TypeError, IndexError):
        # A malformed stored value counts as a failed verification.
        return False
    else:
        return matches

archipy.helpers.utils.password_utils.PasswordUtils.validate_password(password, auth_config=None, lang=LanguageType.FA) staticmethod

Validates a password against the password policy.

Parameters:

Name Type Description Default
password str

The password to validate.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None
lang LanguageType

The language to use for error messages. Defaults to Persian.

FA

Raises:

Type Description
InvalidPasswordError

If the password does not meet the policy requirements.

Source code in archipy/helpers/utils/password_utils.py
@staticmethod
def validate_password(
    password: str,
    auth_config: AuthConfig | None = None,
    lang: LanguageType = LanguageType.FA,
) -> None:
    """Checks a password against every rule of the password policy.

    All violated rules are gathered first, so a single error reports the complete list.

    Args:
        password (str): The password to validate.
        auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.
        lang (LanguageType): The language passed to the raised error. Defaults to `LanguageType.FA`.

    Raises:
        InvalidPasswordError: If the password does not meet the policy requirements.
    """
    policy = auth_config or BaseConfig.global_config().AUTH
    violations = []

    if len(password) < policy.MIN_LENGTH:
        violations.append(f"Password must be at least {policy.MIN_LENGTH} characters long.")

    # Each rule only applies when the policy enables it.
    if policy.REQUIRE_DIGIT and all(not ch.isdigit() for ch in password):
        violations.append("Password must contain at least one digit.")

    if policy.REQUIRE_LOWERCASE and all(not ch.islower() for ch in password):
        violations.append("Password must contain at least one lowercase letter.")

    if policy.REQUIRE_UPPERCASE and all(not ch.isupper() for ch in password):
        violations.append("Password must contain at least one uppercase letter.")

    if policy.REQUIRE_SPECIAL and all(ch not in policy.SPECIAL_CHARACTERS for ch in password):
        violations.append(f"Password must contain at least one special character: {policy.SPECIAL_CHARACTERS}")

    if not violations:
        return

    raise InvalidPasswordError(requirements=violations, lang=lang)

archipy.helpers.utils.password_utils.PasswordUtils.generate_password(auth_config=None) staticmethod

Generates a random password that meets the policy requirements.

Parameters:

Name Type Description Default
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Name Type Description
str str

A randomly generated password that meets the policy requirements.

Source code in archipy/helpers/utils/password_utils.py
@staticmethod
def generate_password(auth_config: AuthConfig | None = None) -> str:
    """Builds a random password that satisfies the configured policy.

    One character is drawn from each required class, the remainder is padded up to
    `MIN_LENGTH` from the combined alphabet, and the characters are then shuffled.

    Args:
        auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.

    Returns:
        str: A randomly generated password that meets the policy requirements.
    """
    policy = auth_config or BaseConfig.global_config().AUTH

    lower, upper, digits = string.ascii_lowercase, string.ascii_uppercase, string.digits
    specials = "".join(policy.SPECIAL_CHARACTERS)

    # Pair every policy flag with the pool that satisfies it.
    required_pools = (
        (policy.REQUIRE_LOWERCASE, lower),
        (policy.REQUIRE_UPPERCASE, upper),
        (policy.REQUIRE_DIGIT, digits),
        (policy.REQUIRE_SPECIAL, specials),
    )
    chars = [secrets.choice(pool) for required, pool in required_pools if required]

    # Pad with characters from the full alphabet until the minimum length is reached.
    padding = max(0, policy.MIN_LENGTH - len(chars))
    alphabet = lower + upper + digits + specials
    chars += [secrets.choice(alphabet) for _ in range(padding)]

    # Shuffle so the required characters do not always sit at the front.
    secrets.SystemRandom().shuffle(chars)

    return "".join(chars)

archipy.helpers.utils.password_utils.PasswordUtils.validate_password_history(new_password, password_history, auth_config=None, lang=LanguageType.FA) classmethod

Validates a new password against the password history.

Parameters:

Name Type Description Default
new_password str

The new password to validate.

required
password_history list[str]

A list of previous password hashes.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None
lang LanguageType

The language to use for error messages. Defaults to Persian.

FA

Raises:

Type Description
InvalidPasswordError

If the new password has been used recently or does not meet the policy requirements.

Source code in archipy/helpers/utils/password_utils.py
@classmethod
def validate_password_history(
    cls,
    new_password: str,
    password_history: list[str],
    auth_config: AuthConfig | None = None,
    lang: LanguageType = LanguageType.FA,
) -> None:
    """Validates a new password against the password history.

    Args:
        new_password (str): The new password to validate.
        password_history (list[str]): A list of previous password hashes; the last
            `PASSWORD_HISTORY_SIZE` entries are the ones checked.
        auth_config (AuthConfig | None): Optional auth configuration override. If not provided, uses the global config.
        lang (LanguageType): The language passed to the raised error. Defaults to `LanguageType.FA`.

    Raises:
        InvalidPasswordError: If the new password has been used recently or does not meet the policy requirements.
    """
    configs = auth_config or BaseConfig.global_config().AUTH

    # First validate against password policy
    cls.validate_password(new_password, configs, lang)

    # `history[-0:]` is `history[0:]` (the whole list), so a non-positive size must be
    # handled explicitly to mean "check no previous passwords".
    history_size = configs.PASSWORD_HISTORY_SIZE
    recent_passwords = password_history[-history_size:] if history_size > 0 else []

    if any(cls.verify_password(new_password, old_password, configs) for old_password in recent_passwords):
        raise InvalidPasswordError(requirements=["Password has been used recently"], lang=lang)

options: show_root_heading: true show_source: true

string_utils

Utilities for string operations.

from archipy.helpers.utils.string_utils import StringUtils

# Convert to slug
slug = StringUtils.slugify("My Article Title")

# Truncate string
truncated = StringUtils.truncate("Long text here", length=10)

# Generate random string
random_str = StringUtils.generate_random_string(length=8)

# Sanitize HTML
clean_html = StringUtils.sanitize_html("<script>alert('xss')</script>")

archipy.helpers.utils.string_utils.StringUtils

Bases: StringUtilsConstants

String utilities for text normalization, cleaning, and masking.

This class provides methods for handling Persian and Arabic text, including normalization, punctuation cleaning, number conversion, and masking of sensitive information like URLs, emails, and phone numbers.

Source code in archipy/helpers/utils/string_utils.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class StringUtils(StringUtilsConstants):
    """String utilities for text normalization, cleaning, and masking.

    This class provides methods for handling Persian and Arabic text, including normalization,
    punctuation cleaning, number conversion, and masking of sensitive information like URLs,
    emails, and phone numbers.
    """

    @classmethod
    def remove_arabic_vowels(cls, text: str) -> str:
        """Strips Arabic vowel marks (tashkeel) from the text.

        The work is done by translating the text through the class-level
        `arabic_vowel_translate_table`.

        Args:
            text (str): The input text containing Arabic vowels.

        Returns:
            str: The translated text.
        """
        vowel_table = cls.arabic_vowel_translate_table
        return text.translate(vowel_table)

    @classmethod
    def normalize_persian_chars(cls, text: str) -> str:
        """Normalizes Persian characters to their standard forms.

        The text is passed through the class-level per-letter translation tables
        one after another, in the order listed below.

        Args:
            text (str): The input text containing Persian characters.

        Returns:
            str: The text after all letter tables have been applied.
        """
        letter_tables = (
            cls.alphabet_akoolad_alef_translate_table,
            cls.alphabet_alef_translate_table,
            cls.alphabet_be_translate_table,
            cls.alphabet_pe_translate_table,
            cls.alphabet_te_translate_table,
            cls.alphabet_se_translate_table,
            cls.alphabet_jim_translate_table,
            cls.alphabet_che_translate_table,
            cls.alphabet_he_translate_table,
            cls.alphabet_khe_translate_table,
            cls.alphabet_dal_translate_table,
            cls.alphabet_zal_translate_table,
            cls.alphabet_re_translate_table,
            cls.alphabet_ze_translate_table,
            cls.alphabet_zhe_translate_table,
            cls.alphabet_sin_translate_table,
            cls.alphabet_shin_translate_table,
            cls.alphabet_sad_translate_table,
            cls.alphabet_zad_translate_table,
            cls.alphabet_ta_translate_table,
            cls.alphabet_za_translate_table,
            cls.alphabet_eyn_translate_table,
            cls.alphabet_gheyn_translate_table,
            cls.alphabet_fe_translate_table,
            cls.alphabet_ghaf_translate_table,
            cls.alphabet_kaf_translate_table,
            cls.alphabet_gaf_translate_table,
            cls.alphabet_lam_translate_table,
            cls.alphabet_mim_translate_table,
            cls.alphabet_nun_translate_table,
            cls.alphabet_vav_translate_table,
            cls.alphabet_ha_translate_table,
            cls.alphabet_ye_translate_table,
        )
        # Applied sequentially: each table sees the output of the previous one.
        for table in letter_tables:
            text = text.translate(table)
        return text

    @classmethod
    def normalize_punctuation(cls, text: str) -> str:
        """Normalizes punctuation marks in the text.

        The text is passed through the class-level tables `punctuation_translate_table1`
        to `punctuation_translate_table13`, in numeric order.

        Args:
            text (str): The input text containing punctuation marks.

        Returns:
            str: The text after all punctuation tables have been applied.
        """
        punctuation_tables = (
            cls.punctuation_translate_table1,
            cls.punctuation_translate_table2,
            cls.punctuation_translate_table3,
            cls.punctuation_translate_table4,
            cls.punctuation_translate_table5,
            cls.punctuation_translate_table6,
            cls.punctuation_translate_table7,
            cls.punctuation_translate_table8,
            cls.punctuation_translate_table9,
            cls.punctuation_translate_table10,
            cls.punctuation_translate_table11,
            cls.punctuation_translate_table12,
            cls.punctuation_translate_table13,
        )
        # Applied sequentially: each table sees the output of the previous one.
        for table in punctuation_tables:
            text = text.translate(table)
        return text

    @classmethod
    def normalize_numbers(cls, text: str) -> str:
        """Normalizes numbers in the text to English format.

        The text is passed through the ten class-level per-digit translation tables
        (zero through nine), in that order.

        Args:
            text (str): The input text containing numbers.

        Returns:
            str: The text after all digit tables have been applied.
        """
        digit_tables = (
            cls.number_zero_translate_table,
            cls.number_one_translate_table,
            cls.number_two_translate_table,
            cls.number_three_translate_table,
            cls.number_four_translate_table,
            cls.number_five_translate_table,
            cls.number_six_translate_table,
            cls.number_seven_translate_table,
            cls.number_eight_translate_table,
            cls.number_nine_translate_table,
        )
        # Applied sequentially: each table sees the output of the previous one.
        for table in digit_tables:
            text = text.translate(table)
        return text

    @classmethod
    def clean_spacing(cls, text: str) -> str:
        """Cleans up spacing issues such as non-breaking spaces and zero-width non-joiners.

        Both characters are replaced by a plain space, after which every
        (pattern, replacement) pair in `character_refinement_patterns` is applied in order.

        Args:
            text (str): The input text with spacing issues.

        Returns:
            str: The text with spacing cleaned up.
        """
        # ZWNJ (U+200C) first, then NBSP (U+00A0); both become an ordinary space.
        cleaned = text.replace("\u200c", " ").replace("\xa0", " ")

        for regex, replacement in cls.character_refinement_patterns:
            cleaned = regex.sub(replacement, cleaned)

        return cleaned

    @classmethod
    def normalize_punctuation_spacing(cls, text: str) -> str:
        """Applies proper spacing around punctuation marks.

        Each (pattern, replacement) pair in `punctuation_spacing_patterns` is applied
        in order, every substitution working on the output of the previous one.

        Args:
            text (str): The input text with punctuation spacing issues.

        Returns:
            str: The text after all spacing substitutions.
        """
        result = text
        for regex, replacement in cls.punctuation_spacing_patterns:
            result = regex.sub(replacement, result)
        return result

    @classmethod
    def remove_punctuation_marks(cls, text: str) -> str:
        """Removes punctuation marks from the text.

        The text is translated through `punctuation_persian_marks_to_space_translate_table`;
        judging by the table's name the marks are presumably turned into spaces rather
        than deleted (verify against the constants class).

        Args:
            text (str): The input text containing punctuation marks.

        Returns:
            str: The translated text.
        """
        marks_table = cls.punctuation_persian_marks_to_space_translate_table
        return text.translate(marks_table)

    @classmethod
    def mask_urls(cls, text: str, mask: str | None = None) -> str:
        """Masks URLs in the text with a specified mask.

        Args:
            text (str): The input text containing URLs.
            mask (str | None): The mask to replace URLs with. Defaults to "MASK_URL".

        Returns:
            str: The text with URLs masked.
        """
        mask = mask or "MASK_URL"
        return compile(r"https?://\S+|www\.\S+").sub(f" {mask} ", text)

    @classmethod
    def mask_emails(cls, text: str, mask: str | None = None) -> str:
        """Masks email addresses in the text with a specified mask.

        Args:
            text (str): The input text containing email addresses.
            mask (str | None): The mask to replace emails with. Defaults to "MASK_EMAIL".

        Returns:
            str: The text with email addresses masked.
        """
        mask = mask or "MASK_EMAIL"
        return compile(r"\S+@\S+\.\S+").sub(f" {mask} ", text)

    @classmethod
    def mask_phones(cls, text: str, mask: str | None = None) -> str:
        """Masks phone numbers in the text with a specified mask.

        Args:
            text (str): The input text containing phone numbers.
            mask (str | None): The mask to replace phone numbers with. Defaults to "MASK_PHONE".

        Returns:
            str: The text with phone numbers masked.
        """
        mask = mask or "MASK_PHONE"
        return compile(r"(?:\+98|0)?(?:\d{3}\s*?\d{3}\s*?\d{4})").sub(f" {mask} ", text)

    @classmethod
    def convert_english_number_to_persian(cls, text: str) -> str:
        """Converts English numbers to Persian numbers in the text.

        Args:
            text (str): The input text containing English numbers.

        Returns:
            str: The text with English numbers converted to Persian numbers.
        """
        table = {
            48: 1776,  # 0
            49: 1777,  # 1
            50: 1778,  # 2
            51: 1779,  # 3
            52: 1780,  # 4
            53: 1781,  # 5
            54: 1782,  # 6
            55: 1783,  # 7
            56: 1784,  # 8
            57: 1785,  # 9
            44: 1548,  # ,
        }
        return text.translate(table)

    @classmethod
    def convert_numbers_to_english(cls, text: str) -> str:
        """Converts Persian/Arabic numbers to English numbers in the text.

        Args:
            text (str): The input text containing Persian/Arabic numbers.

        Returns:
            str: The text with Persian/Arabic numbers converted to English numbers.
        """
        table = {
            1776: 48,  # 0
            1777: 49,  # 1
            1778: 50,  # 2
            1779: 51,  # 3
            1780: 52,  # 4
            1781: 53,  # 5
            1782: 54,  # 6
            1783: 55,  # 7
            1784: 56,  # 8
            1785: 57,  # 9
            1632: 48,  # 0
            1633: 49,  # 1
            1634: 50,  # 2
            1635: 51,  # 3
            1636: 52,  # 4
            1637: 53,  # 5
            1638: 54,  # 6
            1639: 55,  # 7
            1640: 56,  # 8
            1641: 57,  # 9
        }
        return text.translate(table)

    @classmethod
    def convert_add_3digit_delimiter(cls, value: int) -> str:
        """Adds thousand separators to numbers.

        Args:
            value (int): The number to format.

        Returns:
            str: The formatted number with thousand separators.
        """
        return f"{value:,}" if isinstance(value, int) else value

    @classmethod
    def remove_emoji(cls, text: str) -> str:
        """Removes emoji characters from the text.

        Args:
            text (str): The input text containing emojis.

        Returns:
            str: The text with emojis removed.
        """
        emoji_pattern = re.compile(
            r"["
            r"\U0001F600-\U0001F64F"  # emoticons
            r"\U0001F300-\U0001F5FF"  # symbols & pictographs
            r"\U0001F680-\U0001F6FF"  # transport & map symbols
            r"\U0001F1E0-\U0001F1FF"  # flags
            r"\U0001F900-\U0001F9FF"  # supplemental symbols and pictographs
            r"\U0001FA00-\U0001FA6F"  # symbols and pictographs extended-A
            r"\U00002600-\U000026FF"  # miscellaneous symbols (some are emojis)
            r"\U00002700-\U000027BF"  # dingbats (some are emojis)
            r"\U00002190-\U000021FF"  # arrows (some are emojis)
            r"]+",
            re.UNICODE,
        )
        return emoji_pattern.sub(r"", text)

    @classmethod
    def replace_currencies_with_mask(cls, text: str, mask: str | None = None) -> str:
        """Masks currency symbols and amounts in the text.

        Args:
            text (str): The input text containing currency symbols and amounts.
            mask (str | None): The mask to replace currencies with. Defaults to "MASK_CURRENCIES".

        Returns:
            str: The text with currency symbols and amounts masked.
        """
        mask = mask or "MASK_CURRENCIES"
        currency_pattern = compile(r"(\\|zł|£|\$|₡|₦|¥|₩|₪|₫|€|₱|₲|₴|₹|﷼)+")
        return currency_pattern.sub(f" {mask} ", text)

    @classmethod
    def replace_numbers_with_mask(cls, text: str, mask: str | None = None) -> str:
        """Masks numbers in the text.

        Args:
            text (str): The input text containing numbers.
            mask (str | None): The mask to replace numbers with. Defaults to "MASK_NUMBERS".

        Returns:
            str: The text with numbers masked.
        """
        mask = mask or "MASK_NUMBERS"
        numbers = re.findall("[0-9]+", text)
        for number in sorted(numbers, key=len, reverse=True):
            text = text.replace(number, f" {mask} ")
        return text

    @classmethod
    def is_string_none_or_empty(cls, text: str) -> bool:
        """Checks if a string is `None` or empty (after stripping whitespace).

        Args:
            text (str): The input string to check.

        Returns:
            bool: `True` if the string is `None` or empty, `False` otherwise.
        """
        return text is None or isinstance(text, str) and not text.strip()

    @classmethod
    def normalize_persian_text(
        cls,
        text: str,
        *,
        remove_vowels: bool = True,
        normalize_punctuation: bool = True,
        normalize_numbers: bool = True,
        normalize_persian_chars: bool = True,
        mask_urls: bool = False,
        mask_emails: bool = False,
        mask_phones: bool = False,
        mask_currencies: bool = False,
        mask_all_numbers: bool = False,
        remove_emojis: bool = False,
        url_mask: str | None = None,
        email_mask: str | None = None,
        phone_mask: str | None = None,
        currency_mask: str | None = None,
        number_mask: str | None = None,
        clean_spacing: bool = True,
        remove_punctuation: bool = False,
        normalize_punctuation_spacing: bool = False,
    ) -> str:
        """Normalizes Persian text with configurable options.

        Args:
            text (str): The input text to normalize.
            remove_vowels (bool): Whether to remove Arabic vowels. Defaults to `True`.
            normalize_punctuation (bool): Whether to normalize punctuation marks. Defaults to `True`.
            normalize_numbers (bool): Whether to normalize numbers to English format. Defaults to `True`.
            normalize_persian_chars (bool): Whether to normalize Persian characters. Defaults to `True`.
            mask_urls (bool): Whether to mask URLs. Defaults to `False`.
            mask_emails (bool): Whether to mask email addresses. Defaults to `False`.
            mask_phones (bool): Whether to mask phone numbers. Defaults to `False`.
            mask_currencies (bool): Whether to mask currency symbols and amounts. Defaults to `False`.
            mask_all_numbers (bool): Whether to mask all numbers. Defaults to `False`.
            remove_emojis (bool): Whether to remove emojis. Defaults to `False`.
            url_mask (str | None): The mask to replace URLs with. Defaults to `None`.
            email_mask (str | None): The mask to replace email addresses with. Defaults to `None`.
            phone_mask (str | None): The mask to replace phone numbers with. Defaults to `None`.
            currency_mask (str | None): The mask to replace currency symbols and amounts with. Defaults to `None`.
            number_mask (str | None): The mask to replace numbers with. Defaults to `None`.
            clean_spacing (bool): Whether to clean up spacing issues. Defaults to `True`.
            remove_punctuation (bool): Whether to remove punctuation marks. Defaults to `False`.
            normalize_punctuation_spacing (bool): Whether to apply proper spacing around punctuation marks. Defaults to `False`.

        Returns:
            str: The normalized text.
        """
        if not text:
            return text

        # Remove emojis if requested
        if remove_emojis:
            text = cls.remove_emoji(text)

        # Apply normalizations
        if remove_vowels:
            text = cls.remove_arabic_vowels(text)
        if normalize_persian_chars:
            text = cls.normalize_persian_chars(text)
        if normalize_punctuation:
            text = cls.normalize_punctuation(text)
        if remove_punctuation:
            text = cls.remove_punctuation_marks(text)
        if normalize_numbers:
            text = cls.normalize_numbers(text)

        # Apply masking
        if mask_urls:
            text = cls.mask_urls(text, mask=url_mask)
        if mask_emails:
            text = cls.mask_emails(text, mask=email_mask)
        if mask_phones:
            text = cls.mask_phones(text, mask=phone_mask)
        if mask_currencies:
            text = cls.replace_currencies_with_mask(text, mask=currency_mask)
        if mask_all_numbers:
            text = cls.replace_numbers_with_mask(text, mask=number_mask)

        if clean_spacing:
            text = cls.clean_spacing(text)
        if normalize_punctuation_spacing:
            text = cls.normalize_punctuation_spacing(text)

        return text.strip()

    @classmethod
    def snake_to_camel_case(cls, text: str) -> str:
        """Turns a snake_case identifier into camelCase.

        Args:
            text (str): The input text in snake_case format.

        Returns:
            str: The camelCase form. `None`, empty and whitespace-only inputs are
                returned unchanged.
        """
        if cls.is_string_none_or_empty(text):
            return text

        # The leading segment is kept exactly as written; every later segment is title-cased.
        head, *tail = text.split("_")
        return head + "".join(segment.title() for segment in tail)

    @classmethod
    def camel_to_snake_case(cls, text: str) -> str:
        """Turns a camelCase identifier into snake_case.

        Args:
            text (str): The input text in camelCase format.

        Returns:
            str: The lower-cased snake_case form. `None`, empty and whitespace-only
                inputs are returned unchanged.
        """
        if cls.is_string_none_or_empty(text):
            return text

        # First pass: split before a capital that starts a capitalized word ("aWord" -> "a_Word").
        words_marked = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
        # Second pass: split between a lowercase letter or digit and the capital after it.
        fully_marked = re.sub("([a-z0-9])([A-Z])", r"\1_\2", words_marked)
        return fully_marked.lower()

archipy.helpers.utils.string_utils.StringUtils.remove_arabic_vowels(text) classmethod

Removes Arabic vowels (tashkeel) from the text.

Parameters:

Name Type Description Default
text str

The input text containing Arabic vowels.

required

Returns:

Name Type Description
str str

The text with Arabic vowels removed.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def remove_arabic_vowels(cls, text: str) -> str:
    """Strips Arabic vowel marks (tashkeel) from the text.

    Args:
        text (str): The input text containing Arabic vowels.

    Returns:
        str: The text after applying the class's Arabic-vowel translation table.
    """
    vowel_table = cls.arabic_vowel_translate_table
    return text.translate(vowel_table)

archipy.helpers.utils.string_utils.StringUtils.normalize_persian_chars(text) classmethod

Normalizes Persian characters to their standard forms.

Parameters:

Name Type Description Default
text str

The input text containing Persian characters.

required

Returns:

Name Type Description
str str

The text with Persian characters normalized.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def normalize_persian_chars(cls, text: str) -> str:
    """Normalizes Persian characters to their standard forms.

    One translation table per letter is applied, in the fixed order listed below;
    each table works on the output of the previous one.

    Args:
        text (str): The input text containing Persian characters.

    Returns:
        str: The text with Persian characters normalized.
    """
    letter_tables = (
        cls.alphabet_akoolad_alef_translate_table,
        cls.alphabet_alef_translate_table,
        cls.alphabet_be_translate_table,
        cls.alphabet_pe_translate_table,
        cls.alphabet_te_translate_table,
        cls.alphabet_se_translate_table,
        cls.alphabet_jim_translate_table,
        cls.alphabet_che_translate_table,
        cls.alphabet_he_translate_table,
        cls.alphabet_khe_translate_table,
        cls.alphabet_dal_translate_table,
        cls.alphabet_zal_translate_table,
        cls.alphabet_re_translate_table,
        cls.alphabet_ze_translate_table,
        cls.alphabet_zhe_translate_table,
        cls.alphabet_sin_translate_table,
        cls.alphabet_shin_translate_table,
        cls.alphabet_sad_translate_table,
        cls.alphabet_zad_translate_table,
        cls.alphabet_ta_translate_table,
        cls.alphabet_za_translate_table,
        cls.alphabet_eyn_translate_table,
        cls.alphabet_gheyn_translate_table,
        cls.alphabet_fe_translate_table,
        cls.alphabet_ghaf_translate_table,
        cls.alphabet_kaf_translate_table,
        cls.alphabet_gaf_translate_table,
        cls.alphabet_lam_translate_table,
        cls.alphabet_mim_translate_table,
        cls.alphabet_nun_translate_table,
        cls.alphabet_vav_translate_table,
        cls.alphabet_ha_translate_table,
        cls.alphabet_ye_translate_table,
    )
    for table in letter_tables:
        text = text.translate(table)
    return text

archipy.helpers.utils.string_utils.StringUtils.normalize_punctuation(text) classmethod

Normalizes punctuation marks in the text.

Parameters:

Name Type Description Default
text str

The input text containing punctuation marks.

required

Returns:

Name Type Description
str str

The text with punctuation marks normalized.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def normalize_punctuation(cls, text: str) -> str:
    """Normalizes punctuation marks in the text.

    The thirteen punctuation translation tables are applied in numeric order, each
    one to the output of the previous table.

    Args:
        text (str): The input text containing punctuation marks.

    Returns:
        str: The text with punctuation marks normalized.
    """
    punctuation_tables = (
        cls.punctuation_translate_table1,
        cls.punctuation_translate_table2,
        cls.punctuation_translate_table3,
        cls.punctuation_translate_table4,
        cls.punctuation_translate_table5,
        cls.punctuation_translate_table6,
        cls.punctuation_translate_table7,
        cls.punctuation_translate_table8,
        cls.punctuation_translate_table9,
        cls.punctuation_translate_table10,
        cls.punctuation_translate_table11,
        cls.punctuation_translate_table12,
        cls.punctuation_translate_table13,
    )
    for table in punctuation_tables:
        text = text.translate(table)
    return text

archipy.helpers.utils.string_utils.StringUtils.normalize_numbers(text) classmethod

Normalizes numbers in the text to English format.

Parameters:

Name Type Description Default
text str

The input text containing numbers.

required

Returns:

Name Type Description
str str

The text with numbers normalized to English format.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def normalize_numbers(cls, text: str) -> str:
    """Normalizes numbers in the text to English format.

    One translation table per digit (zero through nine) is applied in order.

    Args:
        text (str): The input text containing numbers.

    Returns:
        str: The text with numbers normalized to English format.
    """
    digit_tables = (
        cls.number_zero_translate_table,
        cls.number_one_translate_table,
        cls.number_two_translate_table,
        cls.number_three_translate_table,
        cls.number_four_translate_table,
        cls.number_five_translate_table,
        cls.number_six_translate_table,
        cls.number_seven_translate_table,
        cls.number_eight_translate_table,
        cls.number_nine_translate_table,
    )
    for table in digit_tables:
        text = text.translate(table)
    return text

archipy.helpers.utils.string_utils.StringUtils.clean_spacing(text) classmethod

Cleans up spacing issues in the text, such as non-breaking spaces and zero-width non-joiners.

Parameters:

Name Type Description Default
text str

The input text with spacing issues.

required

Returns:

Name Type Description
str str

The text with spacing cleaned up.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def clean_spacing(cls, text: str) -> str:
    """Cleans up spacing issues such as zero-width non-joiners and non-breaking spaces.

    Args:
        text (str): The input text with spacing issues.

    Returns:
        str: The text with spacing cleaned up.
    """
    # Zero-width non-joiner (U+200C) and no-break space (U+00A0) both become a plain space.
    cleaned = text.replace("\u200c", " ").replace("\xa0", " ")

    # Then apply the class's refinement rules in their declared order.
    for regex, replacement in cls.character_refinement_patterns:
        cleaned = regex.sub(replacement, cleaned)

    return cleaned

archipy.helpers.utils.string_utils.StringUtils.normalize_punctuation_spacing(text) classmethod

Applies proper spacing around punctuation marks.

Parameters:

Name Type Description Default
text str

The input text with punctuation spacing issues.

required

Returns:

Name Type Description
str str

The text with proper spacing around punctuation marks.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def normalize_punctuation_spacing(cls, text: str) -> str:
    """Applies the class's punctuation-spacing rules to the text.

    Args:
        text (str): The input text with punctuation spacing issues.

    Returns:
        str: The text after every spacing rule has been applied, in declared order.
    """
    result = text
    for rule in cls.punctuation_spacing_patterns:
        regex, replacement = rule
        result = regex.sub(replacement, result)
    return result

archipy.helpers.utils.string_utils.StringUtils.remove_punctuation_marks(text) classmethod

Removes punctuation marks from the text.

Parameters:

Name Type Description Default
text str

The input text containing punctuation marks.

required

Returns:

Name Type Description
str str

The text with punctuation marks removed.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def remove_punctuation_marks(cls, text: str) -> str:
    """Removes punctuation marks from the text.

    Args:
        text (str): The input text containing punctuation marks.

    Returns:
        str: The text after applying the class's punctuation-to-space translation table.
    """
    marks_table = cls.punctuation_persian_marks_to_space_translate_table
    return text.translate(marks_table)

archipy.helpers.utils.string_utils.StringUtils.mask_urls(text, mask=None) classmethod

Masks URLs in the text with a specified mask.

Parameters:

Name Type Description Default
text str

The input text containing URLs.

required
mask str | None

The mask to replace URLs with. Defaults to "MASK_URL".

None

Returns:

Name Type Description
str str

The text with URLs masked.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def mask_urls(cls, text: str, mask: str | None = None) -> str:
    """Masks URLs in the text with a specified mask.

    Args:
        text (str): The input text containing URLs.
        mask (str | None): The mask to replace URLs with. Defaults to "MASK_URL".

    Returns:
        str: The text with URLs masked.
    """
    mask = mask or "MASK_URL"
    return compile(r"https?://\S+|www\.\S+").sub(f" {mask} ", text)

archipy.helpers.utils.string_utils.StringUtils.mask_emails(text, mask=None) classmethod

Masks email addresses in the text with a specified mask.

Parameters:

Name Type Description Default
text str

The input text containing email addresses.

required
mask str | None

The mask to replace emails with. Defaults to "MASK_EMAIL".

None

Returns:

Name Type Description
str str

The text with email addresses masked.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def mask_emails(cls, text: str, mask: str | None = None) -> str:
    """Masks email addresses in the text with a specified mask.

    Args:
        text (str): The input text containing email addresses.
        mask (str | None): The mask to replace emails with. Defaults to "MASK_EMAIL".

    Returns:
        str: The text with email addresses masked.
    """
    mask = mask or "MASK_EMAIL"
    return compile(r"\S+@\S+\.\S+").sub(f" {mask} ", text)

archipy.helpers.utils.string_utils.StringUtils.mask_phones(text, mask=None) classmethod

Masks phone numbers in the text with a specified mask.

Parameters:

Name Type Description Default
text str

The input text containing phone numbers.

required
mask str | None

The mask to replace phone numbers with. Defaults to "MASK_PHONE".

None

Returns:

Name Type Description
str str

The text with phone numbers masked.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def mask_phones(cls, text: str, mask: str | None = None) -> str:
    """Masks phone numbers in the text with a specified mask.

    Args:
        text (str): The input text containing phone numbers.
        mask (str | None): The mask to replace phone numbers with. Defaults to "MASK_PHONE".

    Returns:
        str: The text with phone numbers masked.
    """
    mask = mask or "MASK_PHONE"
    return compile(r"(?:\+98|0)?(?:\d{3}\s*?\d{3}\s*?\d{4})").sub(f" {mask} ", text)

archipy.helpers.utils.string_utils.StringUtils.convert_english_number_to_persian(text) classmethod

Converts English numbers to Persian numbers in the text.

Parameters:

Name Type Description Default
text str

The input text containing English numbers.

required

Returns:

Name Type Description
str str

The text with English numbers converted to Persian numbers.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def convert_english_number_to_persian(cls, text: str) -> str:
    """Rewrites ASCII digits, and the ASCII comma, as their Persian counterparts.

    Args:
        text (str): The input text containing English numbers.

    Returns:
        str: The text with the digits 0-9 mapped to U+06F0..U+06F9 and "," mapped to
            the Arabic comma U+060C; all other characters are unchanged.
    """
    # The Persian digits are contiguous, so each one sits at a fixed offset from U+06F0.
    to_persian = {ord("0") + offset: 0x06F0 + offset for offset in range(10)}
    to_persian[ord(",")] = 0x060C
    return text.translate(to_persian)

archipy.helpers.utils.string_utils.StringUtils.convert_numbers_to_english(text) classmethod

Converts Persian/Arabic numbers to English numbers in the text.

Parameters:

Name Type Description Default
text str

The input text containing Persian/Arabic numbers.

required

Returns:

Name Type Description
str str

The text with Persian/Arabic numbers converted to English numbers.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def convert_numbers_to_english(cls, text: str) -> str:
    """Rewrites Persian and Arabic-Indic digits as ASCII digits.

    Args:
        text (str): The input text containing Persian/Arabic numbers.

    Returns:
        str: The text with U+06F0..U+06F9 and U+0660..U+0669 mapped to the digits 0-9;
            all other characters are unchanged.
    """
    ascii_zero = ord("0")
    to_ascii = {}
    # Both digit blocks are contiguous: Persian starts at U+06F0, Arabic-Indic at U+0660.
    for block_start in (0x06F0, 0x0660):
        for offset in range(10):
            to_ascii[block_start + offset] = ascii_zero + offset
    return text.translate(to_ascii)

archipy.helpers.utils.string_utils.StringUtils.convert_add_3digit_delimiter(value) classmethod

Adds thousand separators to numbers.

Parameters:

Name Type Description Default
value int

The number to format.

required

Returns:

Name Type Description
str str

The formatted number with thousand separators.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def convert_add_3digit_delimiter(cls, value: int) -> str:
    """Formats an integer with comma thousand separators.

    Args:
        value (int): The number to format.

    Returns:
        str: The formatted number with thousand separators. A value that is not an
            `int` is returned unchanged.
    """
    if not isinstance(value, int):
        # Anything that is not an int is passed through as-is.
        return value
    return f"{value:,}"

archipy.helpers.utils.string_utils.StringUtils.remove_emoji(text) classmethod

Removes emoji characters from the text.

Parameters:

Name Type Description Default
text str

The input text containing emojis.

required

Returns:

Name Type Description
str str

The text with emojis removed.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def remove_emoji(cls, text: str) -> str:
    """Deletes characters that fall in a fixed set of emoji and symbol code-point ranges.

    Args:
        text (str): The input text containing emojis.

    Returns:
        str: The text with every character from the listed ranges removed.
    """
    # One character class covering all ranges; "+" consumes each run in a single match.
    emoji_ranges = (
        r"["
        r"\U0001F600-\U0001F64F"  # emoticons
        r"\U0001F300-\U0001F5FF"  # symbols & pictographs
        r"\U0001F680-\U0001F6FF"  # transport & map symbols
        r"\U0001F1E0-\U0001F1FF"  # flags
        r"\U0001F900-\U0001F9FF"  # supplemental symbols and pictographs
        r"\U0001FA00-\U0001FA6F"  # symbols and pictographs extended-A
        r"\U00002600-\U000026FF"  # miscellaneous symbols (some are emojis)
        r"\U00002700-\U000027BF"  # dingbats (some are emojis)
        r"\U00002190-\U000021FF"  # arrows (some are emojis)
        r"]+"
    )
    return re.sub(emoji_ranges, r"", text, flags=re.UNICODE)

archipy.helpers.utils.string_utils.StringUtils.replace_currencies_with_mask(text, mask=None) classmethod

Masks currency symbols in the text. Only the symbols are matched; an amount next to a symbol is left in place.

Parameters:

Name Type Description Default
text str

The input text containing currency symbols and amounts.

required
mask str | None

The mask to replace currencies with. Defaults to "MASK_CURRENCIES".

None

Returns:

Name Type Description
str str

The text with currency symbols and amounts masked.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def replace_currencies_with_mask(cls, text: str, mask: str | None = None) -> str:
    """Masks currency symbols and amounts in the text.

    Args:
        text (str): The input text containing currency symbols and amounts.
        mask (str | None): The mask to replace currencies with. Defaults to "MASK_CURRENCIES".

    Returns:
        str: The text with currency symbols and amounts masked.
    """
    mask = mask or "MASK_CURRENCIES"
    currency_pattern = compile(r"(\\|zł|£|\$|₡|₦|¥|₩|₪|₫|€|₱|₲|₴|₹|﷼)+")
    return currency_pattern.sub(f" {mask} ", text)

archipy.helpers.utils.string_utils.StringUtils.replace_numbers_with_mask(text, mask=None) classmethod

Masks numbers in the text.

Parameters:

Name Type Description Default
text str

The input text containing numbers.

required
mask str | None

The mask to replace numbers with. Defaults to "MASK_NUMBERS".

None

Returns:

Name Type Description
str str

The text with numbers masked.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def replace_numbers_with_mask(cls, text: str, mask: str | None = None) -> str:
    """Masks numbers in the text.

    Args:
        text (str): The input text containing numbers.
        mask (str | None): The mask to replace numbers with. Defaults to "MASK_NUMBERS".

    Returns:
        str: The text with numbers masked.
    """
    mask = mask or "MASK_NUMBERS"
    numbers = re.findall("[0-9]+", text)
    for number in sorted(numbers, key=len, reverse=True):
        text = text.replace(number, f" {mask} ")
    return text

archipy.helpers.utils.string_utils.StringUtils.is_string_none_or_empty(text) classmethod

Checks if a string is None or empty (after stripping whitespace).

Parameters:

Name Type Description Default
text str

The input string to check.

required

Returns:

Name Type Description
bool bool

True if the string is None or empty, False otherwise.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def is_string_none_or_empty(cls, text: str) -> bool:
    """Reports whether the value is `None` or a string with no non-whitespace content.

    Args:
        text (str): The value to inspect.

    Returns:
        bool: `True` for `None` and for strings that are empty once stripped;
            `False` for any other string and for any non-string value.
    """
    if text is None:
        return True
    if not isinstance(text, str):
        # Non-string, non-None values are never reported as empty.
        return False
    return not text.strip()

archipy.helpers.utils.string_utils.StringUtils.normalize_persian_text(text, *, remove_vowels=True, normalize_punctuation=True, normalize_numbers=True, normalize_persian_chars=True, mask_urls=False, mask_emails=False, mask_phones=False, mask_currencies=False, mask_all_numbers=False, remove_emojis=False, url_mask=None, email_mask=None, phone_mask=None, currency_mask=None, number_mask=None, clean_spacing=True, remove_punctuation=False, normalize_punctuation_spacing=False) classmethod

Normalizes Persian text with configurable options.

Parameters:

Name Type Description Default
text str

The input text to normalize.

required
remove_vowels bool

Whether to remove Arabic vowels. Defaults to True.

True
normalize_punctuation bool

Whether to normalize punctuation marks. Defaults to True.

True
normalize_numbers bool

Whether to normalize numbers to English format. Defaults to True.

True
normalize_persian_chars bool

Whether to normalize Persian characters. Defaults to True.

True
mask_urls bool

Whether to mask URLs. Defaults to False.

False
mask_emails bool

Whether to mask email addresses. Defaults to False.

False
mask_phones bool

Whether to mask phone numbers. Defaults to False.

False
mask_currencies bool

Whether to mask currency symbols and amounts. Defaults to False.

False
mask_all_numbers bool

Whether to mask all numbers. Defaults to False.

False
remove_emojis bool

Whether to remove emojis. Defaults to False.

False
url_mask str | None

The mask to replace URLs with. Defaults to None.

None
email_mask str | None

The mask to replace email addresses with. Defaults to None.

None
phone_mask str | None

The mask to replace phone numbers with. Defaults to None.

None
currency_mask str | None

The mask to replace currency symbols and amounts with. Defaults to None.

None
number_mask str | None

The mask to replace numbers with. Defaults to None.

None
clean_spacing bool

Whether to clean up spacing issues. Defaults to True.

True
remove_punctuation bool

Whether to remove punctuation marks. Defaults to False.

False
normalize_punctuation_spacing bool

Whether to apply proper spacing around punctuation marks. Defaults to False.

False

Returns:

Name Type Description
str str

The normalized text.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def normalize_persian_text(
    cls,
    text: str,
    *,
    remove_vowels: bool = True,
    normalize_punctuation: bool = True,
    normalize_numbers: bool = True,
    normalize_persian_chars: bool = True,
    mask_urls: bool = False,
    mask_emails: bool = False,
    mask_phones: bool = False,
    mask_currencies: bool = False,
    mask_all_numbers: bool = False,
    remove_emojis: bool = False,
    url_mask: str | None = None,
    email_mask: str | None = None,
    phone_mask: str | None = None,
    currency_mask: str | None = None,
    number_mask: str | None = None,
    clean_spacing: bool = True,
    remove_punctuation: bool = False,
    normalize_punctuation_spacing: bool = False,
) -> str:
    """Normalizes Persian text with configurable options.

    Args:
        text (str): The input text to normalize.
        remove_vowels (bool): Whether to remove Arabic vowels. Defaults to `True`.
        normalize_punctuation (bool): Whether to normalize punctuation marks. Defaults to `True`.
        normalize_numbers (bool): Whether to normalize numbers to English format. Defaults to `True`.
        normalize_persian_chars (bool): Whether to normalize Persian characters. Defaults to `True`.
        mask_urls (bool): Whether to mask URLs. Defaults to `False`.
        mask_emails (bool): Whether to mask email addresses. Defaults to `False`.
        mask_phones (bool): Whether to mask phone numbers. Defaults to `False`.
        mask_currencies (bool): Whether to mask currency symbols and amounts. Defaults to `False`.
        mask_all_numbers (bool): Whether to mask all numbers. Defaults to `False`.
        remove_emojis (bool): Whether to remove emojis. Defaults to `False`.
        url_mask (str | None): The mask to replace URLs with. Defaults to `None`.
        email_mask (str | None): The mask to replace email addresses with. Defaults to `None`.
        phone_mask (str | None): The mask to replace phone numbers with. Defaults to `None`.
        currency_mask (str | None): The mask to replace currency symbols and amounts with. Defaults to `None`.
        number_mask (str | None): The mask to replace numbers with. Defaults to `None`.
        clean_spacing (bool): Whether to clean up spacing issues. Defaults to `True`.
        remove_punctuation (bool): Whether to remove punctuation marks. Defaults to `False`.
        normalize_punctuation_spacing (bool): Whether to apply proper spacing around punctuation marks. Defaults to `False`.

    Returns:
        str: The normalized text.
    """
    if not text:
        return text

    # Remove emojis if requested
    if remove_emojis:
        text = cls.remove_emoji(text)

    # Apply normalizations
    if remove_vowels:
        text = cls.remove_arabic_vowels(text)
    if normalize_persian_chars:
        text = cls.normalize_persian_chars(text)
    if normalize_punctuation:
        text = cls.normalize_punctuation(text)
    if remove_punctuation:
        text = cls.remove_punctuation_marks(text)
    if normalize_numbers:
        text = cls.normalize_numbers(text)

    # Apply masking
    if mask_urls:
        text = cls.mask_urls(text, mask=url_mask)
    if mask_emails:
        text = cls.mask_emails(text, mask=email_mask)
    if mask_phones:
        text = cls.mask_phones(text, mask=phone_mask)
    if mask_currencies:
        text = cls.replace_currencies_with_mask(text, mask=currency_mask)
    if mask_all_numbers:
        text = cls.replace_numbers_with_mask(text, mask=number_mask)

    if clean_spacing:
        text = cls.clean_spacing(text)
    if normalize_punctuation_spacing:
        text = cls.normalize_punctuation_spacing(text)

    return text.strip()

archipy.helpers.utils.string_utils.StringUtils.snake_to_camel_case(text) classmethod

Converts snake_case to camelCase.

Parameters:

Name Type Description Default
text str

The input text in snake_case format.

required

Returns:

Name Type Description
str str

The text converted to camelCase format.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def snake_to_camel_case(cls, text: str) -> str:
    """Turns a snake_case identifier into camelCase.

    Args:
        text (str): The input text in snake_case format.

    Returns:
        str: The camelCase form. `None`, empty and whitespace-only inputs are
            returned unchanged.
    """
    if cls.is_string_none_or_empty(text):
        return text

    # The leading segment is kept exactly as written; every later segment is title-cased.
    head, *tail = text.split("_")
    return head + "".join(segment.title() for segment in tail)

archipy.helpers.utils.string_utils.StringUtils.camel_to_snake_case(text) classmethod

Converts camelCase to snake_case.

Parameters:

Name Type Description Default
text str

The input text in camelCase format.

required

Returns:

Name Type Description
str str

The text converted to snake_case format.

Source code in archipy/helpers/utils/string_utils.py
@classmethod
def camel_to_snake_case(cls, text: str) -> str:
    """Turns a camelCase identifier into snake_case.

    Args:
        text (str): The input text in camelCase format.

    Returns:
        str: The lower-cased snake_case form. `None`, empty and whitespace-only
            inputs are returned unchanged.
    """
    if cls.is_string_none_or_empty(text):
        return text

    # First pass: split before a capital that starts a capitalized word ("aWord" -> "a_Word").
    words_marked = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
    # Second pass: split between a lowercase letter or digit and the capital after it.
    fully_marked = re.sub("([a-z0-9])([A-Z])", r"\1_\2", words_marked)
    return fully_marked.lower()

options: show_root_heading: true show_source: true

totp_utils

Utilities for TOTP (Time-based One-Time Password) operations.

from archipy.helpers.utils.totp_utils import TOTPUtils

# Generate TOTP: returns the code together with its expiration time
totp_code, expires_at = TOTPUtils.generate_totp(secret="your-secret")

# Verify TOTP
is_valid = TOTPUtils.verify_totp(totp_code, secret_key="your-secret")

# Generate secret key
secret_key = TOTPUtils.generate_secret_key()

# Get TOTP URI for QR code
totp_uri = TOTPUtils.get_totp_uri(
    secret_key=secret_key,
    issuer="MyApp",
    account_name="user@example.com"
)

Utility module for TOTP (Time-based One-Time Password) operations.

This module provides functionality for generating and verifying TOTP codes that are commonly used for multi-factor authentication.

archipy.helpers.utils.totp_utils.TOTPUtils

Utility class for TOTP (Time-based One-Time Password) operations.

This class provides methods for generating and verifying TOTP codes, as well as generating secure secret keys for TOTP initialization.

Uses the following configuration parameters from AuthConfig:

- TOTP_SECRET_KEY: Master secret key for generating TOTP secrets
- TOTP_HASH_ALGORITHM: Hash algorithm used for TOTP generation (default: SHA1)
- TOTP_LENGTH: Number of digits in generated TOTP codes
- TOTP_TIME_STEP: Time step in seconds between TOTP code changes
- TOTP_EXPIRES_IN: TOTP validity period in seconds
- TOTP_VERIFICATION_WINDOW: Number of time steps to check before/after
- SALT_LENGTH: Length of random bytes for secure key generation

Source code in archipy/helpers/utils/totp_utils.py
class TOTPUtils:
    """Utility class for TOTP (Time-based One-Time Password) operations.

    This class provides methods for generating and verifying TOTP codes, as well as generating
    secure secret keys for TOTP initialization.

    Uses the following configuration parameters from AuthConfig:

    - TOTP_SECRET_KEY: Master secret key for generating TOTP secrets
    - TOTP_HASH_ALGORITHM: Hash algorithm used for TOTP generation (default: SHA1)
    - TOTP_LENGTH: Number of digits in generated TOTP codes
    - TOTP_TIME_STEP: Time step in seconds between TOTP code changes
    - TOTP_EXPIRES_IN: TOTP validity period in seconds
    - TOTP_VERIFICATION_WINDOW: Number of time steps to check before/after
    - SALT_LENGTH: Length of random bytes for secure key generation
    """

    @staticmethod
    def _compute_code(secret_bytes: bytes, time_step_counter: int, hash_algo, digits: int) -> str:
        """Computes the zero-padded code for a single time-step counter.

        Args:
            secret_bytes: The HMAC key.
            time_step_counter: The time-step counter; packed as a big-endian unsigned 64-bit integer.
            hash_algo: Digest passed straight to `hmac.new` (whatever the config provides).
            digits: Number of decimal digits in the resulting code.

        Returns:
            The code as a string of exactly `digits` characters.
        """
        time_bytes = struct.pack(">Q", time_step_counter)
        hmac_result = hmac.new(secret_bytes, time_bytes, hash_algo).digest()

        # Dynamic truncation: the low nibble of the last byte selects a 4-byte window,
        # and the top bit of that window is masked off to get a 31-bit integer.
        offset = hmac_result[-1] & 0xF
        truncated_hash = (
            ((hmac_result[offset] & 0x7F) << 24)
            | ((hmac_result[offset + 1] & 0xFF) << 16)
            | ((hmac_result[offset + 2] & 0xFF) << 8)
            | (hmac_result[offset + 3] & 0xFF)
        )
        return str(truncated_hash % (10**digits)).zfill(digits)

    @classmethod
    def generate_totp(cls, secret: str | UUID, auth_config: AuthConfig | None = None) -> tuple[str, datetime]:
        """Generates a TOTP code using the configured hash algorithm.

        The secret is used as the UTF-8 bytes of its string form (a UUID is stringified first).

        Args:
            secret: The secret key used to generate the TOTP code.
            auth_config: Optional auth configuration override. If not provided, uses the global config.

        Returns:
            A tuple containing the generated TOTP code and its expiration time.

        Raises:
            InvalidArgumentError: If the secret is invalid or empty.
        """
        if not secret:
            raise InvalidArgumentError(
                argument_name="secret",
            )

        configs = auth_config or BaseConfig.global_config().AUTH

        # Get current timestamp and calculate time step
        current_time = DatetimeUtils.get_epoch_time_now()
        time_step_counter = int(current_time / configs.TOTP_TIME_STEP)

        # str() covers both plain strings and UUIDs
        secret_bytes = str(secret).encode("utf-8")

        # Use the dedicated TOTP hash algorithm from config, with fallback to SHA1
        hash_algo = getattr(configs, "TOTP_HASH_ALGORITHM", "SHA1")

        totp_code = cls._compute_code(secret_bytes, time_step_counter, hash_algo, configs.TOTP_LENGTH)

        # Calculate expiration time
        expires_in = DatetimeUtils.get_datetime_after_given_datetime_or_now(seconds=configs.TOTP_EXPIRES_IN)

        return totp_code, expires_in

    @classmethod
    def verify_totp(cls, secret: str | UUID, totp_code: str, auth_config: AuthConfig | None = None) -> bool:
        """Verifies a TOTP code against the provided secret.

        The code is checked against every time step within
        `TOTP_VERIFICATION_WINDOW` steps before and after the current one.

        Args:
            secret: The secret key used to generate the TOTP code.
            totp_code: The TOTP code to verify.
            auth_config: Optional auth configuration override. If not provided, uses the global config.

        Returns:
            `True` if the TOTP code is valid, `False` otherwise (including when
            its length differs from the configured `TOTP_LENGTH`).

        Raises:
            InvalidArgumentError: If the secret or the TOTP code is empty.
            InvalidTokenError: If the TOTP code contains anything other than ASCII digits.
        """
        if not secret:
            raise InvalidArgumentError(
                argument_name="secret",
            )

        if not totp_code:
            raise InvalidArgumentError(
                argument_name="totp_code",
            )

        # str.isdigit() alone accepts non-ASCII digits, which would make
        # hmac.compare_digest raise TypeError further down.
        if not (totp_code.isascii() and totp_code.isdigit()):
            raise InvalidTokenError

        configs = auth_config or BaseConfig.global_config().AUTH

        # The digit count must come from the config, never from the submitted code:
        # otherwise a 1-digit guess would match roughly one time in ten per step.
        if len(totp_code) != configs.TOTP_LENGTH:
            return False

        current_time = DatetimeUtils.get_epoch_time_now()

        # Use the dedicated TOTP hash algorithm from config, with fallback to SHA1
        hash_algo = getattr(configs, "TOTP_HASH_ALGORITHM", "SHA1")

        secret_bytes = str(secret).encode("utf-8")
        current_step = int(current_time / configs.TOTP_TIME_STEP)

        # Check codes within verification window
        for i in range(-configs.TOTP_VERIFICATION_WINDOW, configs.TOTP_VERIFICATION_WINDOW + 1):
            computed_totp = cls._compute_code(secret_bytes, current_step + i, hash_algo, configs.TOTP_LENGTH)

            # Constant-time comparison to avoid leaking matching prefixes
            if hmac.compare_digest(totp_code, computed_totp):
                return True

        return False

    @staticmethod
    def generate_secret_key_for_totp(auth_config: AuthConfig | None = None) -> str:
        """Generates a random secret key for TOTP initialization.

        The key is the base32 encoding of an HMAC (keyed with `TOTP_SECRET_KEY`)
        over `SALT_LENGTH` random bytes.

        Args:
            auth_config: Optional auth configuration override. If not provided, uses the global config.

        Returns:
            A base32-encoded secret key for TOTP initialization.

        Raises:
            InvalidArgumentError: If the TOTP_SECRET_KEY is not configured.
            InternalError: If there is an error generating the secret key.
        """
        try:
            configs = auth_config or BaseConfig.global_config().AUTH

            # Use secrets module instead of random for better security
            random_bytes = secrets.token_bytes(configs.SALT_LENGTH)

            # Check if TOTP secret key is configured
            if not configs.TOTP_SECRET_KEY:
                raise InvalidArgumentError(
                    argument_name="TOTP_SECRET_KEY",
                )

            master_key = configs.TOTP_SECRET_KEY.get_secret_value().encode("utf-8")

            # Use the dedicated TOTP hash algorithm from config, with fallback to SHA1
            hash_algo = getattr(configs, "TOTP_HASH_ALGORITHM", "SHA1")

            # Use HMAC with master key for additional security
            hmac_obj = hmac.new(master_key, random_bytes, hash_algo)
            return base64.b32encode(hmac_obj.digest()).decode("utf-8")
        except InvalidArgumentError:
            # Let the documented configuration error through instead of
            # masking it as a generic InternalError.
            raise
        except Exception as e:
            # Convert any other errors to our custom errors
            raise InternalError() from e

archipy.helpers.utils.totp_utils.TOTPUtils.generate_totp(secret, auth_config=None) classmethod

Generates a TOTP code using the configured hash algorithm.

Parameters:

Name Type Description Default
secret str | UUID

The secret key used to generate the TOTP code.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
tuple[str, datetime]

A tuple containing the generated TOTP code and its expiration time.

Raises:

Type Description
InvalidArgumentError

If the secret is invalid or empty.

Source code in archipy/helpers/utils/totp_utils.py
@classmethod
def generate_totp(cls, secret: str | UUID, auth_config: AuthConfig | None = None) -> tuple[str, datetime]:
    """Produces a time-based one-time password for the given secret.

    The secret's string form, UTF-8 encoded, is the HMAC key; the message is
    the current time-step counter packed as a big-endian unsigned 64-bit integer.

    Args:
        secret: The secret key used to generate the TOTP code.
        auth_config: Optional auth configuration override. If not provided, uses the global config.

    Returns:
        A tuple containing the generated TOTP code and its expiration time.

    Raises:
        InvalidArgumentError: If the secret is invalid or empty.
    """
    if not secret:
        raise InvalidArgumentError(argument_name="secret")

    settings = auth_config or BaseConfig.global_config().AUTH

    # Number of whole time steps elapsed since the epoch
    now_epoch = DatetimeUtils.get_epoch_time_now()
    counter = int(now_epoch / settings.TOTP_TIME_STEP)

    # str() handles both plain strings and UUIDs
    key = str(secret).encode("utf-8")
    message = struct.pack(">Q", counter)

    # Configured digest, defaulting to SHA1 when the config has no such attribute
    digest_name = getattr(settings, "TOTP_HASH_ALGORITHM", "SHA1")
    mac = hmac.new(key, message, digest_name).digest()

    # Dynamic truncation: the last byte's low nibble picks a 4-byte window,
    # read big-endian with the top bit cleared.
    start = mac[-1] & 0xF
    window = mac[start : start + 4]
    binary_code = int.from_bytes(window, "big") & 0x7FFFFFFF

    digits = settings.TOTP_LENGTH
    code = str(binary_code % (10**digits)).zfill(digits)

    expiry = DatetimeUtils.get_datetime_after_given_datetime_or_now(seconds=settings.TOTP_EXPIRES_IN)
    return code, expiry

archipy.helpers.utils.totp_utils.TOTPUtils.verify_totp(secret, totp_code, auth_config=None) classmethod

Verifies a TOTP code against the provided secret.

Parameters:

Name Type Description Default
secret str | UUID

The secret key used to generate the TOTP code.

required
totp_code str

The TOTP code to verify.

required
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
bool

True if the TOTP code is valid, False otherwise.

Raises:

Type Description
InvalidArgumentError

If the secret is invalid or empty.

InvalidTokenError

If the TOTP code format is invalid.

Source code in archipy/helpers/utils/totp_utils.py
@classmethod
def verify_totp(cls, secret: str | UUID, totp_code: str, auth_config: AuthConfig | None = None) -> bool:
    """Verifies a TOTP code against the provided secret.

    The code is checked against every time step within
    `TOTP_VERIFICATION_WINDOW` steps before and after the current one.

    Args:
        secret: The secret key used to generate the TOTP code.
        totp_code: The TOTP code to verify.
        auth_config: Optional auth configuration override. If not provided, uses the global config.

    Returns:
        `True` if the TOTP code is valid, `False` otherwise (including when
        its length differs from the configured `TOTP_LENGTH`).

    Raises:
        InvalidArgumentError: If the secret or the TOTP code is empty.
        InvalidTokenError: If the TOTP code contains anything other than ASCII digits.
    """
    if not secret:
        raise InvalidArgumentError(
            argument_name="secret",
        )

    if not totp_code:
        raise InvalidArgumentError(
            argument_name="totp_code",
        )

    # str.isdigit() alone accepts non-ASCII digits, which would make
    # hmac.compare_digest raise TypeError further down.
    if not (totp_code.isascii() and totp_code.isdigit()):
        raise InvalidTokenError

    configs = auth_config or BaseConfig.global_config().AUTH

    # The digit count must come from the config, never from the submitted code:
    # otherwise a 1-digit guess would match roughly one time in ten per step.
    code_length = configs.TOTP_LENGTH
    if len(totp_code) != code_length:
        return False

    current_time = DatetimeUtils.get_epoch_time_now()

    # Use the dedicated TOTP hash algorithm from config, with fallback to SHA1
    hash_algo = getattr(configs, "TOTP_HASH_ALGORITHM", "SHA1")

    # Loop-invariant values
    secret_bytes = str(secret).encode("utf-8")
    current_step = int(current_time / configs.TOTP_TIME_STEP)

    # Check codes within verification window
    for i in range(-configs.TOTP_VERIFICATION_WINDOW, configs.TOTP_VERIFICATION_WINDOW + 1):
        time_bytes = struct.pack(">Q", current_step + i)
        hmac_result = hmac.new(secret_bytes, time_bytes, hash_algo).digest()

        # Dynamic truncation: low nibble of the last byte selects a 4-byte window,
        # whose top bit is masked off.
        offset = hmac_result[-1] & 0xF
        truncated_hash = (
            ((hmac_result[offset] & 0x7F) << 24)
            | ((hmac_result[offset + 1] & 0xFF) << 16)
            | ((hmac_result[offset + 2] & 0xFF) << 8)
            | (hmac_result[offset + 3] & 0xFF)
        )

        computed_totp = str(truncated_hash % (10**code_length)).zfill(code_length)

        # Constant-time comparison to avoid leaking matching prefixes
        if hmac.compare_digest(totp_code, computed_totp):
            return True

    return False

archipy.helpers.utils.totp_utils.TOTPUtils.generate_secret_key_for_totp(auth_config=None) staticmethod

Generates a random secret key for TOTP initialization.

Parameters:

Name Type Description Default
auth_config AuthConfig | None

Optional auth configuration override. If not provided, uses the global config.

None

Returns:

Type Description
str

A base32-encoded secret key for TOTP initialization.

Raises:

Type Description
InvalidArgumentError

If the TOTP_SECRET_KEY is not configured.

InternalError

If there is an error generating the secret key.

Source code in archipy/helpers/utils/totp_utils.py
@staticmethod
def generate_secret_key_for_totp(auth_config: AuthConfig | None = None) -> str:
    """Generates a random secret key for TOTP initialization.

    The key is the base32 encoding of an HMAC (keyed with `TOTP_SECRET_KEY`)
    over `SALT_LENGTH` random bytes.

    Args:
        auth_config: Optional auth configuration override. If not provided, uses the global config.

    Returns:
        A base32-encoded secret key for TOTP initialization.

    Raises:
        InvalidArgumentError: If the TOTP_SECRET_KEY is not configured.
        InternalError: If there is an error generating the secret key.
    """
    try:
        configs = auth_config or BaseConfig.global_config().AUTH

        # Use secrets module instead of random for better security
        random_bytes = secrets.token_bytes(configs.SALT_LENGTH)

        # Check if TOTP secret key is configured
        if not configs.TOTP_SECRET_KEY:
            raise InvalidArgumentError(
                argument_name="TOTP_SECRET_KEY",
            )

        master_key = configs.TOTP_SECRET_KEY.get_secret_value().encode("utf-8")

        # Use the dedicated TOTP hash algorithm from config, with fallback to SHA1
        hash_algo = getattr(configs, "TOTP_HASH_ALGORITHM", "SHA1")

        # Use HMAC with master key for additional security
        hmac_obj = hmac.new(master_key, random_bytes, hash_algo)
        return base64.b32encode(hmac_obj.digest()).decode("utf-8")
    except InvalidArgumentError:
        # Let the documented configuration error through instead of
        # masking it as a generic InternalError.
        raise
    except Exception as e:
        # Convert any other errors to our custom errors
        raise InternalError() from e

options: show_root_heading: true show_source: true

keycloak_utils

Utilities for Keycloak integration.

from archipy.helpers.utils.keycloak_utils import KeycloakUtils

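from fastapi import Depends, FastAPI

app = FastAPI()

# Protect a route: the caller must hold the "admin" role
@app.get("/admin")
def admin_only(
    user_info: dict = Depends(KeycloakUtils.fastapi_auth(required_roles=frozenset({"admin"}))),
) -> dict:
    return user_info

# Async variant: only the user identified by the path parameter (or an admin) may access
@app.get("/users/{user_uuid}")
async def get_user(
    user_uuid: str,
    user_info: dict = Depends(
        KeycloakUtils.async_fastapi_auth(
            resource_type_param="user_uuid",
            resource_type="users",
            admin_roles=frozenset({"admin"}),
        ),
    ),
) -> dict:
    return user_info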

archipy.helpers.utils.keycloak_utils.KeycloakUtils

Utility class for Keycloak authentication and authorization in FastAPI applications.

Source code in archipy/helpers/utils/keycloak_utils.py
class KeycloakUtils:
    """Utility class for Keycloak authentication and authorization in FastAPI applications.

    Exposes two factories, `fastapi_auth` (sync) and `async_fastapi_auth` (async),
    each returning a FastAPI dependency that validates the bearer token and then
    applies the optional resource-ownership, role and permission checks, in that order.
    """

    @staticmethod
    def _get_keycloak_adapter() -> KeycloakAdapter:
        """Creates the `KeycloakAdapter` injected into the synchronous dependency."""
        return KeycloakAdapter()

    @staticmethod
    def _get_async_keycloak_adapter() -> AsyncKeycloakAdapter:
        """Creates the `AsyncKeycloakAdapter` injected into the asynchronous dependency."""
        return AsyncKeycloakAdapter()

    @classmethod
    # Synchronous decorator
    def fastapi_auth(
        cls,
        resource_type_param: str | None = None,
        resource_type: str | None = None,
        required_roles: frozenset[str] | None = None,
        all_roles_required: bool = False,
        required_permissions: tuple[tuple[str, str], ...] | None = None,
        admin_roles: frozenset[str] | None = None,
        lang: LanguageType = DEFAULT_LANG,
    ) -> Callable:
        """FastAPI decorator for Keycloak authentication and resource-based authorization.

        The resource-ownership check only runs when both `resource_type` and
        `resource_type_param` are given; if either is missing it is skipped.

        Args:
            resource_type_param: The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')
            resource_type: The type of resource being accessed (e.g., 'users', 'employees')
            required_roles: Set of role names that the user must have
            all_roles_required: If True, user must have all specified roles; if False, any role is sufficient
            required_permissions: List of (resource, scope) tuples to check
            admin_roles: Set of roles that grant administrative access to all resources
            lang: Language for error messages

        Returns:
            A dependency function for FastAPI. When it passes, it stores `user_info` and
            `token_info` on `request.state` and returns the user info.

        Raises:
            UnauthenticatedError: If no credentials were extracted from the request (token is None)
            InvalidTokenError: If token is invalid (not raised directly here; presumably by the adapter)
            TokenExpiredError: If validation fails and introspection does not report the token as active
            PermissionDeniedError: If user lacks required roles, permissions, or resource access
            InvalidArgumentError: If the path parameter named by resource_type_param is absent or empty
        """

        def dependency(
            request: Request,
            token: HTTPAuthorizationCredentials = Security(security),
            keycloak: KeycloakAdapter = Depends(cls._get_keycloak_adapter),
        ) -> dict:
            if token is None:
                raise UnauthenticatedError(lang=lang)
            token_str = token.credentials  # Extract the token string
            # Validate token; on failure fall back to introspection and reject
            # only when the token is not reported as active
            if not keycloak.validate_token(token_str):
                token_info = keycloak.introspect_token(token_str)
                if not token_info.get("active", False):
                    raise TokenExpiredError(lang=lang)

            # Get user info from token
            user_info = keycloak.get_userinfo(token_str)
            token_info = keycloak.get_token_info(token_str)

            # Resource-based authorization if resource type is provided
            if resource_type and resource_type_param:
                # Extract resource UUID from path parameters
                resource_uuid = request.path_params.get(resource_type_param)
                if not resource_uuid:
                    raise InvalidArgumentError(argument_name=resource_type_param, lang=lang)

                # Verify resource exists and user has access
                user_uuid = user_info.get("sub")

                # Check if resource exists (the resource id is looked up as a Keycloak user id)
                resource_user = keycloak.get_user_by_id(resource_uuid)
                if not resource_user:
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                    )

                # Authorization check: either owns the resource or has admin privileges
                # (short-circuits to a falsy value when no admin_roles were configured)
                has_admin_privileges = admin_roles and keycloak.has_any_of_roles(token_str, admin_roles)
                if user_uuid != resource_uuid and not has_admin_privileges:
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                    )

            # Check additional roles if specified
            if required_roles:
                if all_roles_required:
                    if not keycloak.has_all_roles(token_str, required_roles):
                        raise PermissionDeniedError(
                            lang=lang,
                            additional_data={"required_roles": required_roles},
                        )
                elif not keycloak.has_any_of_roles(token_str, required_roles):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_roles": required_roles},
                    )

            # Check permissions if specified; every (resource, scope) pair must pass
            if required_permissions:
                for resource, scope in required_permissions:
                    if not keycloak.check_permissions(token_str, resource, scope):
                        raise PermissionDeniedError(
                            lang=lang,
                            additional_data={"required_permission": f"{resource}#{scope}"},
                        )

            # Add user info to request state
            request.state.user_info = user_info
            request.state.token_info = token_info
            return user_info

        return dependency

    @classmethod
    def async_fastapi_auth(
        cls,
        resource_type_param: str | None = None,
        resource_type: str | None = None,
        required_roles: frozenset[str] | None = None,
        all_roles_required: bool = False,
        required_permissions: tuple[tuple[str, str], ...] | None = None,
        admin_roles: frozenset[str] | None = None,
        lang: LanguageType = DEFAULT_LANG,
    ) -> Callable:
        """FastAPI async decorator for Keycloak authentication and resource-based authorization.

        The resource-ownership check only runs when both `resource_type` and
        `resource_type_param` are given; if either is missing it is skipped.

        Args:
            resource_type_param: The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')
            resource_type: The type of resource being accessed (e.g., 'users', 'employees')
            required_roles: Set of role names that the user must have
            all_roles_required: If True, user must have all specified roles; if False, any role is sufficient
            required_permissions: List of (resource, scope) tuples to check
            admin_roles: Set of roles that grant administrative access to all resources
            lang: Language for error messages

        Returns:
            An async dependency function for FastAPI. When it passes, it stores `user_info`
            and `token_info` on `request.state` and returns the user info.

        Raises:
            UnauthenticatedError: If no credentials were extracted from the request (token is None)
            InvalidTokenError: If token is invalid (not raised directly here; presumably by the adapter)
            TokenExpiredError: If validation fails and introspection does not report the token as active
            PermissionDeniedError: If user lacks required roles, permissions, or resource access
            InvalidArgumentError: If the path parameter named by resource_type_param is absent or empty
        """

        async def dependency(
            request: Request,
            token: HTTPAuthorizationCredentials = Security(security),
            keycloak: AsyncKeycloakAdapter = Depends(cls._get_async_keycloak_adapter),
        ) -> dict:
            if token is None:
                raise UnauthenticatedError(lang=lang)
            token_str = token.credentials  # Extract the token string

            # Validate token
            if not await keycloak.validate_token(token_str):
                # Validation failed: fall back to introspection and reject only
                # when the token is not reported as active
                token_info = await keycloak.introspect_token(token_str)
                if not token_info.get("active", False):
                    raise TokenExpiredError(lang=lang)

            # Get user info from token
            user_info = await keycloak.get_userinfo(token_str)
            token_info = await keycloak.get_token_info(token_str)

            # Resource-based authorization if resource type is provided
            if resource_type and resource_type_param:
                # Extract resource UUID from path parameters
                resource_uuid = request.path_params.get(resource_type_param)
                if not resource_uuid:
                    raise InvalidArgumentError(argument_name=resource_type_param, lang=lang)

                # Verify resource exists and user has access
                user_uuid = user_info.get("sub")

                # Check if resource exists (the resource id is looked up as a Keycloak user id)
                resource_user = await keycloak.get_user_by_id(resource_uuid)
                if not resource_user:
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                    )

                # Authorization check: either owns the resource or has admin privileges
                # (short-circuits to a falsy value when no admin_roles were configured)
                has_admin_privileges = admin_roles and await keycloak.has_any_of_roles(token_str, admin_roles)
                if user_uuid != resource_uuid and not has_admin_privileges:
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                    )

            # Check additional roles if specified
            if required_roles:
                if all_roles_required:
                    if not await keycloak.has_all_roles(token_str, required_roles):
                        raise PermissionDeniedError(
                            lang=lang,
                            additional_data={"required_roles": required_roles},
                        )
                elif not await keycloak.has_any_of_roles(token_str, required_roles):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_roles": required_roles},
                    )

            # Check permissions if specified; every (resource, scope) pair must pass
            if required_permissions:
                for resource, scope in required_permissions:
                    if not await keycloak.check_permissions(token_str, resource, scope):
                        raise PermissionDeniedError(
                            lang=lang,
                            additional_data={"required_permission": f"{resource}#{scope}"},
                        )

            # Add user info to request state
            request.state.user_info = user_info
            request.state.token_info = token_info
            return user_info

        return dependency

archipy.helpers.utils.keycloak_utils.KeycloakUtils.fastapi_auth(resource_type_param=None, resource_type=None, required_roles=None, all_roles_required=False, required_permissions=None, admin_roles=None, lang=DEFAULT_LANG) classmethod

FastAPI decorator for Keycloak authentication and resource-based authorization.

Parameters:

Name Type Description Default
resource_type_param str | None

The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')

None
resource_type str | None

The type of resource being accessed (e.g., 'users', 'employees')

None
required_roles frozenset[str] | None

Set of role names that the user must have

None
all_roles_required bool

If True, user must have all specified roles; if False, any role is sufficient

False
required_permissions tuple[tuple[str, str], ...] | None

List of (resource, scope) tuples to check

None
admin_roles frozenset[str] | None

Set of roles that grant administrative access to all resources

None
lang LanguageType

Language for error messages

DEFAULT_LANG

Raises:

- UnauthenticatedError: If no valid Authorization header is provided
- InvalidTokenError: If token is invalid
- TokenExpiredError: If token is expired
- PermissionDeniedError: If user lacks required roles, permissions, or resource access
- InvalidArgumentError: If resource_type_param is missing when resource_type is provided

Source code in archipy/helpers/utils/keycloak_utils.py
@classmethod
# Synchronous decorator
def fastapi_auth(
    cls,
    resource_type_param: str | None = None,
    resource_type: str | None = None,
    required_roles: frozenset[str] | None = None,
    all_roles_required: bool = False,
    required_permissions: tuple[tuple[str, str], ...] | None = None,
    admin_roles: frozenset[str] | None = None,
    lang: LanguageType = DEFAULT_LANG,
) -> Callable:
    """FastAPI decorator for Keycloak authentication and resource-based authorization.

    The resource-ownership check only runs when both `resource_type` and
    `resource_type_param` are given; if either is missing it is skipped.

    Args:
        resource_type_param: The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')
        resource_type: The type of resource being accessed (e.g., 'users', 'employees')
        required_roles: Set of role names that the user must have
        all_roles_required: If True, user must have all specified roles; if False, any role is sufficient
        required_permissions: List of (resource, scope) tuples to check
        admin_roles: Set of roles that grant administrative access to all resources
        lang: Language for error messages

    Returns:
        A dependency function for FastAPI. When it passes, it stores `user_info` and
        `token_info` on `request.state` and returns the user info.

    Raises:
        UnauthenticatedError: If no credentials were extracted from the request (token is None)
        InvalidTokenError: If token is invalid (not raised directly here; presumably by the adapter)
        TokenExpiredError: If validation fails and introspection does not report the token as active
        PermissionDeniedError: If user lacks required roles, permissions, or resource access
        InvalidArgumentError: If the path parameter named by resource_type_param is absent or empty
    """

    def dependency(
        request: Request,
        token: HTTPAuthorizationCredentials = Security(security),
        keycloak: KeycloakAdapter = Depends(cls._get_keycloak_adapter),
    ) -> dict:
        if token is None:
            raise UnauthenticatedError(lang=lang)
        token_str = token.credentials  # Extract the token string
        # Validate token; on failure fall back to introspection and reject
        # only when the token is not reported as active
        if not keycloak.validate_token(token_str):
            token_info = keycloak.introspect_token(token_str)
            if not token_info.get("active", False):
                raise TokenExpiredError(lang=lang)

        # Get user info from token
        user_info = keycloak.get_userinfo(token_str)
        token_info = keycloak.get_token_info(token_str)

        # Resource-based authorization if resource type is provided
        if resource_type and resource_type_param:
            # Extract resource UUID from path parameters
            resource_uuid = request.path_params.get(resource_type_param)
            if not resource_uuid:
                raise InvalidArgumentError(argument_name=resource_type_param, lang=lang)

            # Verify resource exists and user has access
            user_uuid = user_info.get("sub")

            # Check if resource exists (the resource id is looked up as a Keycloak user id)
            resource_user = keycloak.get_user_by_id(resource_uuid)
            if not resource_user:
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                )

            # Authorization check: either owns the resource or has admin privileges
            # (short-circuits to a falsy value when no admin_roles were configured)
            has_admin_privileges = admin_roles and keycloak.has_any_of_roles(token_str, admin_roles)
            if user_uuid != resource_uuid and not has_admin_privileges:
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                )

        # Check additional roles if specified
        if required_roles:
            if all_roles_required:
                if not keycloak.has_all_roles(token_str, required_roles):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_roles": required_roles},
                    )
            elif not keycloak.has_any_of_roles(token_str, required_roles):
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"required_roles": required_roles},
                )

        # Check permissions if specified; every (resource, scope) pair must pass
        if required_permissions:
            for resource, scope in required_permissions:
                if not keycloak.check_permissions(token_str, resource, scope):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_permission": f"{resource}#{scope}"},
                    )

        # Add user info to request state
        request.state.user_info = user_info
        request.state.token_info = token_info
        return user_info

    return dependency

archipy.helpers.utils.keycloak_utils.KeycloakUtils.async_fastapi_auth(resource_type_param=None, resource_type=None, required_roles=None, all_roles_required=False, required_permissions=None, admin_roles=None, lang=DEFAULT_LANG) classmethod

FastAPI async decorator for Keycloak authentication and resource-based authorization.

Parameters:

Name Type Description Default
resource_type_param str | None

The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')

None
resource_type str | None

The type of resource being accessed (e.g., 'users', 'employees')

None
required_roles frozenset[str] | None

Set of role names that the user must have

None
all_roles_required bool

If True, user must have all specified roles; if False, any role is sufficient

False
required_permissions tuple[tuple[str, str], ...] | None

List of (resource, scope) tuples to check

None
admin_roles frozenset[str] | None

Set of roles that grant administrative access to all resources

None
lang LanguageType

Language for error messages

DEFAULT_LANG

Raises:

- UnauthenticatedError: If no valid Authorization header is provided
- InvalidTokenError: If token is invalid
- TokenExpiredError: If token is expired
- PermissionDeniedError: If user lacks required roles, permissions, or resource access
- InvalidArgumentError: If resource_type_param is missing when resource_type is provided

Source code in archipy/helpers/utils/keycloak_utils.py
@classmethod
def async_fastapi_auth(
    cls,
    resource_type_param: str | None = None,
    resource_type: str | None = None,
    required_roles: frozenset[str] | None = None,
    all_roles_required: bool = False,
    required_permissions: tuple[tuple[str, str], ...] | None = None,
    admin_roles: frozenset[str] | None = None,
    lang: LanguageType = DEFAULT_LANG,
) -> Callable:
    """Create an async FastAPI dependency for Keycloak authentication and authorization.

    The returned coroutine function is meant to be used as a FastAPI dependency. On each
    request it authenticates the bearer token, optionally enforces ownership of the
    resource named in the path, then checks roles and permissions, in that order.

    Args:
        resource_type_param: The parameter name in the path (e.g., 'user_uuid', 'employee_uuid')
        resource_type: The type of resource being accessed (e.g., 'users', 'employees')
        required_roles: Set of role names that the user must have
        all_roles_required: If True, user must have all specified roles; if False, any role is sufficient
        required_permissions: Tuple of (resource, scope) pairs to check; every pair must be granted
        admin_roles: Set of roles that grant administrative access to all resources
        lang: Language for error messages

    Returns:
        Callable: An async dependency that returns the user info dict and also stores it,
        together with the token info, on ``request.state`` (``user_info`` / ``token_info``).

    Raises:
        UnauthenticatedError: If no valid Authorization header is provided
        InvalidTokenError: If token is invalid
        TokenExpiredError: If token is expired
        PermissionDeniedError: If user lacks required roles, permissions, or resource access
        InvalidArgumentError: If resource_type is provided without resource_type_param, or the
            path parameter named by resource_type_param is absent from the request

    Note:
        The exceptions above are raised by the returned dependency while handling a request,
        not by this factory call itself.
    """

    async def dependency(
        request: Request,
        token: HTTPAuthorizationCredentials = Security(security),
        keycloak: AsyncKeycloakAdapter = Depends(cls._get_async_keycloak_adapter),
    ) -> dict:
        if token is None:
            raise UnauthenticatedError(lang=lang)
        token_str = token.credentials  # Extract the token string

        # Validate token
        if not await keycloak.validate_token(token_str):
            # Validation failed: ask the server whether the token is still active.
            # NOTE(review): a token that fails validation but introspects as active is
            # accepted here, and InvalidTokenError is never raised in this function
            # itself -- confirm this fallback is intended.
            token_info = await keycloak.introspect_token(token_str)
            if not token_info.get("active", False):
                raise TokenExpiredError(lang=lang)

        # Get user info from token
        user_info = await keycloak.get_userinfo(token_str)
        token_info = await keycloak.get_token_info(token_str)

        # A resource type without the path parameter that identifies the resource cannot be
        # checked; fail closed instead of silently skipping resource authorization.
        if resource_type and not resource_type_param:
            raise InvalidArgumentError(argument_name="resource_type_param", lang=lang)

        # Resource-based authorization if resource type is provided
        if resource_type and resource_type_param:
            # Extract resource UUID from path parameters
            resource_uuid = request.path_params.get(resource_type_param)
            if not resource_uuid:
                raise InvalidArgumentError(argument_name=resource_type_param, lang=lang)

            # The caller's identity is the "sub" claim of the user info
            user_uuid = user_info.get("sub")

            # Check if resource exists (the resource id is looked up as a Keycloak user id)
            resource_user = await keycloak.get_user_by_id(resource_uuid)
            if not resource_user:
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                )

            # Authorization check: either owns the resource or has admin privileges.
            # Short-circuits, so no role lookup happens when admin_roles is empty/None.
            has_admin_privileges = admin_roles and await keycloak.has_any_of_roles(token_str, admin_roles)
            if user_uuid != resource_uuid and not has_admin_privileges:
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"resource_type": resource_type, "resource_id": resource_uuid},
                )

        # Check additional roles if specified
        if required_roles:
            if all_roles_required:
                if not await keycloak.has_all_roles(token_str, required_roles):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_roles": required_roles},
                    )
            elif not await keycloak.has_any_of_roles(token_str, required_roles):
                raise PermissionDeniedError(
                    lang=lang,
                    additional_data={"required_roles": required_roles},
                )

        # Check permissions if specified; the first missing (resource, scope) pair aborts
        if required_permissions:
            for resource, scope in required_permissions:
                if not await keycloak.check_permissions(token_str, resource, scope):
                    raise PermissionDeniedError(
                        lang=lang,
                        additional_data={"required_permission": f"{resource}#{scope}"},
                    )

        # Add user info to request state so downstream handlers can reuse it
        request.state.user_info = user_info
        request.state.token_info = token_info
        return user_info

    return dependency

options: show_root_heading: true show_source: true

Key Classes

DatetimeUtils

Class: archipy.helpers.utils.datetime_utils.DatetimeUtils

Provides datetime operations with features:

  • Timezone-aware
  • Microsecond precision
  • Consistent across the application

JWTUtils

Class: archipy.helpers.utils.jwt_utils.JWTUtils

Provides JWT operations with features:

  • Configurable expiration
  • Custom payload support
  • Multiple signing algorithms
  • Token refresh capability

PasswordUtils

Class: archipy.helpers.utils.password_utils.PasswordUtils

Provides password operations with features:

  • Secure hashing algorithm
  • Salt generation
  • Configurable work factor
  • Protection against timing attacks