Skip to content

Keycloak Adapter Tutorial

The Keycloak adapter provides an interface for interacting with Keycloak's API to manage authentication and authorization. ArchiPy offers both synchronous and asynchronous implementations.

Installation

uv add "archipy[keycloak]"

For full API reference, see the Keycloak Adapters API Documentation.

Configuration

Configure the Keycloak adapter via environment variables or a KeycloakConfig object.

Environment Variables

KEYCLOAK__SERVER_URL=https://keycloak.example.com
KEYCLOAK__REALM_NAME=my-realm
KEYCLOAK__CLIENT_ID=my-client
KEYCLOAK__CLIENT_SECRET_KEY=client-secret
KEYCLOAK__VERIFY_SSL=true
KEYCLOAK__TIMEOUT=10

Direct Configuration

from archipy.configs.config_template import KeycloakConfig

config = KeycloakConfig(
    SERVER_URL="https://keycloak.example.com",
    REALM_NAME="my-realm",
    CLIENT_ID="my-client",
    CLIENT_SECRET_KEY="client-secret",  # noqa: S105
    VERIFY_SSL=True,
    TIMEOUT=10,
)

Basic Usage

The Keycloak adapter provides both synchronous and asynchronous implementations. Use the synchronous adapter for standard request-response flows and the asynchronous adapter with async/await patterns.

Synchronous Adapter

The synchronous adapter provides a blocking API for Keycloak operations.

import logging

from archipy.adapters.keycloak.adapters import KeycloakAdapter
from archipy.configs.config_template import KeycloakConfig
from archipy.models.errors import (
    InternalError,
    InvalidCredentialsError,
    NotFoundError,
    PermissionDeniedError,
    UnauthenticatedError,
)

# Configure logging
logger = logging.getLogger(__name__)

# Using global configuration
keycloak = KeycloakAdapter()

# Or with custom configuration
custom_config = KeycloakConfig(
    SERVER_URL="https://keycloak.example.com",
    REALM_NAME="another-realm",
    CLIENT_ID="another-client",
    CLIENT_SECRET_KEY="client-secret",  # noqa: S105
)
keycloak = KeycloakAdapter(custom_config)

# Authentication
try:
    # Get token with username/password
    token = keycloak.get_token("username", "password")
except InvalidCredentialsError as e:
    logger.error(f"Authentication failed: {e}")
    raise
else:
    access_token = token["access_token"]
    refresh_token = token["refresh_token"]
    logger.info("Authentication successful")

try:
    # Refresh an existing token
    new_token = keycloak.refresh_token(refresh_token)
except (UnauthenticatedError, InvalidCredentialsError) as e:
    logger.error(f"Token refresh failed: {e}")
    raise
else:
    logger.info("Token refreshed successfully")

try:
    # Validate a token
    is_valid = keycloak.validate_token(access_token)
except UnauthenticatedError as e:
    logger.error(f"Token validation failed: {e}")
    raise
else:
    logger.info(f"Token valid: {is_valid}")

try:
    # Get user info from token
    user_info = keycloak.get_userinfo(access_token)
except (UnauthenticatedError, InternalError) as e:
    logger.error(f"Failed to get user info: {e}")
    raise
else:
    logger.info(f"User info retrieved: {user_info.get('preferred_username')}")

try:
    # Get token using client credentials
    client_token = keycloak.get_client_credentials_token()
except InvalidCredentialsError as e:
    logger.error(f"Client credentials authentication failed: {e}")
    raise
else:
    logger.info("Client token obtained")

try:
    # Logout (invalidate refresh token)
    keycloak.logout(refresh_token)
except InternalError as e:
    logger.error(f"Logout failed: {e}")
    raise
else:
    logger.info("Logout successful")

# User operations (requires admin privileges)
try:
    # Get user by ID
    user = keycloak.get_user_by_id("user-uuid")
except NotFoundError as e:
    logger.error(f"User not found: {e}")
    raise
except PermissionDeniedError as e:
    logger.error(f"Permission denied: {e}")
    raise
else:
    logger.info(f"User retrieved: {user.get('username')}")

try:
    # Get user by username
    user = keycloak.get_user_by_username("johndoe")
except NotFoundError as e:
    logger.error(f"User not found: {e}")
    raise
else:
    logger.info(f"User retrieved by username: {user.get('username')}")

try:
    # Get user by email
    user = keycloak.get_user_by_email("john@example.com")
except NotFoundError as e:
    logger.error(f"User not found by email: {e}")
    raise
else:
    logger.info(f"User retrieved by email: {user.get('username')}")

try:
    # Create a new user
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "enabled": True,
        "firstName": "New",
        "lastName": "User",
        "credentials": [{
            "type": "password",
            "value": "initial-password",
            "temporary": True
        }]
    }
    user_id = keycloak.create_user(user_data)
except PermissionDeniedError as e:
    logger.error(f"Permission denied to create user: {e}")
    raise
except InternalError as e:
    logger.error(f"Failed to create user: {e}")
    raise
else:
    logger.info(f"User created with ID: {user_id}")

try:
    # Update a user
    update_data = {"firstName": "Updated", "email": "updated@example.com"}
    keycloak.update_user(user_id, update_data)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Failed to update user: {e}")
    raise
else:
    logger.info(f"User updated: {user_id}")

try:
    # Reset password
    keycloak.reset_password(user_id, "new-password", temporary=True)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Failed to reset password: {e}")
    raise
else:
    logger.info(f"Password reset for user: {user_id}")

try:
    # Search for users
    users = keycloak.search_users("john", max_results=10)
except InternalError as e:
    logger.error(f"Failed to search users: {e}")
    raise
else:
    logger.info(f"Found {len(users)} users")

try:
    # Clear all user sessions
    keycloak.clear_user_sessions(user_id)
except (NotFoundError, InternalError) as e:
    logger.error(f"Failed to clear sessions: {e}")
    raise
else:
    logger.info(f"Sessions cleared for user: {user_id}")

try:
    # Delete a user
    keycloak.delete_user(user_id)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Failed to delete user: {e}")
    raise
else:
    logger.info(f"User deleted: {user_id}")

# Role operations
try:
    # Get user roles
    roles = keycloak.get_user_roles(user_id)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Failed to get user roles: {e}")
    raise
else:
    logger.info(f"User has {len(roles)} roles")

try:

    # Get client roles for user
    client_roles = keycloak.get_client_roles_for_user(user_id, "client-id")

    # Check if user has role
    has_role = keycloak.has_role(access_token, "admin")

    # Check if user has any of the specified roles
    has_any = keycloak.has_any_of_roles(access_token, {"admin", "manager"})

    # Check if user has all specified roles
    has_all = keycloak.has_all_roles(access_token, {"user", "viewer"})

    # Assign realm role
    keycloak.assign_realm_role(user_id, "admin")

    # Remove realm role
    keycloak.remove_realm_role(user_id, "admin")

    # Assign client role
    keycloak.assign_client_role(user_id, "client-id", "client-admin")

    # Remove client role
    keycloak.remove_client_role(user_id, "client-id", "client-admin")

    # Get realm roles
    all_roles = keycloak.get_realm_roles()

    # Get a specific realm role
    role = keycloak.get_realm_role("admin")

    # Create a realm role
    new_role = keycloak.create_realm_role("new-role", "A new role description")

    # Delete a realm role
    keycloak.delete_realm_role("new-role")
except (NotFoundError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Keycloak role operation failed: {e}")
    raise

# Client operations
try:
    # Get client ID from name
    client_id = keycloak.get_client_id("client-name")

    # Get client secret
    secret = keycloak.get_client_secret(client_id)

    # Get service account ID
    service_account_id = keycloak.get_service_account_id()
except (NotFoundError, InternalError) as e:
    logger.error(f"Keycloak client operation failed: {e}")
    raise

# System operations
try:
    # Get public key for token verification
    public_key = keycloak.get_public_key()

    # Get well-known OpenID configuration
    config = keycloak.get_well_known_config()

    # Get JWT certificates
    certs = keycloak.get_certs()
except InternalError as e:
    logger.error(f"Keycloak system operation failed: {e}")
    raise

# Authorization
try:
    # Exchange authorization code for token
    token = keycloak.get_token_from_code("auth-code", "https://my-app.example.com/callback")

    # Check permissions
    has_permission = keycloak.check_permissions(access_token, "resource-name", "view")
except (UnauthenticatedError, PermissionDeniedError, InternalError) as e:
    logger.error(f"Keycloak authorization operation failed: {e}")
    raise

Asynchronous Adapter

The asynchronous adapter provides a non-blocking API using async/await syntax:

import asyncio
import logging

from archipy.adapters.keycloak.adapters import AsyncKeycloakAdapter
from archipy.models.errors import (
    InternalError,
    InvalidCredentialsError,
    NotFoundError,
    PermissionDeniedError,
    UnauthenticatedError,
)

logger = logging.getLogger(__name__)


async def main() -> None:
    # Initialize with global config
    keycloak = AsyncKeycloakAdapter()

    try:
        # Get token
        token = await keycloak.get_token("username", "password")
        access_token = token["access_token"]
    except InvalidCredentialsError as e:
        logger.error(f"Authentication failed: {e}")
        raise
    else:
        logger.info("Authentication successful")

    try:
        # Get user info
        user_info = await keycloak.get_userinfo(access_token)
        logger.info(f"Logged in as: {user_info.get('preferred_username')}")

        # Check if user has role
        if await keycloak.has_role(access_token, "admin"):
            logger.info("User has admin role")
    except (UnauthenticatedError, InternalError) as e:
        logger.error(f"Failed to get user info or check role: {e}")
        raise

    try:
        # Search for users
        users = await keycloak.search_users("john")
        logger.info(f"Found {len(users)} users matching 'john'")

        # Create a new user
        user_data = {
            "username": "async_user",
            "email": "async@example.com",
            "enabled": True,
        }
        user_id = await keycloak.create_user(user_data)
        logger.info(f"Created user with ID: {user_id}")

        # Delete the user
        await keycloak.delete_user(user_id)
        logger.info(f"Deleted user: {user_id}")
    except (NotFoundError, PermissionDeniedError, InternalError) as e:
        logger.error(f"User management operation failed: {e}")
        raise


# Run the async function
asyncio.run(main())

Caching

Both adapters use TTL (Time-To-Live) caching for appropriate operations to improve performance. Cache durations are configured for each method based on how frequently the data typically changes:

  • Public keys and certificate information: 1 hour
  • User information from tokens: 30 seconds
  • User details and role information: 5 minutes

You can clear all caches if needed:

from archipy.adapters.keycloak.adapters import AsyncKeycloakAdapter, KeycloakAdapter

# Sync adapter
keycloak = KeycloakAdapter()
keycloak.clear_all_caches()

# Async adapter
async_keycloak = AsyncKeycloakAdapter()
async_keycloak.clear_all_caches()

Security Considerations

  • Token validation is performed without caching to ensure security.
  • The adapter automatically refreshes admin tokens before they expire.
  • Write operations (like user creation/updates) automatically clear relevant caches.
  • For production use, prefer the authorization code flow over direct username/password authentication.

See Also