Skip to content

Keycloak

The keycloak adapter provides integration with Keycloak for identity and access management, including token validation, user management, and role-based access control.

Ports

Abstract port interface defining the Keycloak adapter contract.

Keycloak port definitions for ArchiPy.

archipy.adapters.keycloak.ports.KeycloakResponseType module-attribute

KeycloakResponseType = dict[str, Any]

archipy.adapters.keycloak.ports.KeycloakRoleType module-attribute

KeycloakRoleType = dict[str, Any]

archipy.adapters.keycloak.ports.KeycloakUserType module-attribute

KeycloakUserType = dict[str, Any]

archipy.adapters.keycloak.ports.KeycloakGroupType module-attribute

KeycloakGroupType = dict[str, Any]

archipy.adapters.keycloak.ports.KeycloakTokenType module-attribute

KeycloakTokenType = dict[str, Any]

archipy.adapters.keycloak.ports.KeycloakOrganizationType module-attribute

KeycloakOrganizationType = dict[str, Any]

archipy.adapters.keycloak.ports.PublicKeyType module-attribute

PublicKeyType = Any

archipy.adapters.keycloak.ports.KeycloakPort

Interface for Keycloak operations providing a standardized access pattern.

This interface defines the contract for Keycloak adapters, ensuring consistent implementation of Keycloak operations across different adapters. It covers essential functionality including authentication, user management, and role management.

Source code in archipy/adapters/keycloak/ports.py
class KeycloakPort:
    """Interface for Keycloak operations providing a standardized access pattern.

    This interface defines the contract for Keycloak adapters, ensuring consistent
    implementation of Keycloak operations across different adapters. It covers essential
    functionality including authentication, user management, and role management.
    """

    # Token Operations
    @abstractmethod
    def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
        """Get a user token by username and password."""
        raise NotImplementedError

    @abstractmethod
    def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
        """Refresh an existing token using a refresh token."""
        raise NotImplementedError

    @abstractmethod
    def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid."""
        raise NotImplementedError

    @abstractmethod
    def get_userinfo(self, token: str) -> KeycloakUserType | None:
        """Get user information from a token."""
        raise NotImplementedError

    @abstractmethod
    def get_token_info(self, token: str) -> dict[str, Any] | None:
        """Decode token to get its claims."""
        raise NotImplementedError

    @abstractmethod
    def introspect_token(self, token: str) -> dict[str, Any] | None:
        """Introspect token to get detailed information about it."""
        raise NotImplementedError

    @abstractmethod
    def get_client_credentials_token(self) -> KeycloakTokenType | None:
        """Get token using client credentials."""
        raise NotImplementedError

    @abstractmethod
    def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token."""
        raise NotImplementedError

    # User Operations
    @abstractmethod
    def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID."""
        raise NotImplementedError

    @abstractmethod
    def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username."""
        raise NotImplementedError

    @abstractmethod
    def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email."""
        raise NotImplementedError

    @abstractmethod
    def create_user(self, user_data: dict[str, Any]) -> str | None:
        """Create a new user in Keycloak."""
        raise NotImplementedError

    @abstractmethod
    def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details."""
        raise NotImplementedError

    @abstractmethod
    def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password."""
        raise NotImplementedError

    @abstractmethod
    def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name."""
        raise NotImplementedError

    @abstractmethod
    def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user."""
        raise NotImplementedError

    # Role Operations
    @abstractmethod
    def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """Get roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role."""
        raise NotImplementedError

    @abstractmethod
    def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has any of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has all of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user."""
        raise NotImplementedError

    @abstractmethod
    def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user."""
        raise NotImplementedError

    @abstractmethod
    def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user."""
        raise NotImplementedError

    @abstractmethod
    def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user."""
        raise NotImplementedError

    @abstractmethod
    def get_realm_role(self, role_name: str) -> dict[str, Any]:
        """Get realm role."""
        raise NotImplementedError

    @abstractmethod
    def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles."""
        raise NotImplementedError

    @abstractmethod
    def create_realm_role(
        self,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new realm role."""
        raise NotImplementedError

    @abstractmethod
    def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role."""
        raise NotImplementedError

    # Client Operations
    @abstractmethod
    def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name."""
        raise NotImplementedError

    @abstractmethod
    def get_client_secret(self, client_id: str) -> str:
        """Get client secret."""
        raise NotImplementedError

    @abstractmethod
    def get_service_account_id(self) -> str:
        """Get service account user ID for the current client."""
        raise NotImplementedError

    # System Operations
    @abstractmethod
    def get_public_key(self) -> PublicKeyType:
        """Get the public key used to verify tokens."""
        raise NotImplementedError

    @abstractmethod
    def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration."""
        raise NotImplementedError

    @abstractmethod
    def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates."""
        raise NotImplementedError

    # Authorization
    @abstractmethod
    def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
        """Exchange authorization code for token."""
        raise NotImplementedError

    @abstractmethod
    def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope."""
        raise NotImplementedError

    @abstractmethod
    def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID."""
        raise NotImplementedError

    @abstractmethod
    def create_client_role(
        self,
        client_id: str,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new client role."""
        raise NotImplementedError

    @abstractmethod
    def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
        """Create a new Keycloak realm."""
        raise NotImplementedError

    @abstractmethod
    def get_realm(self, realm_name: str) -> dict[str, Any] | None:
        """Get realm details by realm name."""
        raise NotImplementedError

    @abstractmethod
    def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
        """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled)."""
        raise NotImplementedError

    @abstractmethod
    def create_client(
        self,
        client_id: str,
        realm: str | None = None,
        skip_exists: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any] | None:
        """Create a new client in the specified realm."""
        raise NotImplementedError

    @abstractmethod
    def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
        """Add realm roles to a composite role."""
        raise NotImplementedError

    @abstractmethod
    def add_client_roles_to_composite(
        self,
        composite_role_name: str,
        client_id: str,
        child_role_names: list[str],
    ) -> None:
        """Add client roles to a composite role."""
        raise NotImplementedError

    @abstractmethod
    def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
        """Get composite roles for a realm role."""
        raise NotImplementedError

    # Organization Operations
    @abstractmethod
    def get_organizations(self, query: dict | None = None) -> list[KeycloakOrganizationType]:
        """Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query."""
        raise NotImplementedError

    @abstractmethod
    def get_organization(self, organization_id: str) -> KeycloakOrganizationType:
        """Get representation of the organization by ID."""
        raise NotImplementedError

    @abstractmethod
    def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
        """Create a new organization. Name and alias must be unique. Returns org_id."""
        raise NotImplementedError

    @abstractmethod
    def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
        """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
        raise NotImplementedError

    @abstractmethod
    def delete_organization(self, organization_id: str) -> dict[str, Any]:
        """Delete an organization."""
        raise NotImplementedError

    @abstractmethod
    def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
        """Get IDPs by organization id."""
        raise NotImplementedError

    @abstractmethod
    def get_user_organizations(self, user_id: str) -> list[KeycloakOrganizationType]:
        """Get organizations by user id. Returns list of organizations the user is member of."""
        raise NotImplementedError

    @abstractmethod
    def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
        """Get members by organization id, optionally filtered by query parameters."""
        raise NotImplementedError

    @abstractmethod
    def get_organization_members_count(self, organization_id: str) -> int:
        """Get the number of members in the organization."""
        raise NotImplementedError

    @abstractmethod
    def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
        """Add a user to an organization."""
        raise NotImplementedError

    @abstractmethod
    def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
        """Remove a user from an organization."""
        raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token abstractmethod

get_token(
    username: str, password: str
) -> KeycloakTokenType | None

Get a user token by username and password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
    """Get a user token by username and password."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.refresh_token abstractmethod

refresh_token(
    refresh_token: str,
) -> KeycloakTokenType | None

Refresh an existing token using a refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
    """Refresh an existing token using a refresh token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.validate_token abstractmethod

validate_token(token: str) -> bool

Validate if a token is still valid.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_userinfo abstractmethod

get_userinfo(token: str) -> KeycloakUserType | None

Get user information from a token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_userinfo(self, token: str) -> KeycloakUserType | None:
    """Get user information from a token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token_info abstractmethod

get_token_info(token: str) -> dict[str, Any] | None

Decode token to get its claims.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token_info(self, token: str) -> dict[str, Any] | None:
    """Decode token to get its claims."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.introspect_token abstractmethod

introspect_token(token: str) -> dict[str, Any] | None

Introspect token to get detailed information about it.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def introspect_token(self, token: str) -> dict[str, Any] | None:
    """Introspect token to get detailed information about it."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_credentials_token abstractmethod

get_client_credentials_token() -> KeycloakTokenType | None

Get token using client credentials.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_credentials_token(self) -> KeycloakTokenType | None:
    """Get token using client credentials."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.logout abstractmethod

logout(refresh_token: str) -> None

Logout user by invalidating their refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_id abstractmethod

get_user_by_id(user_id: str) -> KeycloakUserType | None

Get user details by user ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_username abstractmethod

get_user_by_username(
    username: str,
) -> KeycloakUserType | None

Get user details by username.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_email abstractmethod

get_user_by_email(email: str) -> KeycloakUserType | None

Get user details by email.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_user abstractmethod

create_user(user_data: dict[str, Any]) -> str | None

Create a new user in Keycloak.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_user(self, user_data: dict[str, Any]) -> str | None:
    """Create a new user in Keycloak."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.update_user abstractmethod

update_user(
    user_id: str, user_data: dict[str, Any]
) -> None

Update user details.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.reset_password abstractmethod

reset_password(
    user_id: str, password: str, temporary: bool = False
) -> None

Reset a user's password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.search_users abstractmethod

search_users(
    query: str, max_results: int = 100
) -> list[KeycloakUserType]

Search for users by username, email, or name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search for users by username, email, or name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.clear_user_sessions abstractmethod

clear_user_sessions(user_id: str) -> None

Clear all sessions for a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_roles abstractmethod

get_user_roles(user_id: str) -> list[KeycloakRoleType]

Get roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Get roles assigned to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_roles_for_user abstractmethod

get_client_roles_for_user(
    user_id: str, client_id: str
) -> list[KeycloakRoleType]

Get client-specific roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_role abstractmethod

has_role(token: str, role_name: str) -> bool

Check if a user has a specific role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_any_of_roles abstractmethod

has_any_of_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has any of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has any of the specified roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_all_roles abstractmethod

has_all_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has all of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has all of the specified roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.assign_realm_role abstractmethod

assign_realm_role(user_id: str, role_name: str) -> None

Assign a realm role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.remove_realm_role abstractmethod

remove_realm_role(user_id: str, role_name: str) -> None

Remove a realm role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.assign_client_role abstractmethod

assign_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Assign a client-specific role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.remove_client_role abstractmethod

remove_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Remove a client-specific role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_realm_role abstractmethod

get_realm_role(role_name: str) -> dict[str, Any]

Get realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_realm_role(self, role_name: str) -> dict[str, Any]:
    """Get realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_realm_roles abstractmethod

get_realm_roles() -> list[dict[str, Any]]

Get all realm roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_realm_roles(self) -> list[dict[str, Any]]:
    """Get all realm roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_realm_role abstractmethod

create_realm_role(
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_realm_role(
    self,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.delete_realm_role abstractmethod

delete_realm_role(role_name: str) -> None

Delete a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_id abstractmethod

get_client_id(client_name: str) -> str

Get client ID by client name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_id(self, client_name: str) -> str:
    """Get client ID by client name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_secret abstractmethod

get_client_secret(client_id: str) -> str

Get client secret.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_secret(self, client_id: str) -> str:
    """Get client secret."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_service_account_id abstractmethod

get_service_account_id() -> str

Get service account user ID for the current client.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_service_account_id(self) -> str:
    """Get service account user ID for the current client."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_public_key abstractmethod

get_public_key() -> PublicKeyType

Get the public key used to verify tokens.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_public_key(self) -> PublicKeyType:
    """Get the public key used to verify tokens."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_well_known_config abstractmethod

get_well_known_config() -> dict[str, Any]

Get the well-known OpenID configuration.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_well_known_config(self) -> dict[str, Any]:
    """Get the well-known OpenID configuration."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_certs abstractmethod

get_certs() -> dict[str, Any]

Get the JWT verification certificates.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_certs(self) -> dict[str, Any]:
    """Get the JWT verification certificates."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token_from_code abstractmethod

get_token_from_code(
    code: str, redirect_uri: str
) -> KeycloakTokenType | None

Exchange authorization code for token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
    """Exchange authorization code for token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.check_permissions abstractmethod

check_permissions(
    token: str, resource: str, scope: str
) -> bool

Check if a user has permission to access a resource with the specified scope.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.delete_user abstractmethod

delete_user(user_id: str) -> None

Delete a user from Keycloak by their ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_client_role abstractmethod

create_client_role(
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new client role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_client_role(
    self,
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new client role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_realm abstractmethod

create_realm(
    realm_name: str, skip_exists: bool = True, **kwargs: Any
) -> dict[str, Any] | None

Create a new Keycloak realm.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
    """Create a new Keycloak realm."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_realm abstractmethod

get_realm(realm_name: str) -> dict[str, Any] | None

Get realm details by realm name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_realm(self, realm_name: str) -> dict[str, Any] | None:
    """Get realm details by realm name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.update_realm abstractmethod

update_realm(
    realm_name: str, **kwargs: Any
) -> dict[str, Any] | None

Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled).

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
    """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled)."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_client abstractmethod

create_client(
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None

Create a new client in the specified realm.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_client(
    self,
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None:
    """Create a new client in the specified realm."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.add_realm_roles_to_composite abstractmethod

add_realm_roles_to_composite(
    composite_role_name: str, child_role_names: list[str]
) -> None

Add realm roles to a composite role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
    """Add realm roles to a composite role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.add_client_roles_to_composite abstractmethod

add_client_roles_to_composite(
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None

Add client roles to a composite role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def add_client_roles_to_composite(
    self,
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None:
    """Add client roles to a composite role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_composite_realm_roles abstractmethod

get_composite_realm_roles(
    role_name: str,
) -> list[dict[str, Any]] | None

Get composite roles for a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
    """Get composite roles for a realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_organizations abstractmethod

get_organizations(
    query: dict | None = None,
) -> list[KeycloakOrganizationType]

Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organizations(self, query: dict | None = None) -> list[KeycloakOrganizationType]:
    """Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_organization abstractmethod

get_organization(
    organization_id: str,
) -> KeycloakOrganizationType

Get representation of the organization by ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organization(self, organization_id: str) -> KeycloakOrganizationType:
    """Get representation of the organization by ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_organization abstractmethod

create_organization(
    name: str, alias: str, **kwargs: Any
) -> str | None

Create a new organization. Name and alias must be unique. Returns org_id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
    """Create a new organization. Name and alias must be unique. Returns org_id."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.update_organization abstractmethod

update_organization(
    organization_id: str, **kwargs: Any
) -> dict[str, Any]

Update an existing organization. Kwargs are organization attributes (e.g. name, alias).

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
    """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.delete_organization abstractmethod

delete_organization(organization_id: str) -> dict[str, Any]

Delete an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def delete_organization(self, organization_id: str) -> dict[str, Any]:
    """Delete an organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_organization_idps abstractmethod

get_organization_idps(
    organization_id: str,
) -> list[dict[str, Any]]

Get IDPs by organization id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
    """Get IDPs by organization id."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_organizations abstractmethod

get_user_organizations(
    user_id: str,
) -> list[KeycloakOrganizationType]

Get organizations by user id. Returns list of organizations the user is member of.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_organizations(self, user_id: str) -> list[KeycloakOrganizationType]:
    """Get organizations by user id. Returns list of organizations the user is member of."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_organization_members abstractmethod

get_organization_members(
    organization_id: str, query: dict | None = None
) -> list[dict[str, Any]]

Get members by organization id, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
    """Get members by organization id, optionally filtered by query parameters."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_organization_members_count abstractmethod

get_organization_members_count(organization_id: str) -> int

Get the number of members in the organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organization_members_count(self, organization_id: str) -> int:
    """Get the number of members in the organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.organization_user_add abstractmethod

organization_user_add(
    user_id: str, organization_id: str
) -> bytes

Add a user to an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
    """Add a user to an organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.organization_user_remove abstractmethod

organization_user_remove(
    user_id: str, organization_id: str
) -> dict[str, Any]

Remove a user from an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
    """Remove a user from an organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort

Asynchronous interface for Keycloak operations providing a standardized access pattern.

This interface defines the contract for async Keycloak adapters, ensuring consistent implementation of Keycloak operations across different adapters. It covers essential functionality including authentication, user management, and role management.

Source code in archipy/adapters/keycloak/ports.py
class AsyncKeycloakPort:
    """Asynchronous interface for Keycloak operations providing a standardized access pattern.

    This interface defines the contract for async Keycloak adapters, ensuring consistent
    implementation of Keycloak operations across different adapters. It covers essential
    functionality including authentication, user management, and role management.
    """

    # Token Operations
    @abstractmethod
    async def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
        """Get a user token by username and password."""
        raise NotImplementedError

    @abstractmethod
    async def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
        """Refresh an existing token using a refresh token."""
        raise NotImplementedError

    @abstractmethod
    async def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid."""
        raise NotImplementedError

    @abstractmethod
    async def get_userinfo(self, token: str) -> KeycloakUserType | None:
        """Get user information from a token."""
        raise NotImplementedError

    @abstractmethod
    async def get_token_info(self, token: str) -> dict[str, Any] | None:
        """Decode token to get its claims."""
        raise NotImplementedError

    @abstractmethod
    async def introspect_token(self, token: str) -> dict[str, Any] | None:
        """Introspect token to get detailed information about it."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_credentials_token(self) -> KeycloakTokenType | None:
        """Get token using client credentials."""
        raise NotImplementedError

    @abstractmethod
    async def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token."""
        raise NotImplementedError

    # User Operations
    @abstractmethod
    async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID."""
        raise NotImplementedError

    @abstractmethod
    async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username."""
        raise NotImplementedError

    @abstractmethod
    async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email."""
        raise NotImplementedError

    @abstractmethod
    async def create_user(self, user_data: dict[str, Any]) -> str | None:
        """Create a new user in Keycloak."""
        raise NotImplementedError

    @abstractmethod
    async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details."""
        raise NotImplementedError

    @abstractmethod
    async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password."""
        raise NotImplementedError

    @abstractmethod
    async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name."""
        raise NotImplementedError

    @abstractmethod
    async def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user."""
        raise NotImplementedError

    # Role Operations
    @abstractmethod
    async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """Get roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    async def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role."""
        raise NotImplementedError

    @abstractmethod
    async def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has any of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    async def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has all of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    async def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user."""
        raise NotImplementedError

    @abstractmethod
    async def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user."""
        raise NotImplementedError

    @abstractmethod
    async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user."""
        raise NotImplementedError

    @abstractmethod
    async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user."""
        raise NotImplementedError

    @abstractmethod
    async def get_realm_role(self, role_name: str) -> dict[str, Any]:
        """Get realm role."""
        raise NotImplementedError

    @abstractmethod
    async def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles."""
        raise NotImplementedError

    @abstractmethod
    async def create_realm_role(
        self,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new realm role."""
        raise NotImplementedError

    @abstractmethod
    async def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role."""
        raise NotImplementedError

    # Client Operations
    @abstractmethod
    async def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_secret(self, client_id: str) -> str:
        """Get client secret."""
        raise NotImplementedError

    @abstractmethod
    async def get_service_account_id(self) -> str:
        """Get service account user ID for the current client."""
        raise NotImplementedError

    # System Operations
    @abstractmethod
    async def get_public_key(self) -> PublicKeyType:
        """Get the public key used to verify tokens."""
        raise NotImplementedError

    @abstractmethod
    async def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration."""
        raise NotImplementedError

    @abstractmethod
    async def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates."""
        raise NotImplementedError

    # Authorization
    @abstractmethod
    async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
        """Exchange authorization code for token."""
        raise NotImplementedError

    @abstractmethod
    async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope."""
        raise NotImplementedError

    @abstractmethod
    async def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID."""
        raise NotImplementedError

    @abstractmethod
    async def create_client_role(
        self,
        client_id: str,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new client role."""
        raise NotImplementedError

    @abstractmethod
    async def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
        """Create a new Keycloak realm."""
        raise NotImplementedError

    @abstractmethod
    async def get_realm(self, realm_name: str) -> dict[str, Any] | None:
        """Get realm details by realm name."""
        raise NotImplementedError

    @abstractmethod
    async def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
        """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled)."""
        raise NotImplementedError

    @abstractmethod
    async def create_client(
        self,
        client_id: str,
        realm: str | None = None,
        skip_exists: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any] | None:
        """Create a new client in the specified realm."""
        raise NotImplementedError

    @abstractmethod
    async def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
        """Add realm roles to a composite role."""
        raise NotImplementedError

    @abstractmethod
    async def add_client_roles_to_composite(
        self,
        composite_role_name: str,
        client_id: str,
        child_role_names: list[str],
    ) -> None:
        """Add client roles to a composite role."""
        raise NotImplementedError

    @abstractmethod
    async def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
        """Get composite roles for a realm role."""
        raise NotImplementedError

    # Organization Operations
    @abstractmethod
    async def get_organizations(self, query: dict | None = None) -> list[KeycloakOrganizationType]:
        """Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query."""
        raise NotImplementedError

    @abstractmethod
    async def get_organization(self, organization_id: str) -> KeycloakOrganizationType:
        """Get representation of the organization by ID."""
        raise NotImplementedError

    @abstractmethod
    async def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
        """Create a new organization. Name and alias must be unique. Returns org_id."""
        raise NotImplementedError

    @abstractmethod
    async def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
        """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
        raise NotImplementedError

    @abstractmethod
    async def delete_organization(self, organization_id: str) -> dict[str, Any]:
        """Delete an organization."""
        raise NotImplementedError

    @abstractmethod
    async def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
        """Get IDPs by organization id."""
        raise NotImplementedError

    @abstractmethod
    async def get_user_organizations(self, user_id: str) -> list[KeycloakOrganizationType]:
        """Get organizations by user id. Returns list of organizations the user is member of."""
        raise NotImplementedError

    @abstractmethod
    async def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
        """Get members by organization id, optionally filtered by query parameters."""
        raise NotImplementedError

    @abstractmethod
    async def get_organization_members_count(self, organization_id: str) -> int:
        """Get the number of members in the organization."""
        raise NotImplementedError

    @abstractmethod
    async def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
        """Add a user to an organization."""
        raise NotImplementedError

    @abstractmethod
    async def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
        """Remove a user from an organization."""
        raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token abstractmethod async

get_token(
    username: str, password: str
) -> KeycloakTokenType | None

Get a user token by username and password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
    """Get a user token by username and password."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.refresh_token abstractmethod async

refresh_token(
    refresh_token: str,
) -> KeycloakTokenType | None

Refresh an existing token using a refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
    """Refresh an existing token using a refresh token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.validate_token abstractmethod async

validate_token(token: str) -> bool

Validate if a token is still valid.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_userinfo abstractmethod async

get_userinfo(token: str) -> KeycloakUserType | None

Get user information from a token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_userinfo(self, token: str) -> KeycloakUserType | None:
    """Get user information from a token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token_info abstractmethod async

get_token_info(token: str) -> dict[str, Any] | None

Decode token to get its claims.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token_info(self, token: str) -> dict[str, Any] | None:
    """Decode token to get its claims."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.introspect_token abstractmethod async

introspect_token(token: str) -> dict[str, Any] | None

Introspect token to get detailed information about it.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def introspect_token(self, token: str) -> dict[str, Any] | None:
    """Introspect token to get detailed information about it."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_credentials_token abstractmethod async

get_client_credentials_token() -> KeycloakTokenType | None

Get token using client credentials.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_credentials_token(self) -> KeycloakTokenType | None:
    """Get token using client credentials."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.logout abstractmethod async

logout(refresh_token: str) -> None

Logout user by invalidating their refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_id abstractmethod async

get_user_by_id(user_id: str) -> KeycloakUserType | None

Get user details by user ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_username abstractmethod async

get_user_by_username(
    username: str,
) -> KeycloakUserType | None

Get user details by username.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_email abstractmethod async

get_user_by_email(email: str) -> KeycloakUserType | None

Get user details by email.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_user abstractmethod async

create_user(user_data: dict[str, Any]) -> str | None

Create a new user in Keycloak.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_user(self, user_data: dict[str, Any]) -> str | None:
    """Create a new user in Keycloak."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.update_user abstractmethod async

update_user(
    user_id: str, user_data: dict[str, Any]
) -> None

Update user details.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.reset_password abstractmethod async

reset_password(
    user_id: str, password: str, temporary: bool = False
) -> None

Reset a user's password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.search_users abstractmethod async

search_users(
    query: str, max_results: int = 100
) -> list[KeycloakUserType]

Search for users by username, email, or name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search for users by username, email, or name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.clear_user_sessions abstractmethod async

clear_user_sessions(user_id: str) -> None

Clear all sessions for a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_roles abstractmethod async

get_user_roles(user_id: str) -> list[KeycloakRoleType]

Get roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Get roles assigned to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_roles_for_user abstractmethod async

get_client_roles_for_user(
    user_id: str, client_id: str
) -> list[KeycloakRoleType]

Get client-specific roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_role abstractmethod async

has_role(token: str, role_name: str) -> bool

Check if a user has a specific role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_any_of_roles abstractmethod async

has_any_of_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has any of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has any of the specified roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_all_roles abstractmethod async

has_all_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has all of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has all of the specified roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.assign_realm_role abstractmethod async

assign_realm_role(user_id: str, role_name: str) -> None

Assign a realm role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.remove_realm_role abstractmethod async

remove_realm_role(user_id: str, role_name: str) -> None

Remove a realm role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.assign_client_role abstractmethod async

assign_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Assign a client-specific role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.remove_client_role abstractmethod async

remove_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Remove a client-specific role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_realm_role abstractmethod async

get_realm_role(role_name: str) -> dict[str, Any]

Get realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_realm_role(self, role_name: str) -> dict[str, Any]:
    """Get realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_realm_roles abstractmethod async

get_realm_roles() -> list[dict[str, Any]]

Get all realm roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_realm_roles(self) -> list[dict[str, Any]]:
    """Get all realm roles."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_realm_role abstractmethod async

create_realm_role(
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_realm_role(
    self,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.delete_realm_role abstractmethod async

delete_realm_role(role_name: str) -> None

Delete a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_id abstractmethod async

get_client_id(client_name: str) -> str

Get client ID by client name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_id(self, client_name: str) -> str:
    """Get client ID by client name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_secret abstractmethod async

get_client_secret(client_id: str) -> str

Get client secret.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_secret(self, client_id: str) -> str:
    """Get client secret."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_service_account_id abstractmethod async

get_service_account_id() -> str

Get service account user ID for the current client.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_service_account_id(self) -> str:
    """Get service account user ID for the current client."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_public_key abstractmethod async

get_public_key() -> PublicKeyType

Get the public key used to verify tokens.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_public_key(self) -> PublicKeyType:
    """Get the public key used to verify tokens."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_well_known_config abstractmethod async

get_well_known_config() -> dict[str, Any]

Get the well-known OpenID configuration.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_well_known_config(self) -> dict[str, Any]:
    """Get the well-known OpenID configuration."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_certs abstractmethod async

get_certs() -> dict[str, Any]

Get the JWT verification certificates.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_certs(self) -> dict[str, Any]:
    """Get the JWT verification certificates."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token_from_code abstractmethod async

get_token_from_code(
    code: str, redirect_uri: str
) -> KeycloakTokenType | None

Exchange authorization code for token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
    """Exchange authorization code for token."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.check_permissions abstractmethod async

check_permissions(
    token: str, resource: str, scope: str
) -> bool

Check if a user has permission to access a resource with the specified scope.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.delete_user abstractmethod async

delete_user(user_id: str) -> None

Delete a user from Keycloak by their ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_client_role abstractmethod async

create_client_role(
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new client role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_client_role(
    self,
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new client role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_realm abstractmethod async

create_realm(
    realm_name: str, skip_exists: bool = True, **kwargs: Any
) -> dict[str, Any] | None

Create a new Keycloak realm.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
    """Create a new Keycloak realm."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_realm abstractmethod async

get_realm(realm_name: str) -> dict[str, Any] | None

Get realm details by realm name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_realm(self, realm_name: str) -> dict[str, Any] | None:
    """Get realm details by realm name."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.update_realm abstractmethod async

update_realm(
    realm_name: str, **kwargs: Any
) -> dict[str, Any] | None

Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled).

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
    """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled)."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_client abstractmethod async

create_client(
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None

Create a new client in the specified realm.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_client(
    self,
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None:
    """Create a new client in the specified realm."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.add_realm_roles_to_composite abstractmethod async

add_realm_roles_to_composite(
    composite_role_name: str, child_role_names: list[str]
) -> None

Add realm roles to a composite role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
    """Add realm roles to a composite role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.add_client_roles_to_composite abstractmethod async

add_client_roles_to_composite(
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None

Add client roles to a composite role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def add_client_roles_to_composite(
    self,
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None:
    """Add client roles to a composite role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_composite_realm_roles abstractmethod async

get_composite_realm_roles(
    role_name: str,
) -> list[dict[str, Any]] | None

Get composite roles for a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
    """Get composite roles for a realm role."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_organizations abstractmethod async

get_organizations(
    query: dict | None = None,
) -> list[KeycloakOrganizationType]

Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organizations(self, query: dict | None = None) -> list[KeycloakOrganizationType]:
    """Fetch all organizations. Returns list of OrganizationRepresentation, filtered by query."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_organization abstractmethod async

get_organization(
    organization_id: str,
) -> KeycloakOrganizationType

Get representation of the organization by ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organization(self, organization_id: str) -> KeycloakOrganizationType:
    """Get representation of the organization by ID."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_organization abstractmethod async

create_organization(
    name: str, alias: str, **kwargs: Any
) -> str | None

Create a new organization. Name and alias must be unique. Returns org_id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
    """Create a new organization. Name and alias must be unique. Returns org_id."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.update_organization abstractmethod async

update_organization(
    organization_id: str, **kwargs: Any
) -> dict[str, Any]

Update an existing organization. Kwargs are organization attributes (e.g. name, alias).

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
    """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.delete_organization abstractmethod async

delete_organization(organization_id: str) -> dict[str, Any]

Delete an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def delete_organization(self, organization_id: str) -> dict[str, Any]:
    """Delete an organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_organization_idps abstractmethod async

get_organization_idps(
    organization_id: str,
) -> list[dict[str, Any]]

Get IDPs by organization id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
    """Get IDPs by organization id."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_organizations abstractmethod async

get_user_organizations(
    user_id: str,
) -> list[KeycloakOrganizationType]

Get organizations by user id. Returns list of organizations the user is member of.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_organizations(self, user_id: str) -> list[KeycloakOrganizationType]:
    """Get organizations by user id. Returns list of organizations the user is member of."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_organization_members abstractmethod async

get_organization_members(
    organization_id: str, query: dict | None = None
) -> list[dict[str, Any]]

Get members by organization id, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
    """Get members by organization id, optionally filtered by query parameters."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_organization_members_count abstractmethod async

get_organization_members_count(organization_id: str) -> int

Get the number of members in the organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organization_members_count(self, organization_id: str) -> int:
    """Get the number of members in the organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.organization_user_add abstractmethod async

organization_user_add(
    user_id: str, organization_id: str
) -> bytes

Add a user to an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
    """Add a user to an organization."""
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.organization_user_remove abstractmethod async

organization_user_remove(
    user_id: str, organization_id: str
) -> dict[str, Any]

Remove a user from an organization.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
    """Remove a user from an organization."""
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete Keycloak adapter wrapping the Keycloak REST API for authentication and authorization operations.

archipy.adapters.keycloak.adapters.logger module-attribute

logger = getLogger(__name__)

archipy.adapters.keycloak.adapters.KeycloakExceptionHandlerMixin

Mixin class to handle Keycloak exceptions in a consistent way.

Source code in archipy/adapters/keycloak/adapters.py
class KeycloakExceptionHandlerMixin:
    """Mixin class to handle Keycloak exceptions in a consistent way."""

    @classmethod
    def _extract_error_message(cls, exception: KeycloakError) -> str:
        """Extract the actual error message from Keycloak error response.

        Args:
            exception: The Keycloak exception

        Returns:
            str: The extracted error message
        """
        error_message = str(exception)

        # Try to parse JSON response body
        if hasattr(exception, "response_body") and exception.response_body:
            try:
                body = exception.response_body
                if isinstance(body, bytes):
                    body_str = body.decode("utf-8")
                elif isinstance(body, str):
                    body_str = body
                else:
                    body_str = str(body)

                parsed = json.loads(body_str)
                if isinstance(parsed, dict):
                    error_message = (
                        parsed.get("errorMessage")
                        or parsed.get("error_description")
                        or parsed.get("error")
                        or error_message
                    )
            except json.JSONDecodeError, UnicodeDecodeError:
                pass

        return error_message

    @classmethod
    def _handle_keycloak_exception(cls, exception: KeycloakError, operation: str) -> NoReturn:
        """Handle Keycloak exceptions and map them to appropriate application errors.

        Args:
            exception: The original Keycloak exception
            operation: The name of the operation that failed

        Raises:
            Various application-specific errors based on the exception type/content
        """
        error_message = cls._extract_error_message(exception)
        response_code = getattr(exception, "response_code", None)
        error_lower = error_message.lower()

        # Common context data
        additional_data = {
            "operation": operation,
            "original_error": error_message,
            "response_code": response_code,
            "keycloak_error_type": type(exception).__name__,
        }

        # Connection and network errors
        if isinstance(exception, KeycloakConnectionError):
            if "timeout" in error_lower:
                raise KeycloakConnectionTimeoutError(additional_data=additional_data) from exception
            raise KeycloakServiceUnavailableError(additional_data=additional_data) from exception

        # Authentication errors
        if isinstance(exception, KeycloakAuthenticationError) or any(
            phrase in error_lower
            for phrase in ["invalid user credentials", "invalid credentials", "authentication failed", "unauthorized"]
        ):
            raise InvalidCredentialsError(additional_data=additional_data) from exception

        # Resource already exists errors
        if "already exists" in error_lower:
            if "realm" in error_lower:
                raise RealmAlreadyExistsError(additional_data=additional_data) from exception
            elif "user exists with same" in error_lower:
                raise UserAlreadyExistsError(additional_data=additional_data) from exception
            elif "client" in error_lower:
                raise ClientAlreadyExistsError(additional_data=additional_data) from exception
            elif "role" in error_lower:
                raise RoleAlreadyExistsError(additional_data=additional_data) from exception

        # Not found errors
        if "not found" in error_lower:
            raise ResourceNotFoundError(additional_data=additional_data) from exception

        # Permission errors
        if any(
            phrase in error_lower
            for phrase in ["forbidden", "access denied", "insufficient permissions", "insufficient scope"]
        ):
            raise InsufficientPermissionsError(additional_data=additional_data) from exception

        # Password policy errors
        if any(
            phrase in error_lower
            for phrase in ["invalid password", "password policy", "minimum length", "password must"]
        ):
            raise PasswordPolicyError(additional_data=additional_data) from exception

        # Validation errors (400 status codes that don't match above)
        if response_code == 400 or any(
            phrase in error_lower for phrase in ["validation", "invalid", "required field", "bad request"]
        ):
            raise ValidationError(additional_data=additional_data) from exception

        # Service unavailable
        if response_code in [503, 504] or "unavailable" in error_lower:
            raise KeycloakServiceUnavailableError(additional_data=additional_data) from exception

        # Default to InternalError for unrecognized errors
        raise InternalError(additional_data=additional_data) from exception

    @classmethod
    def _handle_realm_exception(
        cls,
        exception: KeycloakError,
        operation: str,
        realm_name: str | None = None,
    ) -> NoReturn:
        """Handle realm-specific exceptions.

        Args:
            exception: The original Keycloak exception
            operation: The name of the operation that failed
            realm_name: The realm name involved in the operation

        Raises:
            RealmAlreadyExistsError: If realm already exists
            Various other errors from _handle_keycloak_exception
        """
        # Add realm-specific context
        error_message = cls._extract_error_message(exception)

        # Realm-specific error handling
        if realm_name and "already exists" in error_message.lower():
            additional_data = {
                "operation": operation,
                "realm_name": realm_name,
                "original_error": error_message,
                "response_code": getattr(exception, "response_code", None),
            }
            raise RealmAlreadyExistsError(additional_data=additional_data) from exception

        # Fall back to general Keycloak error handling
        cls._handle_keycloak_exception(exception, operation)

    @classmethod
    def _handle_user_exception(
        cls,
        exception: KeycloakError,
        operation: str,
        user_data: dict | None = None,
    ) -> NoReturn:
        """Handle user-specific exceptions.

        Args:
            exception: The original Keycloak exception
            operation: The name of the operation that failed
            user_data: The user data involved in the operation

        Raises:
            UserAlreadyExistsError: If user already exists
            Various other errors from _handle_keycloak_exception
        """
        error_message = cls._extract_error_message(exception)

        # User-specific error handling
        if "user exists with same" in error_message.lower():
            additional_data = {
                "operation": operation,
                "original_error": error_message,
                "response_code": getattr(exception, "response_code", None),
            }
            if user_data:
                additional_data.update(
                    {
                        "username": user_data.get("username"),
                        "email": user_data.get("email"),
                    },
                )
            raise UserAlreadyExistsError(additional_data=additional_data) from exception

        # Fall back to general Keycloak error handling
        cls._handle_keycloak_exception(exception, operation)

    @classmethod
    def _handle_client_exception(
        cls,
        exception: KeycloakError,
        operation: str,
        client_data: dict | None = None,
    ) -> None:
        """Handle client-specific exceptions.

        Args:
            exception: The original Keycloak exception
            operation: The name of the operation that failed
            client_data: The client data involved in the operation

        Raises:
            ClientAlreadyExistsError: If client already exists
            Various other errors from _handle_keycloak_exception
        """
        error_message = cls._extract_error_message(exception)

        # Client-specific error handling
        if "client" in error_message.lower() and "already exists" in error_message.lower():
            additional_data = {
                "operation": operation,
                "original_error": error_message,
                "response_code": getattr(exception, "response_code", None),
            }
            if client_data:
                additional_data.update(
                    {
                        "client_id": client_data.get("clientId"),
                        "client_name": client_data.get("name"),
                    },
                )
            raise ClientAlreadyExistsError(additional_data=additional_data) from exception

        # Fall back to general Keycloak error handling
        cls._handle_keycloak_exception(exception, operation)

archipy.adapters.keycloak.adapters.KeycloakAdapter

Bases: KeycloakPort, KeycloakExceptionHandlerMixin

Concrete implementation of the KeycloakPort interface using python-keycloak library.

This implementation includes TTL caching for appropriate operations to improve performance while ensuring cache entries expire after a configured time to prevent stale data.

Source code in archipy/adapters/keycloak/adapters.py
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
class KeycloakAdapter(KeycloakPort, KeycloakExceptionHandlerMixin):
    """Concrete implementation of the KeycloakPort interface using python-keycloak library.

    This implementation includes TTL caching for appropriate operations to improve performance
    while ensuring cache entries expire after a configured time to prevent stale data.
    """

    def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
        """Initialize KeycloakAdapter with configuration.

        Args:
            keycloak_configs: Optional Keycloak configuration. If None, global config is used.
        """
        self.configs: KeycloakConfig = (
            BaseConfig.global_config().KEYCLOAK if keycloak_configs is None else keycloak_configs
        )

        # Initialize the OpenID client for authentication
        self._openid_adapter = self._get_openid_client(self.configs)

        # Cache for admin client to avoid unnecessary re-authentication
        self._admin_adapter: KeycloakAdmin | None = None
        self._admin_token_expiry: float = 0.0

        # Initialize admin client if admin mode is enabled and credentials are provided
        if self.configs.IS_ADMIN_MODE_ENABLED and (
            self.configs.CLIENT_SECRET_KEY or (self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD)
        ):
            self._initialize_admin_client()

    def clear_all_caches(self) -> None:
        """Clear all cached values."""
        for attr_name in dir(self):
            attr = getattr(self, attr_name)
            if hasattr(attr, "clear_cache"):
                attr.clear_cache()

    @staticmethod
    def _get_openid_client(configs: KeycloakConfig) -> KeycloakOpenID:
        """Create and configure a KeycloakOpenID instance.

        Args:
            configs: Keycloak configuration

        Returns:
            Configured KeycloakOpenID client
        """
        server_url = configs.SERVER_URL
        client_id = configs.CLIENT_ID
        if not server_url or not client_id:
            raise ValueError("SERVER_URL and CLIENT_ID must be provided")
        return KeycloakOpenID(
            server_url=server_url,
            client_id=client_id,
            realm_name=configs.REALM_NAME,
            client_secret_key=configs.CLIENT_SECRET_KEY,
            verify=configs.VERIFY_SSL,
            timeout=configs.TIMEOUT,
        )

    def _initialize_admin_client(self) -> None:
        """Initialize or refresh the admin client."""
        try:
            # Check if admin credentials are available
            if self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD:
                # Create admin client using admin credentials
                self._admin_adapter = KeycloakAdmin(
                    server_url=self.configs.SERVER_URL,
                    username=self.configs.ADMIN_USERNAME,
                    password=self.configs.ADMIN_PASSWORD,
                    realm_name=self.configs.REALM_NAME,
                    user_realm_name=self.configs.ADMIN_REALM_NAME,
                    verify=self.configs.VERIFY_SSL,
                    timeout=self.configs.TIMEOUT,
                )
                # Since we're using direct credentials, set a long expiry time
                self._admin_token_expiry = time.time() + 3600  # 1 hour
                logger.debug("Admin client initialized with admin credentials")

            elif self.configs.CLIENT_SECRET_KEY:
                # Get token using client credentials
                token = self._openid_adapter.token(grant_type="client_credentials")

                # Set token expiry time (current time + expires_in - buffer)
                # Using a 30-second buffer to ensure we refresh before expiration
                self._admin_token_expiry = time.time() + token.get("expires_in", 60) - 30

                self._admin_adapter = KeycloakAdmin(
                    server_url=self.configs.SERVER_URL,
                    realm_name=self.configs.REALM_NAME,
                    token=token,
                    verify=self.configs.VERIFY_SSL,
                    timeout=self.configs.TIMEOUT,
                )
                logger.debug("Admin client initialized with client credentials")

            else:
                raise UnauthenticatedError(
                    additional_data={"detail": "Neither admin credentials nor client secret provided"},
                )

        except KeycloakAuthenticationError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            raise UnauthenticatedError(
                additional_data={"detail": "Failed to authenticate with Keycloak service account"},
            ) from e
        except KeycloakConnectionError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            raise ConnectionTimeoutError("Failed to connect to Keycloak server") from e
        except KeycloakError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            self._handle_keycloak_exception(e, "_initialize_admin_client")

    @property
    def admin_adapter(self) -> KeycloakAdmin:
        """Get the admin adapter, refreshing it if necessary.

        Returns:
            KeycloakAdmin instance

        Raises:
            UnauthenticatedError: If admin client is not available due to authentication issues
            UnavailableError: If Keycloak service is unavailable
        """
        if not self.configs.IS_ADMIN_MODE_ENABLED or not (
            self.configs.CLIENT_SECRET_KEY or (self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD)
        ):
            raise UnauthenticatedError(
                additional_data={
                    "data": "Admin mode is disabled or neither admin credentials nor client secret provided",
                },
            )

        # Check if token is about to expire and refresh if needed
        if self._admin_adapter is None or time.time() >= self._admin_token_expiry:
            self._initialize_admin_client()

        if self._admin_adapter is None:
            raise UnavailableError("Keycloak admin client is not available")

        return self._admin_adapter

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
    def get_public_key(self) -> PublicKeyType:
        """Get the public key used to verify tokens.

        Returns:
            JWK key object used to verify signatures

        Raises:
            ServiceUnavailableError: If Keycloak service is unavailable
            InternalError: If there's an internal error processing the public key
        """
        try:
            from jwcrypto import jwk

            keys_info = self._openid_adapter.public_key()
            key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
            return jwk.JWK.from_pem(key.encode("utf-8"))
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_public_key")
        except Exception as e:
            raise InternalError(additional_data={"operation": "get_public_key", "error": str(e)}) from e

    @override
    def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
        """Get a user token by username and password using the Resource Owner Password Credentials Grant.

        Warning:
            This method uses the direct password grant flow, which is less secure and not recommended
            for user login in production environments. Instead, prefer the web-based OAuth 2.0
            Authorization Code Flow (use `get_token_from_code`) for secure authentication.
            Use this method only for testing, administrative tasks, or specific service accounts
            where direct credential use is acceptable and properly secured.

        Args:
            username: User's username
            password: User's password

        Returns:
            Token response containing access_token, refresh_token, etc.

        Raises:
            InvalidCredentialsError: If username or password is invalid
            ServiceUnavailableError: If Keycloak service is unavailable
        """
        try:
            return self._openid_adapter.token(grant_type="password", username=username, password=password)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token")

    @override
    def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
        """Refresh an existing token using a refresh token.

        Args:
            refresh_token: Refresh token string

        Returns:
            New token response containing access_token, refresh_token, etc.

        Raises:
            InvalidTokenError: If refresh token is invalid or expired
            ServiceUnavailableError: If Keycloak service is unavailable
        """
        try:
            return self._openid_adapter.refresh_token(refresh_token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "refresh_token")

    @override
    def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid.

        Args:
            token: Access token to validate

        Returns:
            True if token is valid, False otherwise
        """
        # Not caching validation results as tokens are time-sensitive
        try:
            # Let the underlying adapter handle key selection to align with expected types
            self._openid_adapter.decode_token(token)
        except Exception as e:
            logger.debug(f"Token validation failed: {e!s}")
            return False
        else:
            return True

    @override
    def get_userinfo(self, token: str) -> KeycloakUserType | None:
        """Get user information from a token.

        Args:
            token: Access token

        Returns:
            User information

        Raises:
            ValueError: If getting user info fails
        """
        if not self.validate_token(token):
            raise InvalidTokenError()
        try:
            # _get_userinfo_cached returns KeycloakUserType (dict[str, Any])
            # The ttl_cache_decorator loses type info, but runtime behavior is correct
            # Access underlying function for proper typing
            cached_func = self._get_userinfo_cached
            underlying_func = getattr(cached_func, "__wrapped__", None)
            if underlying_func is not None:
                # Call underlying function directly for type checking
                result: KeycloakUserType = underlying_func(self, token)
            else:
                # Fallback to cached version if __wrapped__ not available
                result_raw = cached_func(token)
                if not isinstance(result_raw, dict):
                    return None
                # Type assertion: result_raw is a dict, which matches KeycloakUserType
                # Convert to proper type by creating a new dict with explicit typing
                result: KeycloakUserType = {str(k): v for k, v in result_raw.items()}
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_userinfo")
            return None
        else:
            return result

    @ttl_cache_decorator(ttl_seconds=30, maxsize=100)  # Cache for 30 seconds
    def _get_userinfo_cached(self, token: str) -> KeycloakUserType:
        return self._openid_adapter.userinfo(token)  # type: ignore[return-value]

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID.

        Args:
            user_id: User's ID

        Returns:
            User details or None if not found

        Raises:
            ValueError: If getting user fails
        """
        try:
            return self.admin_adapter.get_user(user_id)
        except KeycloakGetError as e:
            if e.response_code == 404:
                return None
            self._handle_keycloak_exception(e, "get_user_by_id")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_by_id")

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username.

        Args:
            username: User's username

        Returns:
            User details or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = self.admin_adapter.get_users({"username": username})
            return users[0] if users else None
        except KeycloakError as e:
            raise InternalError() from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email.

        Args:
            email: User's email

        Returns:
            User details or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = self.admin_adapter.get_users({"email": email})
            return users[0] if users else None
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_by_email")

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_roles(self, user_id: str) -> list[KeycloakRoleType] | None:
        """Get roles assigned to a user.

        Args:
            user_id: User's ID

        Returns:
            List of roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_realm_roles_of_user(user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_roles")

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user.

        Args:
            user_id: User's ID
            client_id: Client ID

        Returns:
            List of client-specific roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_client_roles_of_user(user_id, client_id)
        except KeycloakError as e:
            raise InternalError() from e

    @override
    def create_user(self, user_data: dict[str, Any]) -> str | None:
        """Create a new user in Keycloak.

        Args:
            user_data: User data including username, email, etc.

        Returns:
            ID of the created user

        Raises:
            ValueError: If creating user fails
        """
        # This is a write operation, no caching needed
        try:
            user_id = self.admin_adapter.create_user(user_data)

            # Clear related caches
            self.clear_all_caches()

        except KeycloakError as e:
            self._handle_user_exception(e, "create_user", user_data)
        else:
            return user_id

    @override
    def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details.

        Args:
            user_id: User's ID
            user_data: User data to update

        Raises:
            ValueError: If updating user fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.update_user(user_id, user_data)

            # Clear user-related caches
            self.clear_all_caches()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_user")

    @override
    def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password.

        Args:
            user_id: User's ID
            password: New password
            temporary: Whether the password is temporary and should be changed on next login

        Raises:
            ValueError: If password reset fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.set_user_password(user_id, password, temporary)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "reset_password")

    @override
    def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user.

        Args:
            user_id: User's ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = self.admin_adapter.get_realm_role(role_name)
            # Assign role to user
            self.admin_adapter.assign_realm_roles(user_id, [role])

            # Clear role-related caches
            if hasattr(self.get_user_roles, "clear_cache"):
                self.get_user_roles.clear_cache()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "assign_realm_role")

    @override
    def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user.

        Args:
            user_id: User's ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = self.admin_adapter.get_realm_role(role_name)
            # Remove role from user
            self.admin_adapter.delete_realm_roles_of_user(user_id, [role])

            # Clear role-related caches
            if hasattr(self.get_user_roles, "clear_cache"):
                self.get_user_roles.clear_cache()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "remove_realm_role")

    @override
    def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get client
            client = self.admin_adapter.get_client_id(client_id)
            if client is None:
                raise ValueError("client_id resolved to None")
            # Get role representation
            # Keycloak admin adapter methods accept these types at runtime
            role = self.admin_adapter.get_client_role(client, role_name)
            # Assign role to user
            self.admin_adapter.assign_client_role(user_id, client, [role])

            # Clear role-related caches
            if hasattr(self.get_client_roles_for_user, "clear_cache"):
                self.get_client_roles_for_user.clear_cache()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "assign_client_role")

    @override
    def create_realm_role(
        self,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new realm role.

        Args:
            role_name: Role name
            description: Optional role description
            skip_exists: Skip creation if realm role already exists

        Returns:
            Created role details

        Raises:
            ValueError: If role creation fails
        """
        # This is a write operation, no caching needed
        try:
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            self.admin_adapter.create_realm_role(role_data, skip_exists=skip_exists)

            # Clear realm roles cache
            if hasattr(self.get_realm_roles, "clear_cache"):
                self.get_realm_roles.clear_cache()

            return self.admin_adapter.get_realm_role(role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_realm_role")

    @override
    def create_client_role(
        self,
        client_id: str,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new client role.

        Args:
            client_id: Client ID or client name
            role_name: Role name
            description: Optional role description
            skip_exists: Skip creation if client role already exists

        Returns:
            Created role details

        Raises:
            ValueError: If role creation fails
        """
        # This is a write operation, no caching needed
        try:
            resolved_client_id = self.admin_adapter.get_client_id(client_id)
            if resolved_client_id is None:
                raise ValueError(f"Client ID not found: {client_id}")

            # Prepare role data
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            # Create client role
            self.admin_adapter.create_client_role(resolved_client_id, role_data, skip_exists=skip_exists)

            # Clear related caches if they exist
            if hasattr(self.get_client_roles_for_user, "clear_cache"):
                self.get_client_roles_for_user.clear_cache()

            # Return created role
            return self.admin_adapter.get_client_role(resolved_client_id, role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_client_role")

    @override
    def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role.

        Args:
            role_name: Role name to delete

        Raises:
            ValueError: If role deletion fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.delete_realm_role(role_name)

            # Clear realm roles cache
            if hasattr(self.get_realm_roles, "clear_cache"):
                self.get_realm_roles.clear_cache()

            # We also need to clear user role caches since they might contain this role
            if hasattr(self.get_user_roles, "clear_cache"):
                self.get_user_roles.clear_cache()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_realm_role")

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_service_account_id(self) -> str | None:
        """Get service account user ID for the current client.

        Returns:
            Service account user ID

        Raises:
            ValueError: If getting service account fails
        """
        try:
            client_id = self.get_client_id(self.configs.CLIENT_ID)
            return self.admin_adapter.get_client_service_account_user(str(client_id)).get("id")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_service_account_id")

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_well_known_config(self) -> dict[str, Any] | None:
        """Get the well-known OpenID configuration.

        Returns:
            OIDC configuration

        Raises:
            ValueError: If getting configuration fails
        """
        try:
            return self._openid_adapter.well_known()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_well_known_config")

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_certs(self) -> dict[str, Any] | None:
        """Get the JWT verification certificates.

        Returns:
            Certificate information

        Raises:
            ValueError: If getting certificates fails
        """
        try:
            return self._openid_adapter.certs()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_certs")

    @override
    def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
        """Exchange authorization code for token.

        Args:
            code: Authorization code
            redirect_uri: Redirect URI used in authorization request

        Returns:
            Token response

        Raises:
            ValueError: If token exchange fails
        """
        # Authorization codes can only be used once, don't cache
        try:
            return self._openid_adapter.token(grant_type="authorization_code", code=code, redirect_uri=redirect_uri)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token_from_code")

    @override
    def get_client_credentials_token(self) -> KeycloakTokenType | None:
        """Get token using client credentials.

        Returns:
            Token response

        Raises:
            ValueError: If token acquisition fails
        """
        # Tokens are time-sensitive, don't cache
        try:
            return self._openid_adapter.token(grant_type="client_credentials")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_credentials_token")

    @override
    @ttl_cache_decorator(ttl_seconds=30, maxsize=50)  # Cache for 30 seconds with limited entries
    def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType] | None:
        """Search for users by username, email, or name.

        Args:
            query: Search query
            max_results: Maximum number of results to return

        Returns:
            List of matching users

        Raises:
            ValueError: If search fails
        """
        try:
            # Try searching by different fields
            users = []

            # Search by username
            users.extend(self.admin_adapter.get_users({"username": query, "max": max_results}))

            # Search by email if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                email_users = self.admin_adapter.get_users({"email": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in email_users if user["id"] not in user_ids])

            # Search by firstName if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                first_name_users = self.admin_adapter.get_users({"firstName": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in first_name_users if user["id"] not in user_ids])

            # Search by lastName if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                last_name_users = self.admin_adapter.get_users({"lastName": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in last_name_users if user["id"] not in user_ids])

            return users[:max_results]
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "search_users")

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
    def get_client_secret(self, client_id: str) -> str | None:
        """Get client secret.

        Args:
            client_id: Client ID

        Returns:
            Client secret

        Raises:
            ValueError: If getting secret fails
        """
        try:
            client = self.admin_adapter.get_client(client_id)
            return client.get("secret", "")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_secret")

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
    def get_client_id(self, client_name: str) -> str | None:
        """Get client ID by client name.

        Args:
            client_name: Name of the client

        Returns:
            Client ID

        Raises:
            ValueError: If client not found
        """
        try:
            return self.admin_adapter.get_client_id(client_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_id")

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
    def get_realm_roles(self) -> list[dict[str, Any]] | None:
        """Get all realm roles.

        Returns:
            List of realm roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_realm_roles()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm_roles")

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
    def get_realm_role(self, role_name: str) -> dict | None:
        """Get realm role.

        Args:
            role_name: Role name
        Returns:
            A realm role

        Raises:
            ValueError: If getting role fails
        """
        try:
            return self.admin_adapter.get_realm_role(role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm_role")

    @override
    def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        try:
            client = self.admin_adapter.get_client_id(client_id)
            if client is None:
                raise ValueError("client_id resolved to None")
            # Keycloak admin adapter methods accept these types at runtime
            role = self.admin_adapter.get_client_role(client, role_name)
            self.admin_adapter.delete_client_roles_of_user(user_id, client, [role])

            if hasattr(self.get_client_roles_for_user, "clear_cache"):
                self.get_client_roles_for_user.clear_cache()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "remove_client_role")

    @override
    def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user.

        Args:
            user_id: User's ID

        Raises:
            ValueError: If clearing sessions fails
        """
        try:
            self.admin_adapter.user_logout(user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "clear_user_sessions")

    @override
    def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token.

        Args:
            refresh_token: Refresh token to invalidate

        Raises:
            ValueError: If logout fails
        """
        try:
            self._openid_adapter.logout(refresh_token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "logout")

    @override
    def introspect_token(self, token: str) -> dict[str, Any] | None:
        """Introspect token to get detailed information about it.

        Args:
            token: Access token

        Returns:
            Token introspection details

        Raises:
            ValueError: If token introspection fails
        """
        try:
            return self._openid_adapter.introspect(token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "introspect_token")

    @override
    def get_token_info(self, token: str) -> dict[str, Any] | None:
        """Decode token to get its claims.

        Args:
            token: Access token

        Returns:
            Dictionary of token claims

        Raises:
            ValueError: If token decoding fails
        """
        try:
            # Let the underlying adapter handle key selection to align with expected types
            return self._openid_adapter.decode_token(token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token_info")

    @override
    def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID.

        Args:
            user_id: The ID of the user to delete

        Raises:
            ValueError: If the deletion fails
        """
        try:
            self.admin_adapter.delete_user(user_id=user_id)

            if hasattr(self.get_user_by_username, "clear_cache"):
                self.get_user_by_username.clear_cache()

            logger.info(f"Successfully deleted user with ID {user_id}")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_user")

    @override
    def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role.

        Args:
            token: Access token
            role_name: Role name to check

        Returns:
            True if user has the role, False otherwise
        """
        # Not caching this result as token validation is time-sensitive
        try:
            user_info = self.get_userinfo(token)
            if not user_info:
                return False

            # Check realm roles
            realm_access = user_info.get("realm_access", {})
            roles = realm_access.get("roles", [])
            if role_name in roles:
                return True

            # Check client roles
            resource_access = user_info.get("resource_access", {})
            client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
            if role_name in client_roles:
                return True

        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False
        else:
            return False

    @override
    def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has any of the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has any of the roles, False otherwise
        """
        try:
            user_info = self.get_userinfo(token)
            if not user_info:
                return False

            # Check realm roles first
            realm_access = user_info.get("realm_access", {})
            realm_roles = set(realm_access.get("roles", []))
            if role_names.intersection(realm_roles):
                return True

            # Check roles for the configured client
            resource_access = user_info.get("resource_access", {})
            client_roles = set(resource_access.get(self.configs.CLIENT_ID, {}).get("roles", []))
            if role_names.intersection(client_roles):
                return True

        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False
        else:
            return False

    @override
    def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has all the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has all the roles, False otherwise
        """
        try:
            user_info = self.get_userinfo(token)
            if not user_info:
                return False

            # Get all user roles
            all_roles = set()

            # Add realm roles
            realm_access = user_info.get("realm_access", {})
            all_roles.update(realm_access.get("roles", []))

            # Add client roles
            resource_access = user_info.get("resource_access", {})
            client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
            all_roles.update(client_roles)

            # Check if all required roles are present
            return role_names.issubset(all_roles)

        except Exception as e:
            logger.debug(f"All roles check failed: {e!s}")
            return False

    @override
    def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope.

        Args:
            token: Access token
            resource: Resource name
            scope: Permission scope

        Returns:
            True if permission granted, False otherwise
        """
        try:
            # Use UMA permissions endpoint to check specific resource and scope
            permissions = self._openid_adapter.uma_permissions(token, permissions=f"{resource}#{scope}")

            # Check if the response indicates permission is granted
            if not permissions or not isinstance(permissions, list):
                logger.debug("No permissions returned or invalid response format")
                return False

            # Look for the specific permission in the response
            for perm in permissions:
                if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                    return True

        except KeycloakError as e:
            logger.debug(f"Permission check failed with Keycloak error: {e!s}")
            return False
        except Exception as e:
            logger.debug(f"Permission check failed with unexpected error: {e!s}")
            return False
        else:
            return False

    @override
    def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
        """Create a Keycloak realm with minimum required fields and optional additional config.

        Args:
            realm_name: The realm identifier (required)
            skip_exists: Skip creation if realm already exists
            kwargs: Additional optional configurations for the realm

        Returns:
            Realm details
        """
        payload = {
            "realm": realm_name,
            "enabled": kwargs.get("enabled", True),
            "displayName": kwargs.get("display_name", realm_name),
        }

        # Add any additional parameters from kwargs
        for key, value in kwargs.items():
            # Skip display_name as it's already handled
            if key == "display_name":
                continue

            # Convert Python snake_case to Keycloak camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        try:
            self.admin_adapter.create_realm(payload=payload, skip_exists=skip_exists)
        except KeycloakError as e:
            logger.debug(f"Failed to create realm: {e!s}")

            # Handle realm already exists with skip_exists option
            if skip_exists:
                error_message = self._extract_error_message(e).lower()
                if "already exists" in error_message and "realm" in error_message:
                    return {"realm": realm_name, "status": "already_exists", "config": payload}

            # Use the mixin to handle realm-specific errors
            self._handle_realm_exception(e, "create_realm", realm_name)
        else:
            return {"realm": realm_name, "status": "created", "config": payload}

    @override
    def get_realm(self, realm_name: str) -> dict[str, Any] | None:
        """Get realm details by realm name.

        Args:
            realm_name: Name of the realm

        Returns:
            Realm details
        """
        try:
            return self.admin_adapter.get_realm(realm_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm")

    @override
    def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
        """Update a realm. Kwargs are RealmRepresentation.

        Args:
            realm_name: Realm name (not the realm id).
            **kwargs: RealmRepresentation attributes to update (e.g. displayName).

        Returns:
            Response from Keycloak, or None on error (handled via exception).
        """
        try:
            return self.admin_adapter.update_realm(realm_name, dict(kwargs))
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_realm")

    @override
    def create_client(
        self,
        client_id: str,
        realm: str | None = None,
        skip_exists: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any] | None:
        """Create a Keycloak client with minimum required fields and optional additional config.

        Args:
            client_id: The client identifier (required)
            realm: Target realm name (uses the current realm in KeycloakAdmin if not specified)
            skip_exists: Skip creation if client already exists
            kwargs: Additional optional configurations for the client

        Returns:
            Client details
        """
        original_realm = self.admin_adapter.connection.realm_name

        try:
            # Set the target realm if provided
            if realm and realm != original_realm:
                self.admin_adapter.connection.realm_name = realm

            public_client = kwargs.get("public_client", False)

            # Prepare the minimal client payload
            payload = {
                "clientId": client_id,
                "enabled": kwargs.get("enabled", True),
                "protocol": kwargs.get("protocol", "openid-connect"),
                "name": kwargs.get("name", client_id),
                "publicClient": public_client,
            }

            # Enable service accounts for confidential clients by default
            if not public_client:
                payload["serviceAccountsEnabled"] = kwargs.get("service_account_enabled", True)
                payload["clientAuthenticatorType"] = "client-secret"

            for key, value in kwargs.items():
                if key in ["enabled", "protocol", "name", "public_client", "service_account_enabled"]:
                    continue

                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            internal_client_id = None
            try:
                internal_client_id = self.admin_adapter.create_client(payload, skip_exists=skip_exists)
            except KeycloakError as e:
                logger.debug(f"Failed to create client: {e!s}")

                # Handle client already exists with skip_exists option
                if skip_exists:
                    error_message = self._extract_error_message(e).lower()
                    if "already exists" in error_message and "client" in error_message:
                        return {
                            "client_id": client_id,
                            "status": "already_exists",
                            "realm": self.admin_adapter.connection.realm_name,
                        }

                # Use the mixin to handle client-specific errors
                client_data = {"clientId": client_id, "name": kwargs.get("name", client_id)}
                self._handle_client_exception(e, "create_client", client_data)

            return {
                "client_id": client_id,
                "internal_client_id": internal_client_id,
                "realm": self.admin_adapter.connection.realm_name,
                "status": "created",
            }

        finally:
            # Always restore the original realm
            if realm and realm != original_realm:
                self.admin_adapter.connection.realm_name = original_realm

    @override
    def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
        """Add realm roles to a composite role.

        Args:
            composite_role_name: Name of the composite realm role
            child_role_names: List of child role names to add
        """
        try:
            child_roles = []
            for role_name in child_role_names:
                try:
                    role = self.admin_adapter.get_realm_role(role_name)
                    child_roles.append(role)
                except KeycloakGetError as e:
                    if e.response_code == 404:
                        logger.warning(f"Child role not found: {role_name}")
                        continue
                    raise

            if child_roles:
                self.admin_adapter.add_composite_realm_roles_to_role(role_name=composite_role_name, roles=child_roles)
                logger.info(f"Added {len(child_roles)} realm roles to composite role: {composite_role_name}")

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "add_realm_roles_to_composite")

    @override
    def add_client_roles_to_composite(
        self,
        composite_role_name: str,
        client_id: str,
        child_role_names: list[str],
    ) -> None:
        """Add client roles to a composite role.

        Args:
            composite_role_name: Name of the composite client role
            client_id: Client ID or client name
            child_role_names: List of child role names to add
        """
        try:
            internal_client_id = self.admin_adapter.get_client_id(client_id)
            if internal_client_id is None:
                raise ValueError("client_id resolved to None")

            child_roles = []
            for role_name in child_role_names:
                try:
                    # Keycloak admin adapter methods accept these types at runtime
                    role = self.admin_adapter.get_client_role(internal_client_id, role_name)
                    child_roles.append(role)
                except KeycloakGetError as e:
                    if e.response_code == 404:
                        logger.warning(f"Client role not found: {role_name}")
                        continue
                    raise

            if child_roles:
                if internal_client_id is None:
                    raise ValueError("Client ID not found")
                resolved_client_id: str = internal_client_id
                self.admin_adapter.add_composite_client_roles_to_role(
                    role_name=composite_role_name,
                    client_role_id=resolved_client_id,
                    roles=child_roles,
                )
                logger.info(f"Added {len(child_roles)} client roles to composite role: {composite_role_name}")

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "add_client_roles_to_composite")

    @override
    def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
        """Get composite roles for a realm role.

        Args:
            role_name: Name of the role

        Returns:
            List of composite roles
        """
        try:
            return self.admin_adapter.get_composite_realm_roles_of_role(role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_composite_realm_roles")

    @override
    def get_organizations(self, query: dict | None = None) -> list[dict[str, Any]]:
        """Fetch all organizations, optionally filtered by query parameters."""
        try:
            return self.admin_adapter.get_organizations(query=query)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organizations")

    @override
    def get_organization(self, organization_id: str) -> dict[str, Any]:
        """Get representation of the organization by ID."""
        try:
            return self.admin_adapter.get_organization(organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization")

    @override
    def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
        """Create a new organization. Name and alias must be unique. Returns org_id."""
        try:
            payload = {"name": name, "alias": alias}
            for key, value in kwargs.items():
                if key in ["name", "alias"]:
                    continue

                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            return self.admin_adapter.create_organization(payload=payload)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_organization")

    @override
    def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
        """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
        try:
            payload = {}
            for key, value in kwargs.items():
                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            return self.admin_adapter.update_organization(organization_id=organization_id, payload=payload)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_organization")

    @override
    def delete_organization(self, organization_id: str) -> dict[str, Any]:
        """Delete an organization."""
        try:
            return self.admin_adapter.delete_organization(organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_organization")

    @override
    def get_user_organizations(self, user_id: str) -> list[dict[str, Any]]:
        """Get organizations by user id."""
        try:
            return self.admin_adapter.get_user_organizations(user_id=user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_organizations")

    @override
    def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
        """Get members by organization id, optionally filtered by query parameters."""
        try:
            return self.admin_adapter.get_organization_members(organization_id=organization_id, query=query)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization_members")

    @override
    def get_organization_members_count(self, organization_id: str) -> int:
        """Get the number of members in the organization."""
        try:
            return self.admin_adapter.get_organization_members_count(organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization_members_count")

    @override
    def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
        """Add a user to an organization."""
        try:
            return self.admin_adapter.organization_user_add(user_id=user_id, organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "organization_user_add")

    @override
    def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
        """Remove a user from an organization."""
        try:
            return self.admin_adapter.organization_user_remove(user_id=user_id, organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "organization_user_remove")

archipy.adapters.keycloak.adapters.KeycloakAdapter.configs instance-attribute

configs: KeycloakConfig = (
    KEYCLOAK
    if keycloak_configs is None
    else keycloak_configs
)

archipy.adapters.keycloak.adapters.KeycloakAdapter.admin_adapter property

admin_adapter: KeycloakAdmin

Get the admin adapter, refreshing it if necessary.

Returns:

Type Description
KeycloakAdmin

KeycloakAdmin instance

Raises:

Type Description
UnauthenticatedError

If admin client is not available due to authentication issues

UnavailableError

If Keycloak service is unavailable

archipy.adapters.keycloak.adapters.KeycloakAdapter.clear_all_caches

clear_all_caches() -> None

Clear all cached values.

Source code in archipy/adapters/keycloak/adapters.py
def clear_all_caches(self) -> None:
    """Clear all cached values."""
    for attr_name in dir(self):
        attr = getattr(self, attr_name)
        if hasattr(attr, "clear_cache"):
            attr.clear_cache()

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_public_key

get_public_key() -> PublicKeyType

Get the public key used to verify tokens.

Returns:

Type Description
PublicKeyType

JWK key object used to verify signatures

Raises:

Type Description
ServiceUnavailableError

If Keycloak service is unavailable

InternalError

If there's an internal error processing the public key

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
def get_public_key(self) -> PublicKeyType:
    """Get the public key used to verify tokens.

    Returns:
        JWK key object used to verify signatures

    Raises:
        ServiceUnavailableError: If Keycloak service is unavailable
        InternalError: If there's an internal error processing the public key
    """
    try:
        from jwcrypto import jwk

        keys_info = self._openid_adapter.public_key()
        key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
        return jwk.JWK.from_pem(key.encode("utf-8"))
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_public_key")
    except Exception as e:
        raise InternalError(additional_data={"operation": "get_public_key", "error": str(e)}) from e

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token

get_token(
    username: str, password: str
) -> KeycloakTokenType | None

Get a user token by username and password using the Resource Owner Password Credentials Grant.

Warning

This method uses the direct password grant flow, which is less secure and not recommended for user login in production environments. Instead, prefer the web-based OAuth 2.0 Authorization Code Flow (use get_token_from_code) for secure authentication. Use this method only for testing, administrative tasks, or specific service accounts where direct credential use is acceptable and properly secured.

Parameters:

Name Type Description Default
username str

User's username

required
password str

User's password

required

Returns:

Type Description
KeycloakTokenType | None

Token response containing access_token, refresh_token, etc.

Raises:

Type Description
InvalidCredentialsError

If username or password is invalid

ServiceUnavailableError

If Keycloak service is unavailable

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
    """Get a user token by username and password using the Resource Owner Password Credentials Grant.

    Warning:
        This method uses the direct password grant flow, which is less secure and not recommended
        for user login in production environments. Instead, prefer the web-based OAuth 2.0
        Authorization Code Flow (use `get_token_from_code`) for secure authentication.
        Use this method only for testing, administrative tasks, or specific service accounts
        where direct credential use is acceptable and properly secured.

    Args:
        username: User's username
        password: User's password

    Returns:
        Token response containing access_token, refresh_token, etc.

    Raises:
        InvalidCredentialsError: If username or password is invalid
        ServiceUnavailableError: If Keycloak service is unavailable
    """
    try:
        return self._openid_adapter.token(grant_type="password", username=username, password=password)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token")

archipy.adapters.keycloak.adapters.KeycloakAdapter.refresh_token

refresh_token(
    refresh_token: str,
) -> KeycloakTokenType | None

Refresh an existing token using a refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token string

required

Returns:

Type Description
KeycloakTokenType | None

New token response containing access_token, refresh_token, etc.

Raises:

Type Description
InvalidTokenError

If refresh token is invalid or expired

ServiceUnavailableError

If Keycloak service is unavailable

Source code in archipy/adapters/keycloak/adapters.py
@override
def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
    """Refresh an existing token using a refresh token.

    Args:
        refresh_token: Refresh token string

    Returns:
        New token response containing access_token, refresh_token, etc.

    Raises:
        InvalidTokenError: If refresh token is invalid or expired
        ServiceUnavailableError: If Keycloak service is unavailable
    """
    try:
        return self._openid_adapter.refresh_token(refresh_token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "refresh_token")

archipy.adapters.keycloak.adapters.KeycloakAdapter.validate_token

validate_token(token: str) -> bool

Validate if a token is still valid.

Parameters:

Name Type Description Default
token str

Access token to validate

required

Returns:

Type Description
bool

True if token is valid, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid.

    Args:
        token: Access token to validate

    Returns:
        True if token is valid, False otherwise
    """
    # Not caching validation results as tokens are time-sensitive
    try:
        # Let the underlying adapter handle key selection to align with expected types
        self._openid_adapter.decode_token(token)
    except Exception as e:
        logger.debug(f"Token validation failed: {e!s}")
        return False
    else:
        return True

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_userinfo

get_userinfo(token: str) -> KeycloakUserType | None

Get user information from a token.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
KeycloakUserType | None

User information

Raises:

Type Description
ValueError

If getting user info fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_userinfo(self, token: str) -> KeycloakUserType | None:
    """Get user information from a token.

    Args:
        token: Access token

    Returns:
        User information

    Raises:
        ValueError: If getting user info fails
    """
    if not self.validate_token(token):
        raise InvalidTokenError()
    try:
        # _get_userinfo_cached returns KeycloakUserType (dict[str, Any])
        # The ttl_cache_decorator loses type info, but runtime behavior is correct
        # Access underlying function for proper typing
        cached_func = self._get_userinfo_cached
        underlying_func = getattr(cached_func, "__wrapped__", None)
        if underlying_func is not None:
            # Call underlying function directly for type checking
            result: KeycloakUserType = underlying_func(self, token)
        else:
            # Fallback to cached version if __wrapped__ not available
            result_raw = cached_func(token)
            if not isinstance(result_raw, dict):
                return None
            # Type assertion: result_raw is a dict, which matches KeycloakUserType
            # Convert to proper type by creating a new dict with explicit typing
            result: KeycloakUserType = {str(k): v for k, v in result_raw.items()}
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_userinfo")
        return None
    else:
        return result

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_id

get_user_by_id(user_id: str) -> KeycloakUserType | None

Get user details by user ID.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If getting user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID.

    Args:
        user_id: User's ID

    Returns:
        User details or None if not found

    Raises:
        ValueError: If getting user fails
    """
    try:
        return self.admin_adapter.get_user(user_id)
    except KeycloakGetError as e:
        if e.response_code == 404:
            return None
        self._handle_keycloak_exception(e, "get_user_by_id")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_by_id")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_username

get_user_by_username(
    username: str,
) -> KeycloakUserType | None

Get user details by username.

Parameters:

Name Type Description Default
username str

User's username

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Args:
        username: User's username

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = self.admin_adapter.get_users({"username": username})
        return users[0] if users else None
    except KeycloakError as e:
        raise InternalError() from e

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_email

get_user_by_email(email: str) -> KeycloakUserType | None

Get user details by email.

Parameters:

Name Type Description Default
email str

User's email

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Args:
        email: User's email

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = self.admin_adapter.get_users({"email": email})
        return users[0] if users else None
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_by_email")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_roles

get_user_roles(
    user_id: str,
) -> list[KeycloakRoleType] | None

Get roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
list[KeycloakRoleType] | None

List of roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_roles(self, user_id: str) -> list[KeycloakRoleType] | None:
    """Get roles assigned to a user.

    Args:
        user_id: User's ID

    Returns:
        List of roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return self.admin_adapter.get_realm_roles_of_user(user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_roles")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_roles_for_user

get_client_roles_for_user(
    user_id: str, client_id: str
) -> list[KeycloakRoleType]

Get client-specific roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of client-specific roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user.

    Args:
        user_id: User's ID
        client_id: Client ID

    Returns:
        List of client-specific roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return self.admin_adapter.get_client_roles_of_user(user_id, client_id)
    except KeycloakError as e:
        raise InternalError() from e

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_user

create_user(user_data: dict[str, Any]) -> str | None

Create a new user in Keycloak.

Parameters:

Name Type Description Default
user_data dict[str, Any]

User data including username, email, etc.

required

Returns:

Type Description
str | None

ID of the created user

Raises:

Type Description
ValueError

If creating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_user(self, user_data: dict[str, Any]) -> str | None:
    """Create a new user in Keycloak.

    Args:
        user_data: User data including username, email, etc.

    Returns:
        ID of the created user

    Raises:
        ValueError: If creating user fails
    """
    # This is a write operation, no caching needed
    try:
        user_id = self.admin_adapter.create_user(user_data)

        # Clear related caches
        self.clear_all_caches()

    except KeycloakError as e:
        self._handle_user_exception(e, "create_user", user_data)
    else:
        return user_id

archipy.adapters.keycloak.adapters.KeycloakAdapter.update_user

update_user(
    user_id: str, user_data: dict[str, Any]
) -> None

Update user details.

Parameters:

Name Type Description Default
user_id str

User's ID

required
user_data dict[str, Any]

User data to update

required

Raises:

Type Description
ValueError

If updating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details.

    Args:
        user_id: User's ID
        user_data: User data to update

    Raises:
        ValueError: If updating user fails
    """
    # This is a write operation, no caching needed
    try:
        self.admin_adapter.update_user(user_id, user_data)

        # Clear user-related caches
        self.clear_all_caches()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_user")

archipy.adapters.keycloak.adapters.KeycloakAdapter.reset_password

reset_password(
    user_id: str, password: str, temporary: bool = False
) -> None

Reset a user's password.

Parameters:

Name Type Description Default
user_id str

User's ID

required
password str

New password

required
temporary bool

Whether the password is temporary and should be changed on next login

False

Raises:

Type Description
ValueError

If password reset fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password.

    Args:
        user_id: User's ID
        password: New password
        temporary: Whether the password is temporary and should be changed on next login

    Raises:
        ValueError: If password reset fails
    """
    # This is a write operation, no caching needed
    try:
        self.admin_adapter.set_user_password(user_id, password, temporary)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "reset_password")

archipy.adapters.keycloak.adapters.KeycloakAdapter.assign_realm_role

assign_realm_role(user_id: str, role_name: str) -> None

Assign a realm role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user.

    Args:
        user_id: User's ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = self.admin_adapter.get_realm_role(role_name)
        # Assign role to user
        self.admin_adapter.assign_realm_roles(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "assign_realm_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.remove_realm_role

remove_realm_role(user_id: str, role_name: str) -> None

Remove a realm role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user.

    Args:
        user_id: User's ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = self.admin_adapter.get_realm_role(role_name)
        # Remove role from user
        self.admin_adapter.delete_realm_roles_of_user(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "remove_realm_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.assign_client_role

assign_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Assign a client-specific role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get client
        client = self.admin_adapter.get_client_id(client_id)
        if client is None:
            raise ValueError("client_id resolved to None")
        # Get role representation
        # Keycloak admin adapter methods accept these types at runtime
        role = self.admin_adapter.get_client_role(client, role_name)
        # Assign role to user
        self.admin_adapter.assign_client_role(user_id, client, [role])

        # Clear role-related caches
        if hasattr(self.get_client_roles_for_user, "clear_cache"):
            self.get_client_roles_for_user.clear_cache()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "assign_client_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_realm_role

create_realm_role(
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required
description str | None

Optional role description

None
skip_exists bool

Skip creation if realm role already exists

True

Returns:

Type Description
dict[str, Any] | None

Created role details

Raises:

Type Description
ValueError

If role creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_realm_role(
    self,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new realm role.

    Args:
        role_name: Role name
        description: Optional role description
        skip_exists: Skip creation if realm role already exists

    Returns:
        Created role details

    Raises:
        ValueError: If role creation fails
    """
    # This is a write operation, no caching needed
    try:
        role_data = {"name": role_name}
        if description:
            role_data["description"] = description

        self.admin_adapter.create_realm_role(role_data, skip_exists=skip_exists)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "clear_cache"):
            self.get_realm_roles.clear_cache()

        return self.admin_adapter.get_realm_role(role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_realm_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_client_role

create_client_role(
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new client role.

Parameters:

Name Type Description Default
client_id str

Client ID or client name

required
role_name str

Role name

required
description str | None

Optional role description

None
skip_exists bool

Skip creation if client role already exists

True

Returns:

Type Description
dict[str, Any] | None

Created role details

Raises:

Type Description
ValueError

If role creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_client_role(
    self,
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new client role.

    Args:
        client_id: Client ID or client name
        role_name: Role name
        description: Optional role description
        skip_exists: Skip creation if client role already exists

    Returns:
        Created role details

    Raises:
        ValueError: If role creation fails
    """
    # This is a write operation, no caching needed
    try:
        resolved_client_id = self.admin_adapter.get_client_id(client_id)
        if resolved_client_id is None:
            raise ValueError(f"Client ID not found: {client_id}")

        # Prepare role data
        role_data = {"name": role_name}
        if description:
            role_data["description"] = description

        # Create client role
        self.admin_adapter.create_client_role(resolved_client_id, role_data, skip_exists=skip_exists)

        # Clear related caches if they exist
        if hasattr(self.get_client_roles_for_user, "clear_cache"):
            self.get_client_roles_for_user.clear_cache()

        # Return created role
        return self.admin_adapter.get_client_role(resolved_client_id, role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_client_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.delete_realm_role

delete_realm_role(role_name: str) -> None

Delete a realm role.

Parameters:

Name Type Description Default
role_name str

Role name to delete

required

Raises:

Type Description
ValueError

If role deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role.

    Args:
        role_name: Role name to delete

    Raises:
        ValueError: If role deletion fails
    """
    # This is a write operation, no caching needed
    try:
        self.admin_adapter.delete_realm_role(role_name)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "clear_cache"):
            self.get_realm_roles.clear_cache()

        # We also need to clear user role caches since they might contain this role
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_realm_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_service_account_id

get_service_account_id() -> str | None

Get service account user ID for the current client.

Returns:

Type Description
str | None

Service account user ID

Raises:

Type Description
ValueError

If getting service account fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_service_account_id(self) -> str | None:
    """Get service account user ID for the current client.

    Returns:
        Service account user ID

    Raises:
        ValueError: If getting service account fails
    """
    try:
        client_id = self.get_client_id(self.configs.CLIENT_ID)
        return self.admin_adapter.get_client_service_account_user(str(client_id)).get("id")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_service_account_id")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_well_known_config

get_well_known_config() -> dict[str, Any] | None

Get the well-known OpenID configuration.

Returns:

Type Description
dict[str, Any] | None

OIDC configuration

Raises:

Type Description
ValueError

If getting configuration fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_well_known_config(self) -> dict[str, Any] | None:
    """Get the well-known OpenID configuration.

    Returns:
        OIDC configuration

    Raises:
        ValueError: If getting configuration fails
    """
    try:
        return self._openid_adapter.well_known()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_well_known_config")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_certs

get_certs() -> dict[str, Any] | None

Get the JWT verification certificates.

Returns:

Type Description
dict[str, Any] | None

Certificate information

Raises:

Type Description
ValueError

If getting certificates fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_certs(self) -> dict[str, Any] | None:
    """Get the JWT verification certificates.

    Returns:
        Certificate information

    Raises:
        ValueError: If getting certificates fails
    """
    try:
        return self._openid_adapter.certs()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_certs")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token_from_code

get_token_from_code(
    code: str, redirect_uri: str
) -> KeycloakTokenType | None

Exchange authorization code for token.

Parameters:

Name Type Description Default
code str

Authorization code

required
redirect_uri str

Redirect URI used in authorization request

required

Returns:

Type Description
KeycloakTokenType | None

Token response

Raises:

Type Description
ValueError

If token exchange fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
    """Exchange authorization code for token.

    Args:
        code: Authorization code
        redirect_uri: Redirect URI used in authorization request

    Returns:
        Token response

    Raises:
        ValueError: If token exchange fails
    """
    # Authorization codes can only be used once, don't cache
    try:
        return self._openid_adapter.token(grant_type="authorization_code", code=code, redirect_uri=redirect_uri)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token_from_code")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_credentials_token

get_client_credentials_token() -> KeycloakTokenType | None

Get token using client credentials.

Returns:

Type Description
KeycloakTokenType | None

Token response

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_client_credentials_token(self) -> KeycloakTokenType | None:
    """Get token using client credentials.

    Returns:
        Token response

    Raises:
        ValueError: If token acquisition fails
    """
    # Tokens are time-sensitive, don't cache
    try:
        return self._openid_adapter.token(grant_type="client_credentials")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_credentials_token")

archipy.adapters.keycloak.adapters.KeycloakAdapter.search_users

search_users(
    query: str, max_results: int = 100
) -> list[KeycloakUserType] | None

Search for users by username, email, or name.

Parameters:

Name Type Description Default
query str

Search query

required
max_results int

Maximum number of results to return

100

Returns:

Type Description
list[KeycloakUserType] | None

List of matching users

Raises:

Type Description
ValueError

If search fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=30, maxsize=50)  # Cache for 30 seconds with limited entries
def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType] | None:
    """Search for users by username, email, or name.

    Args:
        query: Search query
        max_results: Maximum number of results to return

    Returns:
        List of matching users

    Raises:
        ValueError: If search fails
    """
    try:
        # Try searching by different fields
        users = []

        # Search by username
        users.extend(self.admin_adapter.get_users({"username": query, "max": max_results}))

        # Search by email if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            email_users = self.admin_adapter.get_users({"email": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in email_users if user["id"] not in user_ids])

        # Search by firstName if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            first_name_users = self.admin_adapter.get_users({"firstName": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in first_name_users if user["id"] not in user_ids])

        # Search by lastName if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            last_name_users = self.admin_adapter.get_users({"lastName": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in last_name_users if user["id"] not in user_ids])

        return users[:max_results]
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "search_users")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_secret

get_client_secret(client_id: str) -> str | None

Get client secret.

Parameters:

Name Type Description Default
client_id str

Client ID

required

Returns:

Type Description
str | None

Client secret

Raises:

Type Description
ValueError

If getting secret fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
def get_client_secret(self, client_id: str) -> str | None:
    """Get client secret.

    Args:
        client_id: Client ID

    Returns:
        Client secret

    Raises:
        ValueError: If getting secret fails
    """
    try:
        client = self.admin_adapter.get_client(client_id)
        return client.get("secret", "")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_secret")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_id

get_client_id(client_name: str) -> str | None

Get client ID by client name.

Parameters:

Name Type Description Default
client_name str

Name of the client

required

Returns:

Type Description
str | None

Client ID

Raises:

Type Description
ValueError

If client not found

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
def get_client_id(self, client_name: str) -> str | None:
    """Get client ID by client name.

    Args:
        client_name: Name of the client

    Returns:
        Client ID

    Raises:
        ValueError: If client not found
    """
    try:
        return self.admin_adapter.get_client_id(client_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_id")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_realm_roles

get_realm_roles() -> list[dict[str, Any]] | None

Get all realm roles.

Returns:

Type Description
list[dict[str, Any]] | None

List of realm roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
def get_realm_roles(self) -> list[dict[str, Any]] | None:
    """Get all realm roles.

    Returns:
        List of realm roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return self.admin_adapter.get_realm_roles()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm_roles")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_realm_role

get_realm_role(role_name: str) -> dict | None

Get realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required

Returns: A realm role

Raises:

Type Description
ValueError

If getting role fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
def get_realm_role(self, role_name: str) -> dict | None:
    """Get realm role.

    Args:
        role_name: Role name
    Returns:
        A realm role

    Raises:
        ValueError: If getting role fails
    """
    try:
        return self.admin_adapter.get_realm_role(role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.remove_client_role

remove_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Remove a client-specific role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    try:
        client = self.admin_adapter.get_client_id(client_id)
        if client is None:
            raise ValueError("client_id resolved to None")
        # Keycloak admin adapter methods accept these types at runtime
        role = self.admin_adapter.get_client_role(client, role_name)
        self.admin_adapter.delete_client_roles_of_user(user_id, client, [role])

        if hasattr(self.get_client_roles_for_user, "clear_cache"):
            self.get_client_roles_for_user.clear_cache()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "remove_client_role")

archipy.adapters.keycloak.adapters.KeycloakAdapter.clear_user_sessions

clear_user_sessions(user_id: str) -> None

Clear all sessions for a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Raises:

Type Description
ValueError

If clearing sessions fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user.

    Args:
        user_id: User's ID

    Raises:
        ValueError: If clearing sessions fails
    """
    try:
        self.admin_adapter.user_logout(user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "clear_user_sessions")

archipy.adapters.keycloak.adapters.KeycloakAdapter.logout

logout(refresh_token: str) -> None

Logout user by invalidating their refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token to invalidate

required

Raises:

Type Description
ValueError

If logout fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token.

    Args:
        refresh_token: Refresh token to invalidate

    Raises:
        ValueError: If logout fails
    """
    try:
        self._openid_adapter.logout(refresh_token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "logout")

archipy.adapters.keycloak.adapters.KeycloakAdapter.introspect_token

introspect_token(token: str) -> dict[str, Any] | None

Introspect token to get detailed information about it.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any] | None

Token introspection details

Raises:

Type Description
ValueError

If token introspection fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def introspect_token(self, token: str) -> dict[str, Any] | None:
    """Introspect token to get detailed information about it.

    Args:
        token: Access token

    Returns:
        Token introspection details

    Raises:
        ValueError: If token introspection fails
    """
    try:
        return self._openid_adapter.introspect(token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "introspect_token")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token_info

get_token_info(token: str) -> dict[str, Any] | None

Decode token to get its claims.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any] | None

Dictionary of token claims

Raises:

Type Description
ValueError

If token decoding fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token_info(self, token: str) -> dict[str, Any] | None:
    """Decode token to get its claims.

    Args:
        token: Access token

    Returns:
        Dictionary of token claims

    Raises:
        ValueError: If token decoding fails
    """
    try:
        # Let the underlying adapter handle key selection to align with expected types
        return self._openid_adapter.decode_token(token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token_info")

archipy.adapters.keycloak.adapters.KeycloakAdapter.delete_user

delete_user(user_id: str) -> None

Delete a user from Keycloak by their ID.

Parameters:

Name Type Description Default
user_id str

The ID of the user to delete

required

Raises:

Type Description
ValueError

If the deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID.

    Args:
        user_id: The ID of the user to delete

    Raises:
        ValueError: If the deletion fails
    """
    try:
        self.admin_adapter.delete_user(user_id=user_id)

        if hasattr(self.get_user_by_username, "clear_cache"):
            self.get_user_by_username.clear_cache()

        logger.info(f"Successfully deleted user with ID {user_id}")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_user")

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_role

has_role(token: str, role_name: str) -> bool

Check if a user has a specific role.

Parameters:

Name Type Description Default
token str

Access token

required
role_name str

Role name to check

required

Returns:

Type Description
bool

True if user has the role, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: Access token
        role_name: Role name to check

    Returns:
        True if user has the role, False otherwise
    """
    # Not caching this result as token validation is time-sensitive
    try:
        user_info = self.get_userinfo(token)
        if not user_info:
            return False

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = realm_access.get("roles", [])
        if role_name in roles:
            return True

        # Check client roles
        resource_access = user_info.get("resource_access", {})
        client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
        if role_name in client_roles:
            return True

    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_any_of_roles

has_any_of_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has any of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names frozenset[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has any of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has any of the roles, False otherwise
    """
    try:
        user_info = self.get_userinfo(token)
        if not user_info:
            return False

        # Check realm roles first
        realm_access = user_info.get("realm_access", {})
        realm_roles = set(realm_access.get("roles", []))
        if role_names.intersection(realm_roles):
            return True

        # Check roles for the configured client
        resource_access = user_info.get("resource_access", {})
        client_roles = set(resource_access.get(self.configs.CLIENT_ID, {}).get("roles", []))
        if role_names.intersection(client_roles):
            return True

    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_all_roles

has_all_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has all the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names frozenset[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has all the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has all the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has all the roles, False otherwise
    """
    try:
        user_info = self.get_userinfo(token)
        if not user_info:
            return False

        # Get all user roles
        all_roles = set()

        # Add realm roles
        realm_access = user_info.get("realm_access", {})
        all_roles.update(realm_access.get("roles", []))

        # Add client roles
        resource_access = user_info.get("resource_access", {})
        client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
        all_roles.update(client_roles)

        # Check if all required roles are present
        return role_names.issubset(all_roles)

    except Exception as e:
        logger.debug(f"All roles check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.check_permissions

check_permissions(
    token: str, resource: str, scope: str
) -> bool

Check if a user has permission to access a resource with the specified scope.

Parameters:

Name Type Description Default
token str

Access token

required
resource str

Resource name

required
scope str

Permission scope

required

Returns:

Type Description
bool

True if permission granted, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope.

    Args:
        token: Access token
        resource: Resource name
        scope: Permission scope

    Returns:
        True if permission granted, False otherwise
    """
    try:
        # Use UMA permissions endpoint to check specific resource and scope
        permissions = self._openid_adapter.uma_permissions(token, permissions=f"{resource}#{scope}")

        # Check if the response indicates permission is granted
        if not permissions or not isinstance(permissions, list):
            logger.debug("No permissions returned or invalid response format")
            return False

        # Look for the specific permission in the response
        for perm in permissions:
            if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                return True

    except KeycloakError as e:
        logger.debug(f"Permission check failed with Keycloak error: {e!s}")
        return False
    except Exception as e:
        logger.debug(f"Permission check failed with unexpected error: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_realm

create_realm(
    realm_name: str, skip_exists: bool = True, **kwargs: Any
) -> dict[str, Any] | None

Create a Keycloak realm with minimum required fields and optional additional config.

Parameters:

Name Type Description Default
realm_name str

The realm identifier (required)

required
skip_exists bool

Skip creation if realm already exists

True
kwargs Any

Additional optional configurations for the realm

{}

Returns:

Type Description
dict[str, Any] | None

Realm details

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
    """Create a Keycloak realm with minimum required fields and optional additional config.

    Args:
        realm_name: The realm identifier (required)
        skip_exists: Skip creation if realm already exists
        kwargs: Additional optional configurations for the realm

    Returns:
        Realm details
    """
    payload = {
        "realm": realm_name,
        "enabled": kwargs.get("enabled", True),
        "displayName": kwargs.get("display_name", realm_name),
    }

    # Add any additional parameters from kwargs
    for key, value in kwargs.items():
        # Skip display_name as it's already handled
        if key == "display_name":
            continue

        # Convert Python snake_case to Keycloak camelCase
        camel_key = StringUtils.snake_to_camel_case(key)
        payload[camel_key] = value

    try:
        self.admin_adapter.create_realm(payload=payload, skip_exists=skip_exists)
    except KeycloakError as e:
        logger.debug(f"Failed to create realm: {e!s}")

        # Handle realm already exists with skip_exists option
        if skip_exists:
            error_message = self._extract_error_message(e).lower()
            if "already exists" in error_message and "realm" in error_message:
                return {"realm": realm_name, "status": "already_exists", "config": payload}

        # Use the mixin to handle realm-specific errors
        self._handle_realm_exception(e, "create_realm", realm_name)
    else:
        return {"realm": realm_name, "status": "created", "config": payload}

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_realm

get_realm(realm_name: str) -> dict[str, Any] | None

Get realm details by realm name.

Parameters:

Name Type Description Default
realm_name str

Name of the realm

required

Returns:

Type Description
dict[str, Any] | None

Realm details

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_realm(self, realm_name: str) -> dict[str, Any] | None:
    """Get realm details by realm name.

    Args:
        realm_name: Name of the realm

    Returns:
        Realm details
    """
    try:
        return self.admin_adapter.get_realm(realm_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm")

archipy.adapters.keycloak.adapters.KeycloakAdapter.update_realm

update_realm(
    realm_name: str, **kwargs: Any
) -> dict[str, Any] | None

Update a realm. Kwargs are RealmRepresentation.

Parameters:

Name Type Description Default
realm_name str

Realm name (not the realm id).

required
**kwargs Any

RealmRepresentation attributes to update (e.g. displayName).

{}

Returns:

Type Description
dict[str, Any] | None

Response from Keycloak, or None on error (handled via exception).

Source code in archipy/adapters/keycloak/adapters.py
@override
def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
    """Update a realm. Kwargs are RealmRepresentation.

    Args:
        realm_name: Realm name (not the realm id).
        **kwargs: RealmRepresentation attributes to update (e.g. displayName).

    Returns:
        Response from Keycloak, or None on error (handled via exception).
    """
    try:
        return self.admin_adapter.update_realm(realm_name, dict(kwargs))
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_realm")

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_client

create_client(
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None

Create a Keycloak client with minimum required fields and optional additional config.

Parameters:

Name Type Description Default
client_id str

The client identifier (required)

required
realm str | None

Target realm name (uses the current realm in KeycloakAdmin if not specified)

None
skip_exists bool

Skip creation if client already exists

True
kwargs Any

Additional optional configurations for the client

{}

Returns:

Type Description
dict[str, Any] | None

Client details

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_client(
    self,
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None:
    """Create a Keycloak client with minimum required fields and optional additional config.

    Args:
        client_id: The client identifier (required)
        realm: Target realm name (uses the current realm in KeycloakAdmin if not specified)
        skip_exists: Skip creation if client already exists
        kwargs: Additional optional configurations for the client

    Returns:
        Client details
    """
    original_realm = self.admin_adapter.connection.realm_name

    try:
        # Set the target realm if provided
        if realm and realm != original_realm:
            self.admin_adapter.connection.realm_name = realm

        public_client = kwargs.get("public_client", False)

        # Prepare the minimal client payload
        payload = {
            "clientId": client_id,
            "enabled": kwargs.get("enabled", True),
            "protocol": kwargs.get("protocol", "openid-connect"),
            "name": kwargs.get("name", client_id),
            "publicClient": public_client,
        }

        # Enable service accounts for confidential clients by default
        if not public_client:
            payload["serviceAccountsEnabled"] = kwargs.get("service_account_enabled", True)
            payload["clientAuthenticatorType"] = "client-secret"

        for key, value in kwargs.items():
            if key in ["enabled", "protocol", "name", "public_client", "service_account_enabled"]:
                continue

            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        internal_client_id = None
        try:
            internal_client_id = self.admin_adapter.create_client(payload, skip_exists=skip_exists)
        except KeycloakError as e:
            logger.debug(f"Failed to create client: {e!s}")

            # Handle client already exists with skip_exists option
            if skip_exists:
                error_message = self._extract_error_message(e).lower()
                if "already exists" in error_message and "client" in error_message:
                    return {
                        "client_id": client_id,
                        "status": "already_exists",
                        "realm": self.admin_adapter.connection.realm_name,
                    }

            # Use the mixin to handle client-specific errors
            client_data = {"clientId": client_id, "name": kwargs.get("name", client_id)}
            self._handle_client_exception(e, "create_client", client_data)

        return {
            "client_id": client_id,
            "internal_client_id": internal_client_id,
            "realm": self.admin_adapter.connection.realm_name,
            "status": "created",
        }

    finally:
        # Always restore the original realm
        if realm and realm != original_realm:
            self.admin_adapter.connection.realm_name = original_realm

archipy.adapters.keycloak.adapters.KeycloakAdapter.add_realm_roles_to_composite

add_realm_roles_to_composite(
    composite_role_name: str, child_role_names: list[str]
) -> None

Add realm roles to a composite role.

Parameters:

Name Type Description Default
composite_role_name str

Name of the composite realm role

required
child_role_names list[str]

List of child role names to add

required
Source code in archipy/adapters/keycloak/adapters.py
@override
def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
    """Add realm roles to a composite role.

    Args:
        composite_role_name: Name of the composite realm role
        child_role_names: List of child role names to add
    """
    try:
        child_roles = []
        for role_name in child_role_names:
            try:
                role = self.admin_adapter.get_realm_role(role_name)
                child_roles.append(role)
            except KeycloakGetError as e:
                if e.response_code == 404:
                    logger.warning(f"Child role not found: {role_name}")
                    continue
                raise

        if child_roles:
            self.admin_adapter.add_composite_realm_roles_to_role(role_name=composite_role_name, roles=child_roles)
            logger.info(f"Added {len(child_roles)} realm roles to composite role: {composite_role_name}")

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "add_realm_roles_to_composite")

archipy.adapters.keycloak.adapters.KeycloakAdapter.add_client_roles_to_composite

add_client_roles_to_composite(
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None

Add client roles to a composite role.

Parameters:

Name Type Description Default
composite_role_name str

Name of the composite client role

required
client_id str

Client ID or client name

required
child_role_names list[str]

List of child role names to add

required
Source code in archipy/adapters/keycloak/adapters.py
@override
def add_client_roles_to_composite(
    self,
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None:
    """Add client roles to a composite role.

    Args:
        composite_role_name: Name of the composite client role
        client_id: Client ID or client name
        child_role_names: List of child role names to add
    """
    try:
        internal_client_id = self.admin_adapter.get_client_id(client_id)
        if internal_client_id is None:
            raise ValueError("client_id resolved to None")

        child_roles = []
        for role_name in child_role_names:
            try:
                # Keycloak admin adapter methods accept these types at runtime
                role = self.admin_adapter.get_client_role(internal_client_id, role_name)
                child_roles.append(role)
            except KeycloakGetError as e:
                if e.response_code == 404:
                    logger.warning(f"Client role not found: {role_name}")
                    continue
                raise

        if child_roles:
            if internal_client_id is None:
                raise ValueError("Client ID not found")
            resolved_client_id: str = internal_client_id
            self.admin_adapter.add_composite_client_roles_to_role(
                role_name=composite_role_name,
                client_role_id=resolved_client_id,
                roles=child_roles,
            )
            logger.info(f"Added {len(child_roles)} client roles to composite role: {composite_role_name}")

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "add_client_roles_to_composite")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_composite_realm_roles

get_composite_realm_roles(
    role_name: str,
) -> list[dict[str, Any]] | None

Get composite roles for a realm role.

Parameters:

Name Type Description Default
role_name str

Name of the role

required

Returns:

Type Description
list[dict[str, Any]] | None

List of composite roles

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
    """Get composite roles for a realm role.

    Args:
        role_name: Name of the role

    Returns:
        List of composite roles
    """
    try:
        return self.admin_adapter.get_composite_realm_roles_of_role(role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_composite_realm_roles")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_organizations

get_organizations(
    query: dict | None = None,
) -> list[dict[str, Any]]

Fetch all organizations, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_organizations(self, query: dict | None = None) -> list[dict[str, Any]]:
    """Fetch all organizations, optionally filtered by query parameters."""
    try:
        return self.admin_adapter.get_organizations(query=query)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organizations")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_organization

get_organization(organization_id: str) -> dict[str, Any]

Get representation of the organization by ID.

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_organization(self, organization_id: str) -> dict[str, Any]:
    """Get representation of the organization by ID."""
    try:
        return self.admin_adapter.get_organization(organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization")

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_organization

create_organization(
    name: str, alias: str, **kwargs: Any
) -> str | None

Create a new organization. Name and alias must be unique. Returns org_id.

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
    """Create a new organization. Name and alias must be unique. Returns org_id."""
    try:
        payload = {"name": name, "alias": alias}
        for key, value in kwargs.items():
            if key in ["name", "alias"]:
                continue

            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        return self.admin_adapter.create_organization(payload=payload)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_organization")

archipy.adapters.keycloak.adapters.KeycloakAdapter.update_organization

update_organization(
    organization_id: str, **kwargs: Any
) -> dict[str, Any]

Update an existing organization. Kwargs are organization attributes (e.g. name, alias).

Source code in archipy/adapters/keycloak/adapters.py
@override
def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
    """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
    try:
        payload = {}
        for key, value in kwargs.items():
            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        return self.admin_adapter.update_organization(organization_id=organization_id, payload=payload)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_organization")

archipy.adapters.keycloak.adapters.KeycloakAdapter.delete_organization

delete_organization(organization_id: str) -> dict[str, Any]

Delete an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
def delete_organization(self, organization_id: str) -> dict[str, Any]:
    """Delete an organization."""
    try:
        return self.admin_adapter.delete_organization(organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_organization")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_organizations

get_user_organizations(
    user_id: str,
) -> list[dict[str, Any]]

Get organizations by user id.

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_user_organizations(self, user_id: str) -> list[dict[str, Any]]:
    """Get organizations by user id."""
    try:
        return self.admin_adapter.get_user_organizations(user_id=user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_organizations")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_organization_members

get_organization_members(
    organization_id: str, query: dict | None = None
) -> list[dict[str, Any]]

Get members by organization id, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
    """Get members by organization id, optionally filtered by query parameters."""
    try:
        return self.admin_adapter.get_organization_members(organization_id=organization_id, query=query)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization_members")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_organization_members_count

get_organization_members_count(organization_id: str) -> int

Get the number of members in the organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_organization_members_count(self, organization_id: str) -> int:
    """Get the number of members in the organization."""
    try:
        return self.admin_adapter.get_organization_members_count(organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization_members_count")

archipy.adapters.keycloak.adapters.KeycloakAdapter.organization_user_add

organization_user_add(
    user_id: str, organization_id: str
) -> bytes

Add a user to an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
    """Add a user to an organization."""
    try:
        return self.admin_adapter.organization_user_add(user_id=user_id, organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "organization_user_add")

archipy.adapters.keycloak.adapters.KeycloakAdapter.organization_user_remove

organization_user_remove(
    user_id: str, organization_id: str
) -> dict[str, Any]

Remove a user from an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
    """Remove a user from an organization."""
    try:
        return self.admin_adapter.organization_user_remove(user_id=user_id, organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "organization_user_remove")

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_organization_idps abstractmethod

get_organization_idps(
    organization_id: str,
) -> list[dict[str, Any]]

Get IDPs by organization id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
    """Get IDPs by organization id."""
    raise NotImplementedError

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter

Bases: AsyncKeycloakPort, KeycloakExceptionHandlerMixin

Concrete implementation of the KeycloakPort interface using python-keycloak library.

This implementation includes TTL caching for appropriate operations to improve performance while ensuring cache entries expire after a configured time to prevent stale data.

Source code in archipy/adapters/keycloak/adapters.py
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
class AsyncKeycloakAdapter(AsyncKeycloakPort, KeycloakExceptionHandlerMixin):
    """Concrete implementation of the KeycloakPort interface using python-keycloak library.

    This implementation includes TTL caching for appropriate operations to improve performance
    while ensuring cache entries expire after a configured time to prevent stale data.
    """

    def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
        """Initialize KeycloakAdapter with configuration.

        Args:
            keycloak_configs: Optional Keycloak configuration. If None, global config is used.
        """
        self.configs: KeycloakConfig = (
            BaseConfig.global_config().KEYCLOAK if keycloak_configs is None else keycloak_configs
        )

        # Initialize the OpenID client for authentication
        self.openid_adapter = self._get_openid_client(self.configs)

        # Cache for admin client to avoid unnecessary re-authentication
        self._admin_adapter: KeycloakAdmin | None = None
        self._admin_token_expiry: float = 0.0

        # Initialize admin client if admin mode is enabled and credentials are provided
        if self.configs.IS_ADMIN_MODE_ENABLED and (
            self.configs.CLIENT_SECRET_KEY or (self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD)
        ):
            self._initialize_admin_client()

    def clear_all_caches(self) -> None:
        """Clear all cached values."""
        for attr_name in dir(self):
            attr = getattr(self, attr_name)
            if hasattr(attr, "cache_clear"):
                attr.cache_clear()

    @staticmethod
    def _get_openid_client(configs: KeycloakConfig) -> KeycloakOpenID:
        """Create and configure a KeycloakOpenID instance.

        Args:
            configs: Keycloak configuration

        Returns:
            Configured KeycloakOpenID client
        """
        server_url = configs.SERVER_URL
        client_id = configs.CLIENT_ID
        if not server_url or not client_id:
            raise ValueError("SERVER_URL and CLIENT_ID must be provided")
        return KeycloakOpenID(
            server_url=server_url,
            client_id=client_id,
            realm_name=configs.REALM_NAME,
            client_secret_key=configs.CLIENT_SECRET_KEY,
            verify=configs.VERIFY_SSL,
            timeout=configs.TIMEOUT,
        )

    def _initialize_admin_client(self) -> None:
        """Initialize or refresh the admin client."""
        try:
            # Check if admin credentials are available
            if self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD:
                # Create admin client using admin credentials
                self._admin_adapter = KeycloakAdmin(
                    server_url=self.configs.SERVER_URL,
                    username=self.configs.ADMIN_USERNAME,
                    password=self.configs.ADMIN_PASSWORD,
                    realm_name=self.configs.REALM_NAME,
                    user_realm_name=self.configs.ADMIN_REALM_NAME,
                    verify=self.configs.VERIFY_SSL,
                    timeout=self.configs.TIMEOUT,
                )
                # Since we're using direct credentials, set a long expiry time
                self._admin_token_expiry = time.time() + 3600  # 1 hour
                logger.debug("Admin client initialized with admin credentials")
            elif self.configs.CLIENT_SECRET_KEY:
                # Get token using client credentials
                token = self.openid_adapter.token(grant_type="client_credentials")

                # Set token expiry time (current time + expires_in - buffer)
                # Using a 30-second buffer to ensure we refresh before expiration
                self._admin_token_expiry = time.time() + token.get("expires_in", 60) - 30

                # Create admin client with the token
                self._admin_adapter = KeycloakAdmin(
                    server_url=self.configs.SERVER_URL,
                    realm_name=self.configs.REALM_NAME,
                    token=token,
                    verify=self.configs.VERIFY_SSL,
                    timeout=self.configs.TIMEOUT,
                )
                logger.debug("Admin client initialized with client credentials")
            else:
                raise UnauthenticatedError(
                    additional_data={"detail": "Neither admin credentials nor client secret provided"},
                )

        except KeycloakAuthenticationError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            raise UnauthenticatedError(
                additional_data={"detail": "Failed to authenticate with Keycloak service account"},
            ) from e
        except KeycloakConnectionError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            raise ConnectionTimeoutError("Failed to connect to Keycloak server") from e
        except KeycloakError as e:
            self._admin_adapter = None
            self._admin_token_expiry = 0
            self._handle_keycloak_exception(e, "_initialize_admin_client")

    @property
    def admin_adapter(self) -> KeycloakAdmin:
        """Get the admin adapter, refreshing it if necessary.

        Returns:
            KeycloakAdmin instance

        Raises:
            UnauthenticatedError: If admin client is not available due to authentication issues
            UnavailableError: If Keycloak service is unavailable
        """
        if not self.configs.IS_ADMIN_MODE_ENABLED or not (
            self.configs.CLIENT_SECRET_KEY or (self.configs.ADMIN_USERNAME and self.configs.ADMIN_PASSWORD)
        ):
            raise UnauthenticatedError(
                additional_data={
                    "detail": "Admin mode is disabled or neither admin credentials nor client secret provided",
                },
            )

        # Check if token is about to expire and refresh if needed
        if self._admin_adapter is None or time.time() >= self._admin_token_expiry:
            self._initialize_admin_client()

        if self._admin_adapter is None:
            raise UnavailableError("Keycloak admin client is not available")

        return self._admin_adapter

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
    async def get_public_key(self) -> PublicKeyType:
        """Get the public key used to verify tokens.

        Returns:
            JWK key object used to verify signatures

        Raises:
            ServiceUnavailableError: If Keycloak service is unavailable
            InternalError: If there's an internal error processing the public key
        """
        try:
            from jwcrypto import jwk

            keys_info = await self.openid_adapter.a_public_key()
            key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
            return jwk.JWK.from_pem(key.encode("utf-8"))
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_public_key")
        except Exception as e:
            raise InternalError(additional_data={"operation": "get_public_key", "error": str(e)}) from e

    @override
    async def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
        """Get a user token by username and password using the Resource Owner Password Credentials Grant.

        Warning:
            This method uses the direct password grant flow, which is less secure and not recommended
            for user login in production environments. Instead, prefer the web-based OAuth 2.0
            Authorization Code Flow (use `get_token_from_code`) for secure authentication.
            Use this method only for testing, administrative tasks, or specific service accounts
            where direct credential use is acceptable and properly secured.

        Args:
            username: User's username
            password: User's password

        Returns:
            Token response containing access_token, refresh_token, etc.

        Raises:
            InvalidCredentialsError: If username or password is invalid
            ServiceUnavailableError: If Keycloak service is unavailable
        """
        try:
            return await self.openid_adapter.a_token(grant_type="password", username=username, password=password)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token")

    @override
    async def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
        """Refresh an existing token using a refresh token.

        Args:
            refresh_token: Refresh token string

        Returns:
            New token response containing access_token, refresh_token, etc.

        Raises:
            InvalidTokenError: If refresh token is invalid or expired
            ServiceUnavailableError: If Keycloak service is unavailable
        """
        try:
            return await self.openid_adapter.a_refresh_token(refresh_token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "refresh_token")

    @override
    async def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid.

        Args:
            token: Access token to validate

        Returns:
            True if token is valid, False otherwise
        """
        # Not caching validation results as tokens are time-sensitive
        try:
            await self.openid_adapter.a_decode_token(
                token,
                key=await self.get_public_key(),
            )
        except Exception as e:
            logger.debug(f"Token validation failed: {e!s}")
            return False
        else:
            return True

    @override
    async def get_userinfo(self, token: str) -> KeycloakUserType | None:
        """Get user information from a token.

        Args:
            token: Access token

        Returns:
            User information

        Raises:
            ValueError: If getting user info fails
        """
        if not await self.validate_token(token):
            raise InvalidTokenError()
        try:
            return await self._get_userinfo_cached(token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_userinfo")

    @alru_cache(ttl=30, maxsize=100)  # Cache for 30 seconds
    async def _get_userinfo_cached(self, token: str) -> KeycloakUserType:
        return await self.openid_adapter.a_userinfo(token)  # type: ignore[return-value]

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID.

        Args:
            user_id: User's ID

        Returns:
            User details or None if not found

        Raises:
            ValueError: If getting user fails
        """
        try:
            return await self.admin_adapter.a_get_user(user_id)
        except KeycloakGetError as e:
            if e.response_code == 404:
                return None
            raise InternalError() from e
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_by_id")

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username.

        Args:
            username: User's username

        Returns:
            User details or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = await self.admin_adapter.a_get_users({"username": username})
            return users[0] if users else None
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_by_username")

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email.

        Args:
            email: User's email

        Returns:
            User details or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = await self.admin_adapter.a_get_users({"email": email})
            return users[0] if users else None
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_by_email")

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType] | None:
        """Get roles assigned to a user.

        Args:
            user_id: User's ID

        Returns:
            List of roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return await self.admin_adapter.a_get_realm_roles_of_user(user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_roles")

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user.

        Args:
            user_id: User's ID
            client_id: Client ID

        Returns:
            List of client-specific roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return await self.admin_adapter.a_get_client_roles_of_user(user_id, client_id)
        except KeycloakError as e:
            raise InternalError() from e

    @override
    async def create_user(self, user_data: dict[str, Any]) -> str | None:
        """Create a new user in Keycloak.

        Args:
            user_data: User data including username, email, etc.

        Returns:
            ID of the created user

        Raises:
            ValueError: If creating user fails
        """
        # This is a write operation, no caching needed
        try:
            user_id = await self.admin_adapter.a_create_user(user_data)

            # Clear related caches
            self.clear_all_caches()
        except KeycloakError as e:
            self._handle_user_exception(e, "create_user", user_data)
        else:
            return user_id

    @override
    async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details.

        Args:
            user_id: User's ID
            user_data: User data to update

        Raises:
            ValueError: If updating user fails
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_update_user(user_id, user_data)

            # Clear user-related caches
            self.clear_all_caches()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_user")

    @override
    async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password.

        Args:
            user_id: User's ID
            password: New password
            temporary: Whether the password is temporary and should be changed on next login

        Raises:
            ValueError: If password reset fails
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_set_user_password(user_id, password, temporary)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "reset_password")

    @override
    async def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user.

        Args:
            user_id: User's ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = await self.admin_adapter.a_get_realm_role(role_name)
            # Assign role to user
            await self.admin_adapter.a_assign_realm_roles(user_id, [role])

            # Clear role-related caches
            if hasattr(self.get_user_roles, "cache_clear"):
                self.get_user_roles.cache_clear()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "assign_realm_role")

    @override
    async def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user.

        Args:
            user_id: User's ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = await self.admin_adapter.a_get_realm_role(role_name)
            # Remove role from user
            await self.admin_adapter.a_delete_realm_roles_of_user(user_id, [role])

            # Clear role-related caches
            if hasattr(self.get_user_roles, "cache_clear"):
                self.get_user_roles.cache_clear()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "remove_realm_role")

    @override
    async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get client
            client = await self.admin_adapter.a_get_client_id(client_id)
            if client is None:
                raise ValueError("client_id resolved to None")
            # Get role representation
            # Keycloak admin adapter methods accept these types at runtime
            role = await self.admin_adapter.a_get_client_role(client, role_name)
            # Assign role to user
            await self.admin_adapter.a_assign_client_role(user_id, client, [role])

            # Clear role-related caches
            if hasattr(self.get_client_roles_for_user, "cache_clear"):
                self.get_client_roles_for_user.cache_clear()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "assign_client_role")

    @override
    async def create_realm_role(
        self,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new realm role.

        Args:
            role_name: Role name
            description: Optional role description
            skip_exists: Skip creation if role already exists

        Returns:
            Created role details

        Raises:
            ValueError: If role creation fails
        """
        # This is a write operation, no caching needed
        try:
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            await self.admin_adapter.a_create_realm_role(role_data, skip_exists=skip_exists)

            # Clear realm roles cache
            if hasattr(self.get_realm_roles, "cache_clear"):
                self.get_realm_roles.cache_clear()

            return await self.admin_adapter.a_get_realm_role(role_name)

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_realm_role")

    @override
    async def create_client_role(
        self,
        client_id: str,
        role_name: str,
        description: str | None = None,
        skip_exists: bool = True,
    ) -> dict[str, Any] | None:
        """Create a new client role.

        Args:
            client_id: Client ID or client name
            role_name: Role name
            skip_exists: Skip creation if role already exists
            description: Optional role description

        Returns:
            Created role details
        """
        # This is a write operation, no caching needed
        try:
            resolved_client_id = await self.admin_adapter.a_get_client_id(client_id)
            if resolved_client_id is None:
                raise ValueError(f"Client ID not found: {client_id}")

            # Prepare role data
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            # Create client role
            await self.admin_adapter.a_create_client_role(resolved_client_id, role_data, skip_exists=skip_exists)

            # Clear related caches if they exist
            if hasattr(self.get_client_roles_for_user, "cache_clear"):
                self.get_client_roles_for_user.cache_clear()

            # Return created role
            return await self.admin_adapter.a_get_client_role(resolved_client_id, role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_client_role")

    @override
    async def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role.

        Args:
            role_name: Role name to delete

        Raises:
            ValueError: If role deletion fails
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_delete_realm_role(role_name)

            # Clear realm roles cache
            if hasattr(self.get_realm_roles, "cache_clear"):
                self.get_realm_roles.cache_clear()

            # We also need to clear user role caches since they might contain this role
            if hasattr(self.get_user_roles, "cache_clear"):
                self.get_user_roles.cache_clear()

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_realm_role")

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_service_account_id(self) -> str | None:
        """Get service account user ID for the current client.

        Returns:
            Service account user ID

        Raises:
            ValueError: If getting service account fails
        """
        try:
            client_id = await self.get_client_id(self.configs.CLIENT_ID)
            if client_id is None:
                return None
            service_account = await self.admin_adapter.a_get_client_service_account_user(client_id)
            return service_account.get("id")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_service_account_id")

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_well_known_config(self) -> dict[str, Any] | None:
        """Get the well-known OpenID configuration.

        Returns:
            OIDC configuration

        Raises:
            ValueError: If getting configuration fails
        """
        try:
            return await self.openid_adapter.a_well_known()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_well_known_config")

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_certs(self) -> dict[str, Any] | None:
        """Get the JWT verification certificates.

        Returns:
            Certificate information

        Raises:
            ValueError: If getting certificates fails
        """
        try:
            return await self.openid_adapter.a_certs()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_certs")

    @override
    async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
        """Exchange authorization code for token.

        Args:
            code: Authorization code
            redirect_uri: Redirect URI used in authorization request

        Returns:
            Token response

        Raises:
            ValueError: If token exchange fails
        """
        # Authorization codes can only be used once, don't cache
        try:
            return await self.openid_adapter.a_token(
                grant_type="authorization_code",
                code=code,
                redirect_uri=redirect_uri,
            )
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token_from_code")

    @override
    async def get_client_credentials_token(self) -> KeycloakTokenType | None:
        """Get token using client credentials.

        Returns:
            Token response

        Raises:
            ValueError: If token acquisition fails
        """
        # Tokens are time-sensitive, don't cache
        try:
            return await self.openid_adapter.a_token(grant_type="client_credentials")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_credentials_token")

    @override
    @alru_cache(ttl=30, maxsize=50)  # Cache for 30 seconds with limited entries
    async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType] | None:
        """Search for users by username, email, or name.

        Args:
            query: Search query
            max_results: Maximum number of results to return

        Returns:
            List of matching users

        Raises:
            ValueError: If search fails
        """
        try:
            # Try searching by different fields
            users = []

            # Search by username
            users.extend(await self.admin_adapter.a_get_users({"username": query, "max": max_results}))

            # Search by email if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                email_users = await self.admin_adapter.a_get_users({"email": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in email_users if user["id"] not in user_ids])

            # Search by firstName if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                first_name_users = await self.admin_adapter.a_get_users({"firstName": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in first_name_users if user["id"] not in user_ids])

            # Search by lastName if no results or incomplete results
            if len(users) < max_results:
                remaining = max_results - len(users)
                last_name_users = await self.admin_adapter.a_get_users({"lastName": query, "max": remaining})
                # Filter out duplicates
                user_ids = {user["id"] for user in users}
                users.extend([user for user in last_name_users if user["id"] not in user_ids])

            return users[:max_results]
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "search_users")

    @override
    @alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
    async def get_client_secret(self, client_id: str) -> str | None:
        """Get client secret.

        Args:
            client_id: Client ID

        Returns:
            Client secret

        Raises:
            ValueError: If getting secret fails
        """
        try:
            client = await self.admin_adapter.a_get_client(client_id)
            return client.get("secret", "")
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_secret")

    @override
    @alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
    async def get_client_id(self, client_name: str) -> str | None:
        """Get client ID by client name.

        Args:
            client_name: Name of the client

        Returns:
            Client ID

        Raises:
            ValueError: If client not found
        """
        try:
            return await self.admin_adapter.a_get_client_id(client_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_client_id")

    @override
    @alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
    async def get_realm_roles(self) -> list[dict[str, Any]] | None:
        """Get all realm roles.

        Returns:
            List of realm roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return await self.admin_adapter.a_get_realm_roles()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm_roles")

    @override
    @alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
    async def get_realm_role(self, role_name: str) -> dict | None:
        """Get realm role.

        Args:
            role_name: Role name
        Returns:
            A realm role

        Raises:
            ValueError: If getting role fails
        """
        try:
            return await self.admin_adapter.a_get_realm_role(role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm_role")

    @override
    async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        try:
            client = await self.admin_adapter.a_get_client_id(client_id)
            if client is None:
                raise ValueError("client_id resolved to None")
            # Keycloak admin adapter methods accept these types at runtime
            role = await self.admin_adapter.a_get_client_role(client, role_name)
            await self.admin_adapter.a_delete_client_roles_of_user(user_id, client, [role])

            if hasattr(self.get_client_roles_for_user, "cache_clear"):
                self.get_client_roles_for_user.cache_clear()
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "remove_client_role")

    @override
    async def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user.

        Args:
            user_id: User's ID

        Raises:
            ValueError: If clearing sessions fails
        """
        try:
            await self.admin_adapter.a_user_logout(user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "clear_user_sessions")

    @override
    async def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token.

        Args:
            refresh_token: Refresh token to invalidate

        Raises:
            ValueError: If logout fails
        """
        try:
            await self.openid_adapter.a_logout(refresh_token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "logout")

    @override
    async def introspect_token(self, token: str) -> dict[str, Any] | None:
        """Introspect token to get detailed information about it.

        Args:
            token: Access token

        Returns:
            Token introspection details

        Raises:
            ValueError: If token introspection fails
        """
        try:
            return await self.openid_adapter.a_introspect(token)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "introspect_token")

    @override
    async def get_token_info(self, token: str) -> dict[str, Any] | None:
        """Decode token to get its claims.

        Args:
            token: Access token

        Returns:
            Dictionary of token claims

        Raises:
            ValueError: If token decoding fails
        """
        try:
            return await self.openid_adapter.a_decode_token(
                token,
                key=await self.get_public_key(),
            )
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_token_info")

    @override
    async def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID.

        Args:
            user_id: The ID of the user to delete

        Raises:
            ValueError: If the deletion fails
        """
        try:
            await self.admin_adapter.a_delete_user(user_id=user_id)

            if hasattr(self.get_user_by_username, "cache_clear"):
                self.get_user_by_username.cache_clear()

            logger.info(f"Successfully deleted user with ID {user_id}")

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_user")

    @override
    async def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role.

        Args:
            token: Access token
            role_name: Role name to check

        Returns:
            True if user has the role, False otherwise
        """
        # Not caching this result as token validation is time-sensitive
        try:
            user_info = await self.get_userinfo(token)
            if not user_info:
                return False

            # Check realm roles
            realm_access = user_info.get("realm_access", {})
            roles = realm_access.get("roles", [])
            if role_name in roles:
                return True

            # Check roles for the configured client
            resource_access = user_info.get("resource_access", {})
            client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
            if role_name in client_roles:
                return True

        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False
        else:
            return False

    @override
    async def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has any of the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has any of the roles, False otherwise
        """
        try:
            user_info = await self.get_userinfo(token)
            if not user_info:
                return False

            # Check realm roles first
            realm_access = user_info.get("realm_access", {})
            realm_roles = set(realm_access.get("roles", []))
            if role_names.intersection(realm_roles):
                return True

            # Check roles for the configured client
            resource_access = user_info.get("resource_access", {})
            client_roles = set(resource_access.get(self.configs.CLIENT_ID, {}).get("roles", []))
            if role_names.intersection(client_roles):
                return True

        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False
        else:
            return False

    @override
    async def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
        """Check if a user has all the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has all the roles, False otherwise
        """
        try:
            user_info = await self.get_userinfo(token)
            if not user_info:
                return False

            # Get all user roles
            all_roles = set()

            # Add realm roles
            realm_access = user_info.get("realm_access", {})
            all_roles.update(realm_access.get("roles", []))

            # Add roles from the configured client
            resource_access = user_info.get("resource_access", {})
            client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
            all_roles.update(client_roles)

            # Check if all required roles are present
            return role_names.issubset(all_roles)

        except Exception as e:
            logger.debug(f"All roles check failed: {e!s}")
            return False

    @override
    async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope.

        Args:
            token: Access token
            resource: Resource name
            scope: Permission scope

        Returns:
            True if permission granted, False otherwise
        """
        try:
            # Use UMA permissions endpoint to check specific resource and scope
            permissions = await self.openid_adapter.a_uma_permissions(token, permissions=f"{resource}#{scope}")

            # Check if the response indicates permission is granted
            if not permissions or not isinstance(permissions, list):
                logger.debug("No permissions returned or invalid response format")
                return False

            # Look for the specific permission in the response
            for perm in permissions:
                if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                    return True

        except KeycloakError as e:
            logger.debug(f"Permission check failed with Keycloak error: {e!s}")
            return False
        except Exception as e:
            logger.debug(f"Permission check failed with unexpected error: {e!s}")
            return False
        else:
            return False

    @override
    async def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
        """Create a Keycloak realm with minimum required fields and optional additional config.

        Args:
            realm_name: The realm identifier (required)
            skip_exists: Skip creation if realm already exists
            kwargs: Additional optional configurations for the realm

        Returns:
            Dictionary with realm information and status

        Raises:
            InternalError: If realm creation fails
        """
        payload = {
            "realm": realm_name,
            "enabled": kwargs.get("enabled", True),
            "displayName": kwargs.get("display_name", realm_name),
        }

        # Add any additional parameters from kwargs
        for key, value in kwargs.items():
            # Skip display_name as it's already handled
            if key == "display_name":
                continue

            # Convert Python snake_case to Keycloak camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        try:
            await self.admin_adapter.a_create_realm(payload=payload, skip_exists=skip_exists)
        except KeycloakError as e:
            logger.debug(f"Failed to create realm: {e!s}")

            # Handle realm already exists with skip_exists option
            if skip_exists:
                error_message = self._extract_error_message(e).lower()
                if "already exists" in error_message and "realm" in error_message:
                    return {"realm": realm_name, "status": "already_exists", "config": payload}

            # Use the mixin to handle realm-specific errors
            self._handle_realm_exception(e, "create_realm", realm_name)
        else:
            return {"realm": realm_name, "status": "created", "config": payload}

    @override
    async def get_realm(self, realm_name: str) -> dict[str, Any] | None:
        """Get realm details by realm name.

        Args:
            realm_name: Name of the realm

        Returns:
            Realm details

        Raises:
            InternalError: If getting realm fails
        """
        try:
            return await self.admin_adapter.a_get_realm(realm_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_realm")

    @override
    async def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
        """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled).

        Args:
            realm_name: Realm name (not the realm id).
            **kwargs: RealmRepresentation attributes to update (e.g. displayName, organizationsEnabled).

        Returns:
            Response from Keycloak, or None on error (handled via exception).
        """
        try:
            return await self.admin_adapter.a_update_realm(realm_name, dict(kwargs))
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_realm")

    @override
    async def create_client(
        self,
        client_id: str,
        realm: str | None = None,
        skip_exists: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any] | None:
        """Create a Keycloak client with minimum required fields and optional additional config.

        Args:
            client_id: The client identifier (required)
            realm: Target realm name (uses the current realm in KeycloakAdmin if not specified)
            skip_exists: Skip creation if client already exists
            kwargs: Additional optional configurations for the client

        Returns:
            Dictionary with client information

        Raises:
            InternalError: If client creation fails
        """
        original_realm = self.admin_adapter.connection.realm_name

        try:
            # Set the target realm if provided
            if realm and realm != original_realm:
                self.admin_adapter.connection.realm_name = realm

            public_client = kwargs.get("public_client", False)

            # Prepare the minimal client payload
            payload = {
                "clientId": client_id,
                "enabled": kwargs.get("enabled", True),
                "protocol": kwargs.get("protocol", "openid-connect"),
                "name": kwargs.get("name", client_id),
                "publicClient": public_client,
            }

            # Enable service accounts for confidential clients by default
            if not public_client:
                payload["serviceAccountsEnabled"] = kwargs.get("service_account_enabled", True)
                payload["clientAuthenticatorType"] = "client-secret"

            for key, value in kwargs.items():
                if key in ["enabled", "protocol", "name", "public_client", "service_account_enabled"]:
                    continue

                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            internal_client_id = None
            try:
                internal_client_id = await self.admin_adapter.a_create_client(payload, skip_exists=skip_exists)
            except KeycloakError as e:
                logger.debug(f"Failed to create client: {e!s}")

                # Handle client already exists with skip_exists option
                if skip_exists:
                    error_message = self._extract_error_message(e).lower()
                    if "already exists" in error_message and "client" in error_message:
                        return {
                            "client_id": client_id,
                            "status": "already_exists",
                            "realm": self.admin_adapter.connection.realm_name,
                        }

                # Use the mixin to handle client-specific errors
                client_data = {"clientId": client_id, "name": kwargs.get("name", client_id)}
                self._handle_client_exception(e, "create_client", client_data)

            return {
                "client_id": client_id,
                "internal_client_id": internal_client_id,
                "realm": self.admin_adapter.connection.realm_name,
                "status": "created",
            }

        finally:
            # Always restore the original realm
            if realm and realm != original_realm:
                self.admin_adapter.connection.realm_name = original_realm

    @override
    async def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
        """Add realm roles to a composite role.

        Args:
            composite_role_name: Name of the composite role
            child_role_names: List of child role names to add
        """
        try:
            child_roles = []
            for role_name in child_role_names:
                try:
                    role = await self.admin_adapter.a_get_realm_role(role_name)
                    child_roles.append(role)
                except KeycloakGetError as e:
                    if e.response_code == 404:
                        logger.warning(f"Child role not found: {role_name}")
                        continue
                    raise

            if child_roles:
                await self.admin_adapter.a_add_composite_realm_roles_to_role(
                    role_name=composite_role_name,
                    roles=child_roles,
                )
                logger.info(f"Added {len(child_roles)} realm roles to composite role: {composite_role_name}")

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "add_realm_roles_to_composite")

    @override
    async def add_client_roles_to_composite(
        self,
        composite_role_name: str,
        client_id: str,
        child_role_names: list[str],
    ) -> None:
        """Add client roles to a composite role.

        Args:
            composite_role_name: Name of the composite role
            client_id: Client ID or client name
            child_role_names: List of child role names to add
        """
        try:
            internal_client_id = await self.admin_adapter.a_get_client_id(client_id)
            if internal_client_id is None:
                raise ValueError("client_id resolved to None")

            child_roles = []
            for role_name in child_role_names:
                try:
                    # Keycloak admin adapter methods accept these types at runtime
                    role = await self.admin_adapter.a_get_client_role(internal_client_id, role_name)
                    child_roles.append(role)
                except KeycloakGetError as e:
                    if e.response_code == 404:
                        logger.warning(f"Client role not found: {role_name}")
                        continue
                    raise

            if child_roles:
                if internal_client_id is None:
                    raise ValueError("Client ID not found")
                resolved_client_id: str = internal_client_id
                await self.admin_adapter.a_add_composite_client_roles_to_role(
                    role_name=composite_role_name,
                    client_role_id=resolved_client_id,
                    roles=child_roles,
                )
                logger.info(f"Added {len(child_roles)} client roles to composite role: {composite_role_name}")

        except KeycloakError as e:
            self._handle_keycloak_exception(e, "add_client_roles_to_composite")

    @override
    async def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
        """Get composite roles for a realm role.

        Args:
            role_name: Name of the role

        Returns:
            List of composite roles
        """
        try:
            return await self.admin_adapter.a_get_composite_realm_roles_of_role(role_name)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_composite_realm_roles")

    @override
    async def get_organizations(self, query: dict | None = None) -> list[dict[str, Any]]:
        """Fetch all organizations, optionally filtered by query parameters."""
        try:
            return await self.admin_adapter.a_get_organizations(query=query)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organizations")

    @override
    async def get_organization(self, organization_id: str) -> dict[str, Any]:
        """Get representation of the organization by ID."""
        try:
            return await self.admin_adapter.a_get_organization(organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization")

    @override
    async def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
        """Create a new organization. Name and alias must be unique. Returns org_id."""
        try:
            payload = {"name": name, "alias": alias}
            for key, value in kwargs.items():
                if key in ["name", "alias"]:
                    continue

                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            return await self.admin_adapter.a_create_organization(payload=payload)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "create_organization")

    @override
    async def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
        """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
        try:
            payload = {}
            for key, value in kwargs.items():
                # Convert snake_case to camelCase
                camel_key = StringUtils.snake_to_camel_case(key)
                payload[camel_key] = value

            return await self.admin_adapter.a_update_organization(organization_id=organization_id, payload=payload)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "update_organization")

    @override
    async def delete_organization(self, organization_id: str) -> dict[str, Any]:
        """Delete an organization."""
        try:
            return await self.admin_adapter.a_delete_organization(organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "delete_organization")

    @override
    async def get_user_organizations(self, user_id: str) -> list[dict[str, Any]]:
        """Get organizations by user id."""
        try:
            return await self.admin_adapter.a_get_user_organizations(user_id=user_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_user_organizations")

    @override
    async def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
        """Get members by organization id, optionally filtered by query parameters."""
        try:
            return await self.admin_adapter.a_get_organization_members(organization_id=organization_id, query=query)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization_members")

    @override
    async def get_organization_members_count(self, organization_id: str) -> int:
        """Get the number of members in the organization."""
        try:
            return await self.admin_adapter.a_get_organization_members_count(organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "get_organization_members_count")

    @override
    async def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
        """Add a user to an organization."""
        try:
            return await self.admin_adapter.a_organization_user_add(user_id=user_id, organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "organization_user_add")

    @override
    async def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
        """Remove a user from an organization."""
        try:
            return await self.admin_adapter.a_organization_user_remove(user_id=user_id, organization_id=organization_id)
        except KeycloakError as e:
            self._handle_keycloak_exception(e, "organization_user_remove")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.configs instance-attribute

configs: KeycloakConfig = (
    KEYCLOAK
    if keycloak_configs is None
    else keycloak_configs
)

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.openid_adapter instance-attribute

openid_adapter = _get_openid_client(configs)

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.admin_adapter property

admin_adapter: KeycloakAdmin

Get the admin adapter, refreshing it if necessary.

Returns:

Type Description
KeycloakAdmin

KeycloakAdmin instance

Raises:

Type Description
UnauthenticatedError

If admin client is not available due to authentication issues

UnavailableError

If Keycloak service is unavailable

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.clear_all_caches

clear_all_caches() -> None

Clear all cached values.

Source code in archipy/adapters/keycloak/adapters.py
def clear_all_caches(self) -> None:
    """Clear all cached values."""
    for attr_name in dir(self):
        attr = getattr(self, attr_name)
        if hasattr(attr, "cache_clear"):
            attr.cache_clear()

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_public_key async

get_public_key() -> PublicKeyType

Get the public key used to verify tokens.

Returns:

Type Description
PublicKeyType

JWK key object used to verify signatures

Raises:

Type Description
ServiceUnavailableError

If Keycloak service is unavailable

InternalError

If there's an internal error processing the public key

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
async def get_public_key(self) -> PublicKeyType:
    """Get the public key used to verify tokens.

    Returns:
        JWK key object used to verify signatures

    Raises:
        ServiceUnavailableError: If Keycloak service is unavailable
        InternalError: If there's an internal error processing the public key
    """
    try:
        from jwcrypto import jwk

        keys_info = await self.openid_adapter.a_public_key()
        key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
        return jwk.JWK.from_pem(key.encode("utf-8"))
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_public_key")
    except Exception as e:
        raise InternalError(additional_data={"operation": "get_public_key", "error": str(e)}) from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token async

get_token(
    username: str, password: str
) -> KeycloakTokenType | None

Get a user token by username and password using the Resource Owner Password Credentials Grant.

Warning

This method uses the direct password grant flow, which is less secure and not recommended for user login in production environments. Instead, prefer the web-based OAuth 2.0 Authorization Code Flow (use get_token_from_code) for secure authentication. Use this method only for testing, administrative tasks, or specific service accounts where direct credential use is acceptable and properly secured.

Parameters:

Name Type Description Default
username str

User's username

required
password str

User's password

required

Returns:

Type Description
KeycloakTokenType | None

Token response containing access_token, refresh_token, etc.

Raises:

Type Description
InvalidCredentialsError

If username or password is invalid

ServiceUnavailableError

If Keycloak service is unavailable

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token(self, username: str, password: str) -> KeycloakTokenType | None:
    """Get a user token by username and password using the Resource Owner Password Credentials Grant.

    Warning:
        This method uses the direct password grant flow, which is less secure and not recommended
        for user login in production environments. Instead, prefer the web-based OAuth 2.0
        Authorization Code Flow (use `get_token_from_code`) for secure authentication.
        Use this method only for testing, administrative tasks, or specific service accounts
        where direct credential use is acceptable and properly secured.

    Args:
        username: User's username
        password: User's password

    Returns:
        Token response containing access_token, refresh_token, etc.

    Raises:
        InvalidCredentialsError: If username or password is invalid
        ServiceUnavailableError: If Keycloak service is unavailable
    """
    try:
        return await self.openid_adapter.a_token(grant_type="password", username=username, password=password)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.refresh_token async

refresh_token(
    refresh_token: str,
) -> KeycloakTokenType | None

Refresh an existing token using a refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token string

required

Returns:

Type Description
KeycloakTokenType | None

New token response containing access_token, refresh_token, etc.

Raises:

Type Description
InvalidTokenError

If refresh token is invalid or expired

ServiceUnavailableError

If Keycloak service is unavailable

Source code in archipy/adapters/keycloak/adapters.py
@override
async def refresh_token(self, refresh_token: str) -> KeycloakTokenType | None:
    """Refresh an existing token using a refresh token.

    Args:
        refresh_token: Refresh token string

    Returns:
        New token response containing access_token, refresh_token, etc.

    Raises:
        InvalidTokenError: If refresh token is invalid or expired
        ServiceUnavailableError: If Keycloak service is unavailable
    """
    try:
        return await self.openid_adapter.a_refresh_token(refresh_token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "refresh_token")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.validate_token async

validate_token(token: str) -> bool

Validate if a token is still valid.

Parameters:

Name Type Description Default
token str

Access token to validate

required

Returns:

Type Description
bool

True if token is valid, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid.

    Args:
        token: Access token to validate

    Returns:
        True if token is valid, False otherwise
    """
    # Not caching validation results as tokens are time-sensitive
    try:
        await self.openid_adapter.a_decode_token(
            token,
            key=await self.get_public_key(),
        )
    except Exception as e:
        logger.debug(f"Token validation failed: {e!s}")
        return False
    else:
        return True

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_userinfo async

get_userinfo(token: str) -> KeycloakUserType | None

Get user information from a token.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
KeycloakUserType | None

User information

Raises:

Type Description
ValueError

If getting user info fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_userinfo(self, token: str) -> KeycloakUserType | None:
    """Get user information from a token.

    Args:
        token: Access token

    Returns:
        User information

    Raises:
        ValueError: If getting user info fails
    """
    if not await self.validate_token(token):
        raise InvalidTokenError()
    try:
        return await self._get_userinfo_cached(token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_userinfo")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_id async

get_user_by_id(user_id: str) -> KeycloakUserType | None

Get user details by user ID.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If getting user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID.

    Args:
        user_id: User's ID

    Returns:
        User details or None if not found

    Raises:
        ValueError: If getting user fails
    """
    try:
        return await self.admin_adapter.a_get_user(user_id)
    except KeycloakGetError as e:
        if e.response_code == 404:
            return None
        raise InternalError() from e
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_by_id")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_username async

get_user_by_username(
    username: str,
) -> KeycloakUserType | None

Get user details by username.

Parameters:

Name Type Description Default
username str

User's username

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Args:
        username: User's username

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = await self.admin_adapter.a_get_users({"username": username})
        return users[0] if users else None
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_by_username")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_email async

get_user_by_email(email: str) -> KeycloakUserType | None

Get user details by email.

Parameters:

Name Type Description Default
email str

User's email

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Args:
        email: User's email

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = await self.admin_adapter.a_get_users({"email": email})
        return users[0] if users else None
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_by_email")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_roles async

get_user_roles(
    user_id: str,
) -> list[KeycloakRoleType] | None

Get roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
list[KeycloakRoleType] | None

List of roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType] | None:
    """Get roles assigned to a user.

    Args:
        user_id: User's ID

    Returns:
        List of roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return await self.admin_adapter.a_get_realm_roles_of_user(user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_roles")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_roles_for_user async

get_client_roles_for_user(
    user_id: str, client_id: str
) -> list[KeycloakRoleType]

Get client-specific roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of client-specific roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user.

    Args:
        user_id: User's ID
        client_id: Client ID

    Returns:
        List of client-specific roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return await self.admin_adapter.a_get_client_roles_of_user(user_id, client_id)
    except KeycloakError as e:
        raise InternalError() from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_user async

create_user(user_data: dict[str, Any]) -> str | None

Create a new user in Keycloak.

Parameters:

Name Type Description Default
user_data dict[str, Any]

User data including username, email, etc.

required

Returns:

Type Description
str | None

ID of the created user

Raises:

Type Description
ValueError

If creating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_user(self, user_data: dict[str, Any]) -> str | None:
    """Create a new user in Keycloak.

    Args:
        user_data: User data including username, email, etc.

    Returns:
        ID of the created user

    Raises:
        ValueError: If creating user fails
    """
    # This is a write operation, no caching needed
    try:
        user_id = await self.admin_adapter.a_create_user(user_data)

        # Clear related caches
        self.clear_all_caches()
    except KeycloakError as e:
        self._handle_user_exception(e, "create_user", user_data)
    else:
        return user_id

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.update_user async

update_user(
    user_id: str, user_data: dict[str, Any]
) -> None

Update user details.

Parameters:

Name Type Description Default
user_id str

User's ID

required
user_data dict[str, Any]

User data to update

required

Raises:

Type Description
ValueError

If updating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details.

    Args:
        user_id: User's ID
        user_data: User data to update

    Raises:
        ValueError: If updating user fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_update_user(user_id, user_data)

        # Clear user-related caches
        self.clear_all_caches()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_user")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.reset_password async

reset_password(
    user_id: str, password: str, temporary: bool = False
) -> None

Reset a user's password.

Parameters:

Name Type Description Default
user_id str

User's ID

required
password str

New password

required
temporary bool

Whether the password is temporary and should be changed on next login

False

Raises:

Type Description
ValueError

If password reset fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password.

    Args:
        user_id: User's ID
        password: New password
        temporary: Whether the password is temporary and should be changed on next login

    Raises:
        ValueError: If password reset fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_set_user_password(user_id, password, temporary)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "reset_password")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.assign_realm_role async

assign_realm_role(user_id: str, role_name: str) -> None

Assign a realm role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user.

    Args:
        user_id: User's ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = await self.admin_adapter.a_get_realm_role(role_name)
        # Assign role to user
        await self.admin_adapter.a_assign_realm_roles(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "cache_clear"):
            self.get_user_roles.cache_clear()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "assign_realm_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.remove_realm_role async

remove_realm_role(user_id: str, role_name: str) -> None

Remove a realm role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user.

    Args:
        user_id: User's ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = await self.admin_adapter.a_get_realm_role(role_name)
        # Remove role from user
        await self.admin_adapter.a_delete_realm_roles_of_user(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "cache_clear"):
            self.get_user_roles.cache_clear()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "remove_realm_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.assign_client_role async

assign_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Assign a client-specific role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get client
        client = await self.admin_adapter.a_get_client_id(client_id)
        if client is None:
            raise ValueError("client_id resolved to None")
        # Get role representation
        # Keycloak admin adapter methods accept these types at runtime
        role = await self.admin_adapter.a_get_client_role(client, role_name)
        # Assign role to user
        await self.admin_adapter.a_assign_client_role(user_id, client, [role])

        # Clear role-related caches
        if hasattr(self.get_client_roles_for_user, "cache_clear"):
            self.get_client_roles_for_user.cache_clear()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "assign_client_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_realm_role async

create_realm_role(
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required
description str | None

Optional role description

None
skip_exists bool

Skip creation if role already exists

True

Returns:

Type Description
dict[str, Any] | None

Created role details

Raises:

Type Description
ValueError

If role creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_realm_role(
    self,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new realm role.

    Args:
        role_name: Role name
        description: Optional role description
        skip_exists: Skip creation if role already exists

    Returns:
        Created role details

    Raises:
        ValueError: If role creation fails
    """
    # This is a write operation, no caching needed
    try:
        role_data = {"name": role_name}
        if description:
            role_data["description"] = description

        await self.admin_adapter.a_create_realm_role(role_data, skip_exists=skip_exists)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "cache_clear"):
            self.get_realm_roles.cache_clear()

        return await self.admin_adapter.a_get_realm_role(role_name)

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_realm_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_client_role async

create_client_role(
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None

Create a new client role.

Parameters:

Name Type Description Default
client_id str

Client ID or client name

required
role_name str

Role name

required
skip_exists bool

Skip creation if role already exists

True
description str | None

Optional role description

None

Returns:

Type Description
dict[str, Any] | None

Created role details

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_client_role(
    self,
    client_id: str,
    role_name: str,
    description: str | None = None,
    skip_exists: bool = True,
) -> dict[str, Any] | None:
    """Create a new client role.

    Args:
        client_id: Client ID or client name
        role_name: Role name
        skip_exists: Skip creation if role already exists
        description: Optional role description

    Returns:
        Created role details
    """
    # This is a write operation, no caching needed
    try:
        resolved_client_id = await self.admin_adapter.a_get_client_id(client_id)
        if resolved_client_id is None:
            raise ValueError(f"Client ID not found: {client_id}")

        # Prepare role data
        role_data = {"name": role_name}
        if description:
            role_data["description"] = description

        # Create client role
        await self.admin_adapter.a_create_client_role(resolved_client_id, role_data, skip_exists=skip_exists)

        # Clear related caches if they exist
        if hasattr(self.get_client_roles_for_user, "cache_clear"):
            self.get_client_roles_for_user.cache_clear()

        # Return created role
        return await self.admin_adapter.a_get_client_role(resolved_client_id, role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_client_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.delete_realm_role async

delete_realm_role(role_name: str) -> None

Delete a realm role.

Parameters:

Name Type Description Default
role_name str

Role name to delete

required

Raises:

Type Description
ValueError

If role deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role.

    Args:
        role_name: Role name to delete

    Raises:
        ValueError: If role deletion fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_delete_realm_role(role_name)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "cache_clear"):
            self.get_realm_roles.cache_clear()

        # We also need to clear user role caches since they might contain this role
        if hasattr(self.get_user_roles, "cache_clear"):
            self.get_user_roles.cache_clear()

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_realm_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_service_account_id async

get_service_account_id() -> str | None

Get service account user ID for the current client.

Returns:

Type Description
str | None

Service account user ID

Raises:

Type Description
ValueError

If getting service account fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_service_account_id(self) -> str | None:
    """Get service account user ID for the current client.

    Returns:
        Service account user ID

    Raises:
        ValueError: If getting service account fails
    """
    try:
        client_id = await self.get_client_id(self.configs.CLIENT_ID)
        if client_id is None:
            return None
        service_account = await self.admin_adapter.a_get_client_service_account_user(client_id)
        return service_account.get("id")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_service_account_id")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_well_known_config async

get_well_known_config() -> dict[str, Any] | None

Get the well-known OpenID configuration.

Returns:

Type Description
dict[str, Any] | None

OIDC configuration

Raises:

Type Description
ValueError

If getting configuration fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_well_known_config(self) -> dict[str, Any] | None:
    """Get the well-known OpenID configuration.

    Returns:
        OIDC configuration

    Raises:
        ValueError: If getting configuration fails
    """
    try:
        return await self.openid_adapter.a_well_known()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_well_known_config")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_certs async

get_certs() -> dict[str, Any] | None

Get the JWT verification certificates.

Returns:

Type Description
dict[str, Any] | None

Certificate information

Raises:

Type Description
ValueError

If getting certificates fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_certs(self) -> dict[str, Any] | None:
    """Get the JWT verification certificates.

    Returns:
        Certificate information

    Raises:
        ValueError: If getting certificates fails
    """
    try:
        return await self.openid_adapter.a_certs()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_certs")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token_from_code async

get_token_from_code(
    code: str, redirect_uri: str
) -> KeycloakTokenType | None

Exchange authorization code for token.

Parameters:

Name Type Description Default
code str

Authorization code

required
redirect_uri str

Redirect URI used in authorization request

required

Returns:

Type Description
KeycloakTokenType | None

Token response

Raises:

Type Description
ValueError

If token exchange fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType | None:
    """Exchange authorization code for token.

    Args:
        code: Authorization code
        redirect_uri: Redirect URI used in authorization request

    Returns:
        Token response

    Raises:
        ValueError: If token exchange fails
    """
    # Authorization codes can only be used once, don't cache
    try:
        return await self.openid_adapter.a_token(
            grant_type="authorization_code",
            code=code,
            redirect_uri=redirect_uri,
        )
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token_from_code")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_credentials_token async

get_client_credentials_token() -> KeycloakTokenType | None

Get token using client credentials.

Returns:

Type Description
KeycloakTokenType | None

Token response

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_client_credentials_token(self) -> KeycloakTokenType | None:
    """Get token using client credentials.

    Returns:
        Token response

    Raises:
        ValueError: If token acquisition fails
    """
    # Tokens are time-sensitive, don't cache
    try:
        return await self.openid_adapter.a_token(grant_type="client_credentials")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_credentials_token")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.search_users async

search_users(
    query: str, max_results: int = 100
) -> list[KeycloakUserType] | None

Search for users by username, email, or name.

Parameters:

Name Type Description Default
query str

Search query

required
max_results int

Maximum number of results to return

100

Returns:

Type Description
list[KeycloakUserType] | None

List of matching users

Raises:

Type Description
ValueError

If search fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=30, maxsize=50)  # Cache for 30 seconds with limited entries
async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType] | None:
    """Search for users by username, email, or name.

    Args:
        query: Search query
        max_results: Maximum number of results to return

    Returns:
        List of matching users

    Raises:
        ValueError: If search fails
    """
    try:
        # Try searching by different fields
        users = []

        # Search by username
        users.extend(await self.admin_adapter.a_get_users({"username": query, "max": max_results}))

        # Search by email if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            email_users = await self.admin_adapter.a_get_users({"email": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in email_users if user["id"] not in user_ids])

        # Search by firstName if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            first_name_users = await self.admin_adapter.a_get_users({"firstName": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in first_name_users if user["id"] not in user_ids])

        # Search by lastName if no results or incomplete results
        if len(users) < max_results:
            remaining = max_results - len(users)
            last_name_users = await self.admin_adapter.a_get_users({"lastName": query, "max": remaining})
            # Filter out duplicates
            user_ids = {user["id"] for user in users}
            users.extend([user for user in last_name_users if user["id"] not in user_ids])

        return users[:max_results]
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "search_users")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_secret async

get_client_secret(client_id: str) -> str | None

Get client secret.

Parameters:

Name Type Description Default
client_id str

Client ID

required

Returns:

Type Description
str | None

Client secret

Raises:

Type Description
ValueError

If getting secret fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
async def get_client_secret(self, client_id: str) -> str | None:
    """Get client secret.

    Args:
        client_id: Client ID

    Returns:
        Client secret

    Raises:
        ValueError: If getting secret fails
    """
    try:
        client = await self.admin_adapter.a_get_client(client_id)
        return client.get("secret", "")
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_secret")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_id async

get_client_id(client_name: str) -> str | None

Get client ID by client name.

Parameters:

Name Type Description Default
client_name str

Name of the client

required

Returns:

Type Description
str | None

Client ID

Raises:

Type Description
ValueError

If client not found

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
async def get_client_id(self, client_name: str) -> str | None:
    """Get client ID by client name.

    Args:
        client_name: Name of the client

    Returns:
        Client ID

    Raises:
        ValueError: If client not found
    """
    try:
        return await self.admin_adapter.a_get_client_id(client_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_client_id")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_realm_roles async

get_realm_roles() -> list[dict[str, Any]] | None

Get all realm roles.

Returns:

Type Description
list[dict[str, Any]] | None

List of realm roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
async def get_realm_roles(self) -> list[dict[str, Any]] | None:
    """Get all realm roles.

    Returns:
        List of realm roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        return await self.admin_adapter.a_get_realm_roles()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm_roles")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_realm_role async

get_realm_role(role_name: str) -> dict | None

Get realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required

Returns: A realm role

Raises:

Type Description
ValueError

If getting role fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
async def get_realm_role(self, role_name: str) -> dict | None:
    """Get realm role.

    Args:
        role_name: Role name
    Returns:
        A realm role

    Raises:
        ValueError: If getting role fails
    """
    try:
        return await self.admin_adapter.a_get_realm_role(role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.remove_client_role async

remove_client_role(
    user_id: str, client_id: str, role_name: str
) -> None

Remove a client-specific role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    try:
        client = await self.admin_adapter.a_get_client_id(client_id)
        if client is None:
            raise ValueError("client_id resolved to None")
        # Keycloak admin adapter methods accept these types at runtime
        role = await self.admin_adapter.a_get_client_role(client, role_name)
        await self.admin_adapter.a_delete_client_roles_of_user(user_id, client, [role])

        if hasattr(self.get_client_roles_for_user, "cache_clear"):
            self.get_client_roles_for_user.cache_clear()
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "remove_client_role")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.clear_user_sessions async

clear_user_sessions(user_id: str) -> None

Clear all sessions for a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Raises:

Type Description
ValueError

If clearing sessions fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user.

    Args:
        user_id: User's ID

    Raises:
        ValueError: If clearing sessions fails
    """
    try:
        await self.admin_adapter.a_user_logout(user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "clear_user_sessions")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.logout async

logout(refresh_token: str) -> None

Logout user by invalidating their refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token to invalidate

required

Raises:

Type Description
ValueError

If logout fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token.

    Args:
        refresh_token: Refresh token to invalidate

    Raises:
        ValueError: If logout fails
    """
    try:
        await self.openid_adapter.a_logout(refresh_token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "logout")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.introspect_token async

introspect_token(token: str) -> dict[str, Any] | None

Introspect token to get detailed information about it.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any] | None

Token introspection details

Raises:

Type Description
ValueError

If token introspection fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def introspect_token(self, token: str) -> dict[str, Any] | None:
    """Introspect token to get detailed information about it.

    Args:
        token: Access token

    Returns:
        Token introspection details

    Raises:
        ValueError: If token introspection fails
    """
    try:
        return await self.openid_adapter.a_introspect(token)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "introspect_token")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token_info async

get_token_info(token: str) -> dict[str, Any] | None

Decode token to get its claims.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any] | None

Dictionary of token claims

Raises:

Type Description
ValueError

If token decoding fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token_info(self, token: str) -> dict[str, Any] | None:
    """Decode token to get its claims.

    Args:
        token: Access token

    Returns:
        Dictionary of token claims

    Raises:
        ValueError: If token decoding fails
    """
    try:
        return await self.openid_adapter.a_decode_token(
            token,
            key=await self.get_public_key(),
        )
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_token_info")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.delete_user async

delete_user(user_id: str) -> None

Delete a user from Keycloak by their ID.

Parameters:

Name Type Description Default
user_id str

The ID of the user to delete

required

Raises:

Type Description
ValueError

If the deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID.

    Args:
        user_id: The ID of the user to delete

    Raises:
        ValueError: If the deletion fails
    """
    try:
        await self.admin_adapter.a_delete_user(user_id=user_id)

        if hasattr(self.get_user_by_username, "cache_clear"):
            self.get_user_by_username.cache_clear()

        logger.info(f"Successfully deleted user with ID {user_id}")

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_user")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_role async

has_role(token: str, role_name: str) -> bool

Check if a user has a specific role.

Parameters:

Name Type Description Default
token str

Access token

required
role_name str

Role name to check

required

Returns:

Type Description
bool

True if user has the role, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: Access token
        role_name: Role name to check

    Returns:
        True if user has the role, False otherwise
    """
    # Not caching this result as token validation is time-sensitive
    try:
        user_info = await self.get_userinfo(token)
        if not user_info:
            return False

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = realm_access.get("roles", [])
        if role_name in roles:
            return True

        # Check roles for the configured client
        resource_access = user_info.get("resource_access", {})
        client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
        if role_name in client_roles:
            return True

    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_any_of_roles async

has_any_of_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has any of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names frozenset[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has any of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_any_of_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has any of the roles, False otherwise
    """
    try:
        user_info = await self.get_userinfo(token)
        if not user_info:
            return False

        # Check realm roles first
        realm_access = user_info.get("realm_access", {})
        realm_roles = set(realm_access.get("roles", []))
        if role_names.intersection(realm_roles):
            return True

        # Check roles for the configured client
        resource_access = user_info.get("resource_access", {})
        client_roles = set(resource_access.get(self.configs.CLIENT_ID, {}).get("roles", []))
        if role_names.intersection(client_roles):
            return True

    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_all_roles async

has_all_roles(
    token: str, role_names: frozenset[str]
) -> bool

Check if a user has all the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names frozenset[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has all the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_all_roles(self, token: str, role_names: frozenset[str]) -> bool:
    """Check if a user has all the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has all the roles, False otherwise
    """
    try:
        user_info = await self.get_userinfo(token)
        if not user_info:
            return False

        # Get all user roles
        all_roles = set()

        # Add realm roles
        realm_access = user_info.get("realm_access", {})
        all_roles.update(realm_access.get("roles", []))

        # Add roles from the configured client
        resource_access = user_info.get("resource_access", {})
        client_roles = resource_access.get(self.configs.CLIENT_ID, {}).get("roles", [])
        all_roles.update(client_roles)

        # Check if all required roles are present
        return role_names.issubset(all_roles)

    except Exception as e:
        logger.debug(f"All roles check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.check_permissions async

check_permissions(
    token: str, resource: str, scope: str
) -> bool

Check if a user has permission to access a resource with the specified scope.

Parameters:

Name Type Description Default
token str

Access token

required
resource str

Resource name

required
scope str

Permission scope

required

Returns:

Type Description
bool

True if permission granted, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope.

    Args:
        token: Access token
        resource: Resource name
        scope: Permission scope

    Returns:
        True if permission granted, False otherwise
    """
    try:
        # Use UMA permissions endpoint to check specific resource and scope
        permissions = await self.openid_adapter.a_uma_permissions(token, permissions=f"{resource}#{scope}")

        # Check if the response indicates permission is granted
        if not permissions or not isinstance(permissions, list):
            logger.debug("No permissions returned or invalid response format")
            return False

        # Look for the specific permission in the response
        for perm in permissions:
            if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                return True

    except KeycloakError as e:
        logger.debug(f"Permission check failed with Keycloak error: {e!s}")
        return False
    except Exception as e:
        logger.debug(f"Permission check failed with unexpected error: {e!s}")
        return False
    else:
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_realm async

create_realm(
    realm_name: str, skip_exists: bool = True, **kwargs: Any
) -> dict[str, Any] | None

Create a Keycloak realm with minimum required fields and optional additional config.

Parameters:

Name Type Description Default
realm_name str

The realm identifier (required)

required
skip_exists bool

Skip creation if realm already exists

True
kwargs Any

Additional optional configurations for the realm

{}

Returns:

Type Description
dict[str, Any] | None

Dictionary with realm information and status

Raises:

Type Description
InternalError

If realm creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_realm(self, realm_name: str, skip_exists: bool = True, **kwargs: Any) -> dict[str, Any] | None:
    """Create a Keycloak realm with minimum required fields and optional additional config.

    Args:
        realm_name: The realm identifier (required)
        skip_exists: Skip creation if realm already exists
        kwargs: Additional optional configurations for the realm

    Returns:
        Dictionary with realm information and status

    Raises:
        InternalError: If realm creation fails
    """
    payload = {
        "realm": realm_name,
        "enabled": kwargs.get("enabled", True),
        "displayName": kwargs.get("display_name", realm_name),
    }

    # Add any additional parameters from kwargs
    for key, value in kwargs.items():
        # Skip display_name as it's already handled
        if key == "display_name":
            continue

        # Convert Python snake_case to Keycloak camelCase
        camel_key = StringUtils.snake_to_camel_case(key)
        payload[camel_key] = value

    try:
        await self.admin_adapter.a_create_realm(payload=payload, skip_exists=skip_exists)
    except KeycloakError as e:
        logger.debug(f"Failed to create realm: {e!s}")

        # Handle realm already exists with skip_exists option
        if skip_exists:
            error_message = self._extract_error_message(e).lower()
            if "already exists" in error_message and "realm" in error_message:
                return {"realm": realm_name, "status": "already_exists", "config": payload}

        # Use the mixin to handle realm-specific errors
        self._handle_realm_exception(e, "create_realm", realm_name)
    else:
        return {"realm": realm_name, "status": "created", "config": payload}

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_realm async

get_realm(realm_name: str) -> dict[str, Any] | None

Get realm details by realm name.

Parameters:

Name Type Description Default
realm_name str

Name of the realm

required

Returns:

Type Description
dict[str, Any] | None

Realm details

Raises:

Type Description
InternalError

If getting realm fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_realm(self, realm_name: str) -> dict[str, Any] | None:
    """Get realm details by realm name.

    Args:
        realm_name: Name of the realm

    Returns:
        Realm details

    Raises:
        InternalError: If getting realm fails
    """
    try:
        return await self.admin_adapter.a_get_realm(realm_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_realm")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.update_realm async

update_realm(
    realm_name: str, **kwargs: Any
) -> dict[str, Any] | None

Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled).

Parameters:

Name Type Description Default
realm_name str

Realm name (not the realm id).

required
**kwargs Any

RealmRepresentation attributes to update (e.g. displayName, organizationsEnabled).

{}

Returns:

Type Description
dict[str, Any] | None

Response from Keycloak, or None on error (handled via exception).

Source code in archipy/adapters/keycloak/adapters.py
@override
async def update_realm(self, realm_name: str, **kwargs: Any) -> dict[str, Any] | None:
    """Update a realm. Kwargs are RealmRepresentation top-level attributes (e.g. displayName, organizationsEnabled).

    Args:
        realm_name: Realm name (not the realm id).
        **kwargs: RealmRepresentation attributes to update (e.g. displayName, organizationsEnabled).

    Returns:
        Response from Keycloak, or None on error (handled via exception).
    """
    try:
        return await self.admin_adapter.a_update_realm(realm_name, dict(kwargs))
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_realm")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_client async

create_client(
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None

Create a Keycloak client with minimum required fields and optional additional config.

Parameters:

Name Type Description Default
client_id str

The client identifier (required)

required
realm str | None

Target realm name (uses the current realm in KeycloakAdmin if not specified)

None
skip_exists bool

Skip creation if client already exists

True
kwargs Any

Additional optional configurations for the client

{}

Returns:

Type Description
dict[str, Any] | None

Dictionary with client information

Raises:

Type Description
InternalError

If client creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_client(
    self,
    client_id: str,
    realm: str | None = None,
    skip_exists: bool = True,
    **kwargs: Any,
) -> dict[str, Any] | None:
    """Create a Keycloak client with minimum required fields and optional additional config.

    Args:
        client_id: The client identifier (required)
        realm: Target realm name (uses the current realm in KeycloakAdmin if not specified)
        skip_exists: Skip creation if client already exists
        kwargs: Additional optional configurations for the client

    Returns:
        Dictionary with client information

    Raises:
        InternalError: If client creation fails
    """
    original_realm = self.admin_adapter.connection.realm_name

    try:
        # Set the target realm if provided
        if realm and realm != original_realm:
            self.admin_adapter.connection.realm_name = realm

        public_client = kwargs.get("public_client", False)

        # Prepare the minimal client payload
        payload = {
            "clientId": client_id,
            "enabled": kwargs.get("enabled", True),
            "protocol": kwargs.get("protocol", "openid-connect"),
            "name": kwargs.get("name", client_id),
            "publicClient": public_client,
        }

        # Enable service accounts for confidential clients by default
        if not public_client:
            payload["serviceAccountsEnabled"] = kwargs.get("service_account_enabled", True)
            payload["clientAuthenticatorType"] = "client-secret"

        for key, value in kwargs.items():
            if key in ["enabled", "protocol", "name", "public_client", "service_account_enabled"]:
                continue

            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        internal_client_id = None
        try:
            internal_client_id = await self.admin_adapter.a_create_client(payload, skip_exists=skip_exists)
        except KeycloakError as e:
            logger.debug(f"Failed to create client: {e!s}")

            # Handle client already exists with skip_exists option
            if skip_exists:
                error_message = self._extract_error_message(e).lower()
                if "already exists" in error_message and "client" in error_message:
                    return {
                        "client_id": client_id,
                        "status": "already_exists",
                        "realm": self.admin_adapter.connection.realm_name,
                    }

            # Use the mixin to handle client-specific errors
            client_data = {"clientId": client_id, "name": kwargs.get("name", client_id)}
            self._handle_client_exception(e, "create_client", client_data)

        return {
            "client_id": client_id,
            "internal_client_id": internal_client_id,
            "realm": self.admin_adapter.connection.realm_name,
            "status": "created",
        }

    finally:
        # Always restore the original realm
        if realm and realm != original_realm:
            self.admin_adapter.connection.realm_name = original_realm

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.add_realm_roles_to_composite async

add_realm_roles_to_composite(
    composite_role_name: str, child_role_names: list[str]
) -> None

Add realm roles to a composite role.

Parameters:

Name Type Description Default
composite_role_name str

Name of the composite role

required
child_role_names list[str]

List of child role names to add

required
Source code in archipy/adapters/keycloak/adapters.py
@override
async def add_realm_roles_to_composite(self, composite_role_name: str, child_role_names: list[str]) -> None:
    """Add realm roles to a composite role.

    Args:
        composite_role_name: Name of the composite role
        child_role_names: List of child role names to add
    """
    try:
        child_roles = []
        for role_name in child_role_names:
            try:
                role = await self.admin_adapter.a_get_realm_role(role_name)
                child_roles.append(role)
            except KeycloakGetError as e:
                if e.response_code == 404:
                    logger.warning(f"Child role not found: {role_name}")
                    continue
                raise

        if child_roles:
            await self.admin_adapter.a_add_composite_realm_roles_to_role(
                role_name=composite_role_name,
                roles=child_roles,
            )
            logger.info(f"Added {len(child_roles)} realm roles to composite role: {composite_role_name}")

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "add_realm_roles_to_composite")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.add_client_roles_to_composite async

add_client_roles_to_composite(
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None

Add client roles to a composite role.

Parameters:

Name Type Description Default
composite_role_name str

Name of the composite role

required
client_id str

Client ID or client name

required
child_role_names list[str]

List of child role names to add

required
Source code in archipy/adapters/keycloak/adapters.py
@override
async def add_client_roles_to_composite(
    self,
    composite_role_name: str,
    client_id: str,
    child_role_names: list[str],
) -> None:
    """Add client roles to a composite role.

    Args:
        composite_role_name: Name of the composite role
        client_id: Client ID or client name
        child_role_names: List of child role names to add
    """
    try:
        internal_client_id = await self.admin_adapter.a_get_client_id(client_id)
        if internal_client_id is None:
            raise ValueError("client_id resolved to None")

        child_roles = []
        for role_name in child_role_names:
            try:
                # Keycloak admin adapter methods accept these types at runtime
                role = await self.admin_adapter.a_get_client_role(internal_client_id, role_name)
                child_roles.append(role)
            except KeycloakGetError as e:
                if e.response_code == 404:
                    logger.warning(f"Client role not found: {role_name}")
                    continue
                raise

        if child_roles:
            if internal_client_id is None:
                raise ValueError("Client ID not found")
            resolved_client_id: str = internal_client_id
            await self.admin_adapter.a_add_composite_client_roles_to_role(
                role_name=composite_role_name,
                client_role_id=resolved_client_id,
                roles=child_roles,
            )
            logger.info(f"Added {len(child_roles)} client roles to composite role: {composite_role_name}")

    except KeycloakError as e:
        self._handle_keycloak_exception(e, "add_client_roles_to_composite")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_composite_realm_roles async

get_composite_realm_roles(
    role_name: str,
) -> list[dict[str, Any]] | None

Get composite roles for a realm role.

Parameters:

Name Type Description Default
role_name str

Name of the role

required

Returns:

Type Description
list[dict[str, Any]] | None

List of composite roles

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_composite_realm_roles(self, role_name: str) -> list[dict[str, Any]] | None:
    """Get composite roles for a realm role.

    Args:
        role_name: Name of the role

    Returns:
        List of composite roles
    """
    try:
        return await self.admin_adapter.a_get_composite_realm_roles_of_role(role_name)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_composite_realm_roles")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_organizations async

get_organizations(
    query: dict | None = None,
) -> list[dict[str, Any]]

Fetch all organizations, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_organizations(self, query: dict | None = None) -> list[dict[str, Any]]:
    """Fetch all organizations, optionally filtered by query parameters."""
    try:
        return await self.admin_adapter.a_get_organizations(query=query)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organizations")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_organization async

get_organization(organization_id: str) -> dict[str, Any]

Get representation of the organization by ID.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_organization(self, organization_id: str) -> dict[str, Any]:
    """Get representation of the organization by ID."""
    try:
        return await self.admin_adapter.a_get_organization(organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_organization async

create_organization(
    name: str, alias: str, **kwargs: Any
) -> str | None

Create a new organization. Name and alias must be unique. Returns org_id.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_organization(self, name: str, alias: str, **kwargs: Any) -> str | None:
    """Create a new organization. Name and alias must be unique. Returns org_id."""
    try:
        payload = {"name": name, "alias": alias}
        for key, value in kwargs.items():
            if key in ["name", "alias"]:
                continue

            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        return await self.admin_adapter.a_create_organization(payload=payload)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "create_organization")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.update_organization async

update_organization(
    organization_id: str, **kwargs: Any
) -> dict[str, Any]

Update an existing organization. Kwargs are organization attributes (e.g. name, alias).

Source code in archipy/adapters/keycloak/adapters.py
@override
async def update_organization(self, organization_id: str, **kwargs: Any) -> dict[str, Any]:
    """Update an existing organization. Kwargs are organization attributes (e.g. name, alias)."""
    try:
        payload = {}
        for key, value in kwargs.items():
            # Convert snake_case to camelCase
            camel_key = StringUtils.snake_to_camel_case(key)
            payload[camel_key] = value

        return await self.admin_adapter.a_update_organization(organization_id=organization_id, payload=payload)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "update_organization")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.delete_organization async

delete_organization(organization_id: str) -> dict[str, Any]

Delete an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def delete_organization(self, organization_id: str) -> dict[str, Any]:
    """Delete an organization."""
    try:
        return await self.admin_adapter.a_delete_organization(organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "delete_organization")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_organizations async

get_user_organizations(
    user_id: str,
) -> list[dict[str, Any]]

Get organizations by user id.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_user_organizations(self, user_id: str) -> list[dict[str, Any]]:
    """Get organizations by user id."""
    try:
        return await self.admin_adapter.a_get_user_organizations(user_id=user_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_user_organizations")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_organization_members async

get_organization_members(
    organization_id: str, query: dict | None = None
) -> list[dict[str, Any]]

Get members by organization id, optionally filtered by query parameters.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_organization_members(self, organization_id: str, query: dict | None = None) -> list[dict[str, Any]]:
    """Get members by organization id, optionally filtered by query parameters."""
    try:
        return await self.admin_adapter.a_get_organization_members(organization_id=organization_id, query=query)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization_members")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_organization_members_count async

get_organization_members_count(organization_id: str) -> int

Get the number of members in the organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_organization_members_count(self, organization_id: str) -> int:
    """Get the number of members in the organization."""
    try:
        return await self.admin_adapter.a_get_organization_members_count(organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "get_organization_members_count")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.organization_user_add async

organization_user_add(
    user_id: str, organization_id: str
) -> bytes

Add a user to an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def organization_user_add(self, user_id: str, organization_id: str) -> bytes:
    """Add a user to an organization."""
    try:
        return await self.admin_adapter.a_organization_user_add(user_id=user_id, organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "organization_user_add")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.organization_user_remove async

organization_user_remove(
    user_id: str, organization_id: str
) -> dict[str, Any]

Remove a user from an organization.

Source code in archipy/adapters/keycloak/adapters.py
@override
async def organization_user_remove(self, user_id: str, organization_id: str) -> dict[str, Any]:
    """Remove a user from an organization."""
    try:
        return await self.admin_adapter.a_organization_user_remove(user_id=user_id, organization_id=organization_id)
    except KeycloakError as e:
        self._handle_keycloak_exception(e, "organization_user_remove")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_organization_idps abstractmethod async

get_organization_idps(
    organization_id: str,
) -> list[dict[str, Any]]

Get IDPs by organization id.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_organization_idps(self, organization_id: str) -> list[dict[str, Any]]:
    """Get IDPs by organization id."""
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3