Keycloak Adapter Tutorial¶
The Keycloak adapter provides an interface for interacting with Keycloak's API to manage authentication and authorization. ArchiPy offers both synchronous and asynchronous implementations.
Installation¶
For full API reference, see the Keycloak Adapters API Documentation.
Configuration¶
Configure the Keycloak adapter via environment variables or a KeycloakConfig object.
Environment Variables¶
KEYCLOAK__SERVER_URL=https://keycloak.example.com
KEYCLOAK__REALM_NAME=my-realm
KEYCLOAK__CLIENT_ID=my-client
KEYCLOAK__CLIENT_SECRET_KEY=client-secret
KEYCLOAK__VERIFY_SSL=true
KEYCLOAK__TIMEOUT=10
Direct Configuration¶
from archipy.configs.config_template import KeycloakConfig
config = KeycloakConfig(
SERVER_URL="https://keycloak.example.com",
REALM_NAME="my-realm",
CLIENT_ID="my-client",
CLIENT_SECRET_KEY="client-secret", # noqa: S105
VERIFY_SSL=True,
TIMEOUT=10,
)
Basic Usage¶
The Keycloak adapter provides both synchronous and asynchronous implementations. Use the synchronous adapter for
standard request-response flows and the asynchronous adapter with async/await patterns.
Synchronous Adapter¶
The synchronous adapter provides a blocking API for Keycloak operations.
import logging
from archipy.adapters.keycloak.adapters import KeycloakAdapter
from archipy.configs.config_template import KeycloakConfig
from archipy.models.errors import (
InternalError,
InvalidCredentialsError,
NotFoundError,
PermissionDeniedError,
UnauthenticatedError,
)
# Configure logging
logger = logging.getLogger(__name__)
# Using global configuration
keycloak = KeycloakAdapter()
# Or with custom configuration
custom_config = KeycloakConfig(
SERVER_URL="https://keycloak.example.com",
REALM_NAME="another-realm",
CLIENT_ID="another-client",
CLIENT_SECRET_KEY="client-secret", # noqa: S105
)
keycloak = KeycloakAdapter(custom_config)
# Authentication
try:
# Get token with username/password
token = keycloak.get_token("username", "password")
except InvalidCredentialsError as e:
logger.error(f"Authentication failed: {e}")
raise
else:
access_token = token["access_token"]
refresh_token = token["refresh_token"]
logger.info("Authentication successful")
try:
# Refresh an existing token
new_token = keycloak.refresh_token(refresh_token)
except (UnauthenticatedError, InvalidCredentialsError) as e:
logger.error(f"Token refresh failed: {e}")
raise
else:
logger.info("Token refreshed successfully")
try:
# Validate a token
is_valid = keycloak.validate_token(access_token)
except UnauthenticatedError as e:
logger.error(f"Token validation failed: {e}")
raise
else:
logger.info(f"Token valid: {is_valid}")
try:
# Get user info from token
user_info = keycloak.get_userinfo(access_token)
except (UnauthenticatedError, InternalError) as e:
logger.error(f"Failed to get user info: {e}")
raise
else:
logger.info(f"User info retrieved: {user_info.get('preferred_username')}")
try:
# Get token using client credentials
client_token = keycloak.get_client_credentials_token()
except InvalidCredentialsError as e:
logger.error(f"Client credentials authentication failed: {e}")
raise
else:
logger.info("Client token obtained")
try:
# Logout (invalidate refresh token)
keycloak.logout(refresh_token)
except InternalError as e:
logger.error(f"Logout failed: {e}")
raise
else:
logger.info("Logout successful")
# User operations (requires admin privileges)
try:
# Get user by ID
user = keycloak.get_user_by_id("user-uuid")
except NotFoundError as e:
logger.error(f"User not found: {e}")
raise
except PermissionDeniedError as e:
logger.error(f"Permission denied: {e}")
raise
else:
logger.info(f"User retrieved: {user.get('username')}")
try:
# Get user by username
user = keycloak.get_user_by_username("johndoe")
except NotFoundError as e:
logger.error(f"User not found: {e}")
raise
else:
logger.info(f"User retrieved by username: {user.get('username')}")
try:
# Get user by email
user = keycloak.get_user_by_email("john@example.com")
except NotFoundError as e:
logger.error(f"User not found by email: {e}")
raise
else:
logger.info(f"User retrieved by email: {user.get('username')}")
try:
# Create a new user
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"enabled": True,
"firstName": "New",
"lastName": "User",
"credentials": [{
"type": "password",
"value": "initial-password",
"temporary": True
}]
}
user_id = keycloak.create_user(user_data)
except PermissionDeniedError as e:
logger.error(f"Permission denied to create user: {e}")
raise
except InternalError as e:
logger.error(f"Failed to create user: {e}")
raise
else:
logger.info(f"User created with ID: {user_id}")
try:
# Update a user
update_data = {"firstName": "Updated", "email": "updated@example.com"}
keycloak.update_user(user_id, update_data)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"Failed to update user: {e}")
raise
else:
logger.info(f"User updated: {user_id}")
try:
# Reset password
keycloak.reset_password(user_id, "new-password", temporary=True)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"Failed to reset password: {e}")
raise
else:
logger.info(f"Password reset for user: {user_id}")
try:
# Search for users
users = keycloak.search_users("john", max_results=10)
except InternalError as e:
logger.error(f"Failed to search users: {e}")
raise
else:
logger.info(f"Found {len(users)} users")
try:
# Clear all user sessions
keycloak.clear_user_sessions(user_id)
except (NotFoundError, InternalError) as e:
logger.error(f"Failed to clear sessions: {e}")
raise
else:
logger.info(f"Sessions cleared for user: {user_id}")
try:
# Delete a user
keycloak.delete_user(user_id)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"Failed to delete user: {e}")
raise
else:
logger.info(f"User deleted: {user_id}")
# Role operations
try:
# Get user roles
roles = keycloak.get_user_roles(user_id)
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"Failed to get user roles: {e}")
raise
else:
logger.info(f"User has {len(roles)} roles")
try:
# Get client roles for user
client_roles = keycloak.get_client_roles_for_user(user_id, "client-id")
# Check if user has role
has_role = keycloak.has_role(access_token, "admin")
# Check if user has any of the specified roles
has_any = keycloak.has_any_of_roles(access_token, {"admin", "manager"})
# Check if user has all specified roles
has_all = keycloak.has_all_roles(access_token, {"user", "viewer"})
# Assign realm role
keycloak.assign_realm_role(user_id, "admin")
# Remove realm role
keycloak.remove_realm_role(user_id, "admin")
# Assign client role
keycloak.assign_client_role(user_id, "client-id", "client-admin")
# Remove client role
keycloak.remove_client_role(user_id, "client-id", "client-admin")
# Get realm roles
all_roles = keycloak.get_realm_roles()
# Get a specific realm role
role = keycloak.get_realm_role("admin")
# Create a realm role
new_role = keycloak.create_realm_role("new-role", "A new role description")
# Delete a realm role
keycloak.delete_realm_role("new-role")
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"Keycloak role operation failed: {e}")
raise
# Client operations
try:
# Get client ID from name
client_id = keycloak.get_client_id("client-name")
# Get client secret
secret = keycloak.get_client_secret(client_id)
# Get service account ID
service_account_id = keycloak.get_service_account_id()
except (NotFoundError, InternalError) as e:
logger.error(f"Keycloak client operation failed: {e}")
raise
# System operations
try:
# Get public key for token verification
public_key = keycloak.get_public_key()
# Get well-known OpenID configuration
config = keycloak.get_well_known_config()
# Get JWT certificates
certs = keycloak.get_certs()
except InternalError as e:
logger.error(f"Keycloak system operation failed: {e}")
raise
# Authorization
try:
# Exchange authorization code for token
token = keycloak.get_token_from_code("auth-code", "https://my-app.example.com/callback")
# Check permissions
has_permission = keycloak.check_permissions(access_token, "resource-name", "view")
except (UnauthenticatedError, PermissionDeniedError, InternalError) as e:
logger.error(f"Keycloak authorization operation failed: {e}")
raise
Asynchronous Adapter¶
The asynchronous adapter provides a non-blocking API using async/await syntax:
import asyncio
import logging
from archipy.adapters.keycloak.adapters import AsyncKeycloakAdapter
from archipy.models.errors import (
InternalError,
InvalidCredentialsError,
NotFoundError,
PermissionDeniedError,
UnauthenticatedError,
)
logger = logging.getLogger(__name__)
async def main() -> None:
# Initialize with global config
keycloak = AsyncKeycloakAdapter()
try:
# Get token
token = await keycloak.get_token("username", "password")
access_token = token["access_token"]
except InvalidCredentialsError as e:
logger.error(f"Authentication failed: {e}")
raise
else:
logger.info("Authentication successful")
try:
# Get user info
user_info = await keycloak.get_userinfo(access_token)
logger.info(f"Logged in as: {user_info.get('preferred_username')}")
# Check if user has role
if await keycloak.has_role(access_token, "admin"):
logger.info("User has admin role")
except (UnauthenticatedError, InternalError) as e:
logger.error(f"Failed to get user info or check role: {e}")
raise
try:
# Search for users
users = await keycloak.search_users("john")
logger.info(f"Found {len(users)} users matching 'john'")
# Create a new user
user_data = {
"username": "async_user",
"email": "async@example.com",
"enabled": True,
}
user_id = await keycloak.create_user(user_data)
logger.info(f"Created user with ID: {user_id}")
# Delete the user
await keycloak.delete_user(user_id)
logger.info(f"Deleted user: {user_id}")
except (NotFoundError, PermissionDeniedError, InternalError) as e:
logger.error(f"User management operation failed: {e}")
raise
# Run the async function
asyncio.run(main())
Caching¶
Both adapters use TTL (Time-To-Live) caching for appropriate operations to improve performance. Cache durations are configured for each method based on how frequently the data typically changes:
- Public keys and certificate information: 1 hour
- User information from tokens: 30 seconds
- User details and role information: 5 minutes
You can clear all caches if needed:
from archipy.adapters.keycloak.adapters import AsyncKeycloakAdapter, KeycloakAdapter
# Sync adapter
keycloak = KeycloakAdapter()
keycloak.clear_all_caches()
# Async adapter
async_keycloak = AsyncKeycloakAdapter()
async_keycloak.clear_all_caches()
Security Considerations¶
- Token validation is performed without caching to ensure security.
- The adapter automatically refreshes admin tokens before they expire.
- Write operations (like user creation/updates) automatically clear relevant caches.
- For production use, prefer the authorization code flow over direct username/password authentication.
See Also¶
- Error Handling — Exception handling patterns with proper chaining
- Configuration Management — Keycloak configuration setup
- BDD Testing — Testing Keycloak operations
- Keycloak Utils — Authentication utilities with Keycloak
- API Reference — Full Keycloak adapter API documentation