Temporal Adapter Tutorial¶
This example demonstrates how to use the Temporal adapter for workflow orchestration and activity execution with proper error handling and Python 3.14 type hints.
Installation¶
Configuration¶
Configure the Temporal adapter via environment variables or a TemporalConfig object.
Environment Variables¶
TEMPORAL__HOST=localhost
TEMPORAL__PORT=7233
TEMPORAL__NAMESPACE=default
TEMPORAL__TASK_QUEUE=my-task-queue
TEMPORAL__TLS_CA_CERT=/path/to/ca.crt
TEMPORAL__TLS_CLIENT_CERT=/path/to/client.crt
TEMPORAL__TLS_CLIENT_KEY=/path/to/client.key
TEMPORAL__WORKFLOW_EXECUTION_TIMEOUT=300
TEMPORAL__WORKFLOW_RUN_TIMEOUT=60
TEMPORAL__ACTIVITY_START_TO_CLOSE_TIMEOUT=30
TEMPORAL__RETRY_MAXIMUM_ATTEMPTS=3
TEMPORAL__RETRY_BACKOFF_COEFFICIENT=2.0
TEMPORAL__RETRY_MAXIMUM_INTERVAL=60
Direct Configuration¶
from archipy.configs.config_template import TemporalConfig
config = TemporalConfig(
    HOST="localhost",
    PORT=7233,  # default Temporal frontend gRPC port
    NAMESPACE="default",
    TASK_QUEUE="my-task-queue",
    # The values below mirror the TEMPORAL__* environment variables shown
    # above — presumably in seconds; TODO confirm units against TemporalConfig.
    WORKFLOW_EXECUTION_TIMEOUT=300,
    WORKFLOW_RUN_TIMEOUT=60,
    ACTIVITY_START_TO_CLOSE_TIMEOUT=30,
    RETRY_MAXIMUM_ATTEMPTS=3,
    RETRY_BACKOFF_COEFFICIENT=2.0,
    RETRY_MAXIMUM_INTERVAL=60,
)
Basic Usage¶
import asyncio
import logging
import time
from temporalio import activity, workflow
from archipy.adapters.temporal import BaseActivity, BaseWorkflow, TemporalAdapter
from archipy.configs.config_template import TemporalConfig
from archipy.models.errors import ConfigurationError, InternalError
# Configure logging
logger = logging.getLogger(__name__)
# Define a simple workflow
class MyWorkflow(BaseWorkflow[dict, str]):
    """Example workflow that delegates its work to a single activity."""

    @workflow.run
    async def run(self, workflow_input: dict[str, list[str]]) -> str:
        """Run the workflow: process the input via an activity.

        Args:
            workflow_input: Input data for the workflow.

        Returns:
            A human-readable completion message.
        """
        self._log_workflow_event("workflow_started", {"input": workflow_input})
        try:
            # start_to_close_timeout, heartbeat_timeout, retry_policy, and
            # task_queue fall back to TemporalConfig defaults when omitted.
            activity_result = await self._execute_activity_with_retry(
                process_data_activity,
                workflow_input,
            )
        except Exception as error:
            self._log_workflow_event("activity_failed", {"error": str(error)})
            raise
        self._log_workflow_event("workflow_completed", {"result": activity_result})
        return f"Workflow completed: {activity_result}"
# Define a simple activity function
@activity.defn
async def process_data_activity(data: dict[str, list[str]]) -> str:
    """Process data in an activity.

    Args:
        data: Input data to process. NOTE(review): len(data) counts top-level
            dict keys, not the items inside them — confirm this is the
            intended count for the log/result message.

    Returns:
        Processed result message.
    """
    logger.info(f"Processing {len(data)} items")
    # Fix: use a non-blocking sleep. time.sleep() would block the worker's
    # event loop and stall every other coroutine running on this worker.
    await asyncio.sleep(1)  # Simulate processing
    return f"Processed {len(data)} items"
# Execute workflow
async def main() -> None:
    """Execute the workflow and handle cleanup."""
    # Fix: the original example referenced `temporal` without ever creating
    # it; instantiate the adapter here so the snippet is runnable.
    temporal = TemporalAdapter()
    try:
        # Execute workflow and wait for result
        result = await temporal.execute_workflow(
            MyWorkflow,
            {"items": ["a", "b", "c"]},
            workflow_id="my-workflow-123",
            task_queue="my-task-queue",
        )
    except InternalError as e:
        logger.error(f"Workflow execution failed: {e}")
        raise
    else:
        logger.info(f"Workflow result: {result}")
    finally:
        # Always release the client connection, even on failure.
        await temporal.close()


# Run the workflow
asyncio.run(main())
Configuration Override Examples¶
import asyncio
import logging
import random
from datetime import timedelta
from temporalio.common import RetryPolicy
from archipy.models.errors import InternalError
# Configure logging
logger = logging.getLogger(__name__)
class ConfigOverrideWorkflow(BaseWorkflow[dict, str]):
    """Workflow showing how to override default configurations."""

    @workflow.run
    async def run(self, workflow_input: dict[str, str]) -> str:
        """Workflow with custom timeouts and retry policies.

        Args:
            workflow_input: Payload forwarded to each activity and child workflow.

        Returns:
            Summary string combining all activity and child-workflow results.
        """
        try:
            # Override activity timeout for a long-running activity
            long_result = await self._execute_activity_with_retry(
                long_running_activity,
                workflow_input,
                start_to_close_timeout=timedelta(minutes=10),  # Override default 30 seconds
                heartbeat_timeout=timedelta(seconds=30)  # Override default 10 seconds
            )
        except Exception as e:
            logger.error(f"Long running activity failed: {e}")
            raise
        try:
            # Override retry policy for a critical activity
            critical_result = await self._execute_activity_with_retry(
                critical_activity,
                workflow_input,
                retry_policy=RetryPolicy(
                    maximum_attempts=10,  # Override default 3 attempts
                    backoff_coefficient=1.5,  # Override default 2.0
                    maximum_interval=timedelta(seconds=30)  # Override default 60 seconds
                )
            )
        except Exception as e:
            logger.error(f"Critical activity failed after retries: {e}")
            raise
        try:
            # Use custom task queue
            special_result = await self._execute_activity_with_retry(
                special_activity,
                workflow_input,
                task_queue="special-workers"  # Override default task queue
            )
        except Exception as e:
            logger.error(f"Special activity failed: {e}")
            raise
        try:
            # Execute child workflow with custom timeout
            child_result = await self._execute_child_workflow(
                ChildWorkflow,
                {"parent_data": workflow_input},
                execution_timeout=timedelta(minutes=15)  # Override default 5 minutes
            )
        except Exception as e:
            logger.error(f"Child workflow failed: {e}")
            raise
        else:
            # All four stages succeeded — report their combined results.
            return f"All results: {long_result}, {critical_result}, {special_result}, {child_result}"
@activity.defn
async def long_running_activity(data: dict[str, str]) -> str:
    """Simulate an activity whose work spans several minutes."""
    logger.info("Starting long running activity")
    simulated_work_seconds = 300  # 5 minutes
    await asyncio.sleep(simulated_work_seconds)
    return f"Long work completed: {data}"
@activity.defn
async def critical_activity(data: dict[str, str]) -> str:
    """Critical activity that needs more retry attempts.

    Args:
        data: Payload echoed back in the success message.

    Returns:
        Completion message on success.

    Raises:
        RuntimeError: Randomly (80% of calls) to demonstrate Temporal retries.
    """
    if random.random() < 0.8:  # 80% failure rate for demo # noqa: S311
        logger.warning("Critical operation failed, will retry")
        # Fix: raise a concrete exception type instead of the bare `Exception`
        # base class, per this tutorial's own "use specific error types" guidance.
        raise RuntimeError("Critical operation failed")
    return f"Critical work completed: {data}"
@activity.defn
async def special_activity(data: dict[str, str]) -> str:
    """Activity intended to run on the dedicated special-worker task queue."""
    logger.info("Processing on special worker")
    completion_message = f"Special work completed: {data}"
    return completion_message
class ChildWorkflow(BaseWorkflow[dict, str]):
    """Child workflow invoked by the parent, with its own processing step."""

    @workflow.run
    async def run(self, workflow_input: dict[str, dict[str, str]]) -> str:
        """Log the received payload and echo the parent's data back."""
        logger.info(f"Child workflow processing: {workflow_input}")
        parent_payload = workflow_input["parent_data"]
        return f"Child workflow processed: {parent_payload}"
Using Atomic Activities¶
Activities can use atomic transactions for database operations:
import logging
from datetime import timedelta
from archipy.adapters.temporal import AtomicActivity
from archipy.helpers.decorators.sqlalchemy_atomic import postgres_sqlalchemy_atomic_decorator
from archipy.models.errors import DatabaseConnectionError, DatabaseQueryError
# Configure logging
logger = logging.getLogger(__name__)
# Define an activity with atomic transaction support
class UserCreationActivity(AtomicActivity[dict, dict]):
    """Activity for creating users with atomic database transactions."""

    def __init__(self, user_service) -> None:
        """Initialize with your business logic service.

        Args:
            user_service: Service containing business logic and repository access.
        """
        # "postgres" selects the PostgreSQL atomic-transaction backend.
        super().__init__(user_service, db_type="postgres")

    async def _do_execute(self, activity_input: dict[str, str]) -> dict[str, str]:
        """Create user with atomic transaction.

        Args:
            activity_input: User data to create. An optional "profile" key is
                used for the user's profile (defaults to an empty dict).

        Returns:
            Created user information (user_id, username, profile_id).

        Raises:
            DatabaseQueryError: If a database operation fails.
            DatabaseConnectionError: If the database connection fails.
        """
        try:
            # Execute business logic with atomic transaction
            user = await self._call_atomic_method("create_user", activity_input)
            # Additional database operations within the same transaction
            profile = await self._call_atomic_method(
                "create_user_profile",
                user.uuid,
                activity_input.get("profile", {})
            )
        except (DatabaseQueryError, DatabaseConnectionError) as e:
            # Log with full context, then re-raise so Temporal can retry.
            self._log_activity_event("user_creation_failed", {
                "error": str(e),
                "input": activity_input
            })
            raise
        else:
            result = {
                "user_id": str(user.uuid),
                "username": user.username,
                "profile_id": str(profile.uuid)
            }
            logger.info(f"User created successfully: {user.username}")
            return result
# Use in workflow
class UserOnboardingWorkflow(BaseWorkflow[dict, dict]):
    """User onboarding workflow."""

    @workflow.run
    async def run(self, workflow_input: dict[str, dict[str, str]]) -> dict[str, dict[str, str] | bool]:
        """User onboarding workflow.

        Args:
            workflow_input: User registration data. Expects a "user_data" key
                containing at least "email" — confirm schema against callers.

        Returns:
            Onboarding result: the created user plus whether the welcome
            email was sent.
        """
        self._log_workflow_event("onboarding_started")
        try:
            # Execute atomic user creation activity
            user_result = await self._execute_activity_with_retry(
                UserCreationActivity.execute_atomic,
                workflow_input["user_data"],
                start_to_close_timeout=timedelta(seconds=60)
            )
        except Exception as e:
            # User creation is mandatory — propagate the failure.
            logger.error(f"User creation failed: {e}")
            raise
        else:
            logger.info(f"User created: {user_result['user_id']}")
        try:
            # Execute welcome email activity
            email_result = await self._execute_activity_with_retry(
                send_welcome_email_activity,
                {
                    "user_id": user_result["user_id"],
                    "email": workflow_input["user_data"]["email"]
                }
            )
        except Exception as e:
            logger.error(f"Welcome email failed: {e}")
            # Don't fail the workflow if email fails
            email_result = False
        self._log_workflow_event("onboarding_completed", {
            "user_id": user_result["user_id"]
        })
        return {
            "user": user_result,
            "email_sent": email_result
        }
Async Operations with Workers¶
import asyncio
import logging
from archipy.adapters.temporal import TemporalWorkerManager
from archipy.models.errors.temporal_errors import WorkerConnectionError, WorkerShutdownError
# Configure logging
logger = logging.getLogger(__name__)
async def run_worker() -> None:
    """Start a Temporal worker to execute workflows and activities.

    Raises:
        WorkerConnectionError: If the worker cannot connect to the server.
        WorkerShutdownError: If the worker fails while stopping.
    """
    worker_manager = TemporalWorkerManager()
    try:
        # Start worker with workflows and activities
        worker_handle = await worker_manager.start_worker(
            task_queue="my-task-queue",
            workflows=[MyWorkflow, UserOnboardingWorkflow],
            activities=[UserCreationActivity, process_data_activity, send_welcome_email_activity],
            max_concurrent_workflow_tasks=10,  # concurrency caps for this worker
            max_concurrent_activities=20
        )
    except WorkerConnectionError as e:
        logger.error(f"Failed to start worker: {e}")
        raise
    else:
        logger.info(f"Worker started: {worker_handle.identity}")
        try:
            # Keep worker running until it is told to stop
            await worker_handle.wait_until_stopped()
        except WorkerShutdownError as e:
            logger.error(f"Worker shutdown error: {e}")
            raise
        finally:
            # Graceful shutdown of every worker this manager started
            await worker_manager.shutdown_all_workers()
# Activity with business logic integration
@activity.defn
async def send_welcome_email_activity(data: dict[str, str]) -> bool:
    """Send a welcome email to a newly created user.

    Args:
        data: Email data containing user_id and email.

    Returns:
        True if the email was sent successfully.
    """
    recipient = data['email']
    logger.info(f"Sending welcome email to {recipient}")
    # This would integrate with your email service
    return True
Error Handling¶
import asyncio
import logging
from archipy.models.errors import (
DatabaseConnectionError,
DatabaseQueryError,
InternalError,
NotFoundError,
)
from archipy.models.errors.temporal_errors import TemporalError, WorkerConnectionError, WorkerShutdownError
# Configure logging
logger = logging.getLogger(__name__)
async def robust_workflow_execution() -> None:
    """Example of proper error handling with Temporal operations.

    Demonstrates layered handling: connection/start errors, database errors
    surfaced from activities, Temporal-specific errors, and a final
    catch-all — with guaranteed adapter cleanup in ``finally``.
    """
    temporal = TemporalAdapter()
    try:
        # Start workflow with error handling
        workflow_handle = await temporal.start_workflow(
            UserOnboardingWorkflow,
            {
                "user_data": {
                    "username": "john_doe",
                    "email": "john@example.com",
                    "profile": {"age": 30, "city": "New York"}
                }
            },
            workflow_id="user-onboarding-001",
            execution_timeout=300,  # 5 minutes
            run_timeout=120  # 2 minutes per run
        )
    except WorkerConnectionError as e:
        logger.error(f"Worker connection failed: {e}")
        raise
    except InternalError as e:
        logger.error(f"Failed to start workflow: {e}")
        raise
    else:
        logger.info(f"Workflow started: {workflow_handle.id}")
        try:
            # Wait for the workflow result (no explicit wait timeout here;
            # the execution_timeout above bounds the workflow itself)
            result = await workflow_handle.result()
        except (DatabaseQueryError, DatabaseConnectionError) as e:
            logger.error(f"Database error in workflow: {e}")
            raise
        except TemporalError as e:
            logger.error(f"Temporal operation failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise
        else:
            logger.info(f"User onboarded successfully: {result}")
    finally:
        # Always cleanup
        await temporal.close()
# Activity-level error handling
class RobustUserActivity(AtomicActivity[dict, dict]):
    """Activity with comprehensive error handling."""

    def __init__(self, user_service, db_type: str = "postgres") -> None:
        """Store the business service and choose the atomic backend.

        Args:
            user_service: Service containing the business logic to run atomically.
            db_type: Database backend key (defaults to "postgres").
        """
        super().__init__(user_service, db_type)

    async def _do_execute(self, activity_input: dict[str, str]) -> dict[str, str]:
        """Execute with comprehensive error handling.

        Args:
            activity_input: User data passed to the atomic business method.

        Returns:
            Result of the "process_user_data" business method.

        Raises:
            DatabaseQueryError: On query failure (re-raised so Temporal retries).
            DatabaseConnectionError: On connection failure (likely retryable).
            NotFoundError: When a required resource is missing (likely not retryable).
        """
        try:
            result = await self._call_atomic_method("process_user_data", activity_input)
        except DatabaseQueryError as e:
            self._log_activity_event("database_query_failed", {
                "error": str(e),
                "query_type": "user_creation"
            })
            # Re-raise to let Temporal handle retries
            raise
        except DatabaseConnectionError as e:
            self._log_activity_event("database_connection_failed", {
                "error": str(e)
            })
            # This might be retryable
            raise
        except NotFoundError as e:
            self._log_activity_event("resource_not_found", {
                "error": str(e)
            })
            # This is likely not retryable
            raise
        except Exception as e:
            self._log_activity_event("unexpected_error", {
                "error": str(e),
                "error_type": type(e).__name__
            })
            raise
        else:
            logger.info("Activity executed successfully")
            return result

    async def _handle_error(self, activity_input: dict[str, str], error: Exception) -> None:
        """Custom error handling for this activity.

        Logs error context for observability, then defers to the base
        class's handler.
        """
        # Log specific error details
        self._log_activity_event("activity_error_handler", {
            "error_type": type(error).__name__,
            "input_username": activity_input.get("username", "unknown"),
            "retry_attempt": getattr(error, "attempt_count", "unknown")
        })
        # Call parent error handler
        await super()._handle_error(activity_input, error)
Best Practices¶
- Workflow Design: Keep workflows as coordinators - let activities handle business logic
- Error Handling: Use specific error types and proper error chains with raise ... from e
- Transactions: Use AtomicActivity for database operations requiring consistency
- Testing: Mock adapters and activities for unit testing
- Configuration: Use environment-specific configurations for different deployments
- Monitoring: Leverage workflow logging and error tracking
- Timeouts: Set appropriate timeouts for workflows and activities
- Retries: Configure retry policies based on error types and business requirements
- Metrics: Enable Prometheus metrics for production observability
Prometheus Metrics Integration¶
The Temporal adapter supports comprehensive metrics collection via Prometheus when enabled in configuration. The Temporal SDK automatically emits detailed metrics about workflow and activity execution, task queue operations, and worker performance.
Configuration¶
Enable Prometheus metrics for Temporal by setting both the global Prometheus flag and the Temporal-specific metrics flag:
import logging
from archipy.adapters.temporal import TemporalAdapter, TemporalWorkerManager
from archipy.configs.base_config import BaseConfig
from archipy.configs.config_template import PrometheusConfig, TemporalConfig
# Configure logging
logger = logging.getLogger(__name__)
# Configure global Prometheus settings
config = BaseConfig()
config.PROMETHEUS = PrometheusConfig(
    IS_ENABLED=True,  # Enable Prometheus globally
    SERVER_PORT=8200  # Metrics endpoint port
)

# Configure Temporal with metrics enabled
temporal_config = TemporalConfig(
    HOST="localhost",
    PORT=7233,
    NAMESPACE="default",
    TASK_QUEUE="my-task-queue",
    ENABLE_METRICS=True  # Enable Temporal metrics (requires the global flag above too)
)

# Create adapter - metrics will be automatically configured
# The TemporalRuntimeManager singleton ensures consistent Runtime across all clients/workers
temporal_adapter = TemporalAdapter(temporal_config)
worker_manager = TemporalWorkerManager(temporal_config)
logger.info("Temporal adapter created with Prometheus metrics enabled")
Environment-Based Configuration¶
BaseConfig automatically reads TEMPORAL__* environment variables — no manual os.getenv() calls needed:
# TEMPORAL__HOST=temporal.production.com
# TEMPORAL__ENABLE_METRICS=true
# TEMPORAL__METRICS_PORT=8201
# Or via pyproject.toml [tool.configs] section:
# [tool.configs.TEMPORAL]
# ENABLE_METRICS = true
# HOST = "temporal.production.com"
Available Metrics¶
When metrics are enabled, the Temporal SDK automatically exposes comprehensive metrics at
http://localhost:8200/metrics (or your configured port). These metrics are shared with any existing FastAPI or gRPC
metrics.
Client-Side Metrics¶
Metrics emitted by the Temporal client (adapter):
- temporal_request_* — Client request counts and latency
- temporal_long_request_* — Long-polling request metrics
- temporal_workflow_* — Workflow operation metrics (start, execute, cancel, terminate)
- temporal_sticky_cache_* — Sticky workflow cache statistics
Worker-Side Metrics¶
Metrics emitted by Temporal workers:
- temporal_worker_task_slots_available — Available task slots (gauge)
- temporal_worker_task_slots_used — Task slots currently in use (gauge)
- temporal_worker_task_queue_poll_* — Task queue polling metrics
- temporal_workflow_task_execution_* — Workflow task execution metrics
- temporal_activity_task_execution_* — Activity task execution metrics
- temporal_workflow_task_replay_latency — Workflow replay latency histogram
- temporal_activity_execution_* — Activity execution counts and latency
Prometheus Queries¶
Example PromQL queries for monitoring Temporal workflows:
# Workflow execution rate (per second)
rate(temporal_workflow_task_execution_total[5m])
# Average workflow task execution time
rate(temporal_workflow_task_execution_latency_sum[5m])
/ rate(temporal_workflow_task_execution_latency_count[5m])
# Active workers by task queue
temporal_worker_task_slots_used{task_queue="my-task-queue"}
# Activity failure rate
rate(temporal_activity_task_execution_failed_total[5m])
# Task queue poll success rate
rate(temporal_worker_task_queue_poll_succeed_total[5m])
/ rate(temporal_worker_task_queue_poll_total[5m])
# P95 activity execution latency
histogram_quantile(0.95,
rate(temporal_activity_execution_latency_bucket[5m])
)
Grafana Dashboard¶
Create a Grafana dashboard to visualize Temporal metrics:
{
"dashboard": {
"title": "Temporal Workflows",
"panels": [
{
"title": "Workflow Execution Rate",
"targets": [
{
"expr": "rate(temporal_workflow_task_execution_total[5m])"
}
]
},
{
"title": "Active Workers",
"targets": [
{
"expr": "temporal_worker_task_slots_used"
}
]
},
{
"title": "Activity Success Rate",
"targets": [
{
"expr": "rate(temporal_activity_task_execution_succeed_total[5m]) / rate(temporal_activity_task_execution_total[5m])"
}
]
},
{
"title": "Workflow Task Latency (P95)",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(temporal_workflow_task_execution_latency_bucket[5m]))"
}
]
}
]
}
}
Complete Example with Metrics¶
import asyncio
import logging
from temporalio import activity, workflow
from archipy.adapters.temporal import BaseWorkflow, TemporalAdapter, TemporalWorkerManager
from archipy.configs.base_config import BaseConfig
from archipy.configs.config_template import PrometheusConfig, TemporalConfig
# Configure logging
logger = logging.getLogger(__name__)
# Configure Prometheus
config = BaseConfig()
config.PROMETHEUS = PrometheusConfig(IS_ENABLED=True, SERVER_PORT=8200)

# Configure Temporal with metrics
temporal_config = TemporalConfig(
    HOST="localhost",
    PORT=7233,
    NAMESPACE="default",
    TASK_QUEUE="metrics-demo",
    ENABLE_METRICS=True  # Both this flag and PROMETHEUS.IS_ENABLED are required
)
# Define workflow
class MetricsWorkflow(BaseWorkflow[str, str]):
    """Workflow whose execution is tracked by Prometheus metrics."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Greet *name* via an activity while logging workflow events."""
        self._log_workflow_event("workflow_started", {"name": name})
        greeting = await self._execute_activity_with_retry(
            greet_activity,
            name,
        )
        self._log_workflow_event("workflow_completed", {"result": greeting})
        return greeting
@activity.defn
async def greet_activity(name: str) -> str:
    """Build a greeting for *name*, pausing briefly to simulate work."""
    logger.info(f"Processing greeting for {name}")
    simulated_work_seconds = 0.1
    await asyncio.sleep(simulated_work_seconds)
    return f"Hello, {name}!"
async def main() -> None:
    """Run workflow with metrics collection."""
    # Create adapter and worker — both share the same TemporalConfig, so they
    # use the same Prometheus-enabled Runtime (singleton manager).
    temporal_adapter = TemporalAdapter(temporal_config)
    worker_manager = TemporalWorkerManager(temporal_config)
    try:
        # Start worker (handle kept for reference; shutdown goes through the manager)
        worker_handle = await worker_manager.start_worker(
            task_queue="metrics-demo",
            workflows=[MetricsWorkflow],
            activities=[greet_activity]
        )
        logger.info("Worker started with metrics enabled")
        # Execute workflows - metrics will be collected
        results = []
        for i in range(10):
            result = await temporal_adapter.execute_workflow(
                MetricsWorkflow,
                f"User{i}",
                workflow_id=f"metrics-workflow-{i}",
                task_queue="metrics-demo"
            )
            results.append(result)
        logger.info(f"Completed {len(results)} workflows")
        logger.info("Metrics available at http://localhost:8200/metrics")
    finally:
        # Always stop workers and close the client, even on failure.
        await worker_manager.shutdown_all_workers()
        await temporal_adapter.close()


if __name__ == "__main__":
    asyncio.run(main())
Monitoring Best Practices¶
- Set Alerts: Configure Prometheus alerts for high failure rates or latency
- Track Task Queues: Monitor task queue depths and worker availability
- Workflow Duration: Alert on workflows exceeding expected execution time
- Activity Failures: Track activity failure patterns for debugging
- Worker Health: Monitor worker task slot usage and availability
- Resource Usage: Correlate Temporal metrics with system resource metrics
Troubleshooting¶
If metrics are not appearing:
- Verify PROMETHEUS.IS_ENABLED = True in the global config
- Verify ENABLE_METRICS = True in TemporalConfig
- Check that the Prometheus endpoint is accessible: curl http://localhost:8200/metrics
- Ensure the temporalio package is installed with metrics support
- Check logs for any Runtime initialization errors
The Temporal Runtime with Prometheus is created lazily on first client connection using a singleton manager (
TemporalRuntimeManager) that ensures consistent Runtime configuration across all clients and workers. The singleton
pattern prevents multiple Runtime instances and guarantees thread-safe access.
See Also¶
- Error Handling — Exception handling patterns with proper chaining
- Configuration Management — Temporal configuration setup
- BDD Testing — Testing workflow operations
- SQLAlchemy Decorators — Atomic transaction usage
- API Reference — Full Temporal adapter API documentation