Skip to content

Temporal

The temporal adapter provides integration with Temporal, a durable workflow orchestration platform, including workflow and activity definitions, worker management, and runtime configuration.

Ports

Abstract port interface defining the Temporal adapter contract.

Port interfaces for Temporal workflow orchestration.

This module defines the abstract interfaces for Temporal workflow and activity operations, providing a standardized contract for workflow orchestration within the ArchiPy architecture.

archipy.adapters.temporal.ports.T module-attribute

T = TypeVar('T')  # Generic result type: bound to the concrete workflow result in execute_workflow

archipy.adapters.temporal.ports.TemporalPort

Interface for Temporal workflow operations providing a standardized access pattern.

This interface defines the contract for Temporal adapters, ensuring consistent implementation of workflow operations across different adapters. It covers workflow lifecycle management, execution control, and query operations.

Implementing classes should provide concrete implementations for all methods, typically by wrapping a Temporal client library.

Source code in archipy/adapters/temporal/ports.py
class TemporalPort:
    """Interface for Temporal workflow operations providing a standardized access pattern.

    This interface defines the contract for Temporal adapters, ensuring consistent
    implementation of workflow operations across different adapters. It covers
    workflow lifecycle management, execution control, and query operations.

    Implementing classes should provide concrete implementations for all
    methods, typically by wrapping a Temporal client library.
    """

    @abstractmethod
    async def start_workflow(
        self,
        workflow: str | Callable,
        arg: Any = None,
        workflow_id: str | None = None,
        task_queue: str | None = None,
        execution_timeout: int | None = None,
        run_timeout: int | None = None,
        task_timeout: int | None = None,
        memo: dict[str, Any] | None = None,
        search_attributes: dict[str, Any] | None = None,
    ) -> Any:  # WorkflowHandle
        """Start a workflow execution asynchronously.

        Args:
            workflow (str | Callable): The workflow function or workflow type name.
            arg (Any, optional): Input argument for the workflow. Defaults to None.
            workflow_id (str, optional): Unique identifier for the workflow execution.
                If None, a UUID will be generated. Defaults to None.
            task_queue (str, optional): Task queue name for workflow execution.
                If None, uses the default task queue. Defaults to None.
            execution_timeout (int, optional): Maximum workflow execution time in seconds.
                Overrides config default. Defaults to None.
            run_timeout (int, optional): Maximum single workflow run time in seconds.
                Overrides config default. Defaults to None.
            task_timeout (int, optional): Maximum workflow task processing time in seconds.
                Overrides config default. Defaults to None.
            memo (dict[str, Any], optional): Non-indexed metadata for the workflow.
                Defaults to None.
            search_attributes (dict[str, Any], optional): Indexed metadata for workflow search.
                Defaults to None.

        Returns:
            WorkflowHandle[T, Any]: Handle to the started workflow execution.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def execute_workflow(
        self,
        workflow: str | Callable,
        arg: Any = None,
        workflow_id: str | None = None,
        task_queue: str | None = None,
        execution_timeout: int | None = None,
        run_timeout: int | None = None,
        task_timeout: int | None = None,
    ) -> T:
        """Execute a workflow and wait for its completion.

        Args:
            workflow (str | Callable): The workflow function or workflow type name.
            arg (Any, optional): Input argument for the workflow. Defaults to None.
            workflow_id (str, optional): Unique identifier for the workflow execution.
                If None, a UUID will be generated. Defaults to None.
            task_queue (str, optional): Task queue name for workflow execution.
                If None, uses the default task queue. Defaults to None.
            execution_timeout (int, optional): Maximum workflow execution time in seconds.
                Overrides config default. Defaults to None.
            run_timeout (int, optional): Maximum single workflow run time in seconds.
                Overrides config default. Defaults to None.
            task_timeout (int, optional): Maximum workflow task processing time in seconds.
                Overrides config default. Defaults to None.

        Returns:
            T: The workflow execution result.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_workflow_handle(self, workflow_id: str, run_id: str | None = None) -> Any:  # WorkflowHandle
        """Get a handle to an existing workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, gets the latest run. Defaults to None.

        Returns:
            WorkflowHandle[T, Any]: Handle to the workflow execution.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def cancel_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
        """Cancel a running workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, cancels the latest run. Defaults to None.
            reason (str, optional): Reason for cancellation. Defaults to None.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def terminate_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
        """Terminate a running workflow execution immediately.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, terminates the latest run. Defaults to None.
            reason (str, optional): Reason for termination. Defaults to None.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def signal_workflow(
        self,
        workflow_id: str,
        signal_name: str,
        arg: Any = None,
        run_id: str | None = None,
    ) -> None:
        """Send a signal to a running workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            signal_name (str): The name of the signal to send.
            arg (Any, optional): Argument to pass with the signal. Defaults to None.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, signals the latest run. Defaults to None.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def query_workflow(
        self,
        workflow_id: str,
        query_name: str,
        arg: Any = None,
        run_id: str | None = None,
    ) -> Any:
        """Query a running workflow execution for information.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            query_name (str): The name of the query to execute.
            arg (Any, optional): Argument to pass with the query. Defaults to None.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, queries the latest run. Defaults to None.

        Returns:
            Any: The query result from the workflow.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def list_workflows(
        self,
        query: str | None = None,
        page_size: int | None = None,
        next_page_token: bytes | None = None,
    ) -> WorkflowListResponse:
        """List workflow executions matching the given criteria.

        Args:
            query (str, optional): List filter query in Temporal SQL syntax.
                Defaults to None (no filter).
            page_size (int, optional): Maximum number of results per page.
                Defaults to None (server default).
            next_page_token (bytes, optional): Token for pagination.
                Defaults to None (first page).

        Returns:
            WorkflowListResponse: List of workflow executions with pagination info.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def describe_workflow(self, workflow_id: str, run_id: str | None = None) -> WorkflowDescription:
        """Get detailed information about a workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, describes the latest run. Defaults to None.

        Returns:
            WorkflowDescription: Detailed workflow execution information.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self) -> None:
        """Close the Temporal client connection.

        Performs cleanup of resources and closes the connection to the Temporal server.
        Should be called when the adapter is no longer needed.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def create_schedule(
        self,
        schedule_id: str,
        workflow_class: Any,
        spec: ScheduleSpec,
        task_queue: str,
    ) -> None:
        """Create a new schedule."""
        raise NotImplementedError

    @abstractmethod
    async def stop_schedule(self, schedule_id: str) -> None:
        """Stop a schedule."""
        raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.start_workflow abstractmethod async

start_workflow(
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
    memo: dict[str, Any] | None = None,
    search_attributes: dict[str, Any] | None = None,
) -> Any

Start a workflow execution asynchronously.

Parameters:

Name Type Description Default
workflow str | Callable

The workflow function or workflow type name.

required
arg Any

Input argument for the workflow. Defaults to None.

None
workflow_id str

Unique identifier for the workflow execution. If None, a UUID will be generated. Defaults to None.

None
task_queue str

Task queue name for workflow execution. If None, uses the default task queue. Defaults to None.

None
execution_timeout int

Maximum workflow execution time in seconds. Overrides config default. Defaults to None.

None
run_timeout int

Maximum single workflow run time in seconds. Overrides config default. Defaults to None.

None
task_timeout int

Maximum workflow task processing time in seconds. Overrides config default. Defaults to None.

None
memo dict[str, Any]

Non-indexed metadata for the workflow. Defaults to None.

None
search_attributes dict[str, Any]

Indexed metadata for workflow search. Defaults to None.

None

Returns:

Type Description
Any

WorkflowHandle[T, Any]: Handle to the started workflow execution.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def start_workflow(
    self,
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
    memo: dict[str, Any] | None = None,
    search_attributes: dict[str, Any] | None = None,
) -> Any:  # WorkflowHandle (presumably temporalio.client.WorkflowHandle; typed Any to avoid a hard dependency — confirm)
    """Start a workflow execution asynchronously.

    Args:
        workflow (str | Callable): The workflow function or workflow type name.
        arg (Any, optional): Input argument for the workflow. Defaults to None.
        workflow_id (str, optional): Unique identifier for the workflow execution.
            If None, a UUID will be generated. Defaults to None.
        task_queue (str, optional): Task queue name for workflow execution.
            If None, uses the default task queue. Defaults to None.
        execution_timeout (int, optional): Maximum workflow execution time in seconds.
            Overrides config default. Defaults to None.
        run_timeout (int, optional): Maximum single workflow run time in seconds.
            Overrides config default. Defaults to None.
        task_timeout (int, optional): Maximum workflow task processing time in seconds.
            Overrides config default. Defaults to None.
        memo (dict[str, Any], optional): Non-indexed metadata for the workflow.
            Defaults to None.
        search_attributes (dict[str, Any], optional): Indexed metadata for workflow search.
            Defaults to None.

    Returns:
        WorkflowHandle[T, Any]: Handle to the started workflow execution.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.execute_workflow abstractmethod async

execute_workflow(
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
) -> T

Execute a workflow and wait for its completion.

Parameters:

Name Type Description Default
workflow str | Callable

The workflow function or workflow type name.

required
arg Any

Input argument for the workflow. Defaults to None.

None
workflow_id str

Unique identifier for the workflow execution. If None, a UUID will be generated. Defaults to None.

None
task_queue str

Task queue name for workflow execution. If None, uses the default task queue. Defaults to None.

None
execution_timeout int

Maximum workflow execution time in seconds. Overrides config default. Defaults to None.

None
run_timeout int

Maximum single workflow run time in seconds. Overrides config default. Defaults to None.

None
task_timeout int

Maximum workflow task processing time in seconds. Overrides config default. Defaults to None.

None

Returns:

Name Type Description
T T

The workflow execution result.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def execute_workflow(
    self,
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
) -> T:  # T: module-level TypeVar — the concrete workflow result type
    """Execute a workflow and wait for its completion.

    Args:
        workflow (str | Callable): The workflow function or workflow type name.
        arg (Any, optional): Input argument for the workflow. Defaults to None.
        workflow_id (str, optional): Unique identifier for the workflow execution.
            If None, a UUID will be generated. Defaults to None.
        task_queue (str, optional): Task queue name for workflow execution.
            If None, uses the default task queue. Defaults to None.
        execution_timeout (int, optional): Maximum workflow execution time in seconds.
            Overrides config default. Defaults to None.
        run_timeout (int, optional): Maximum single workflow run time in seconds.
            Overrides config default. Defaults to None.
        task_timeout (int, optional): Maximum workflow task processing time in seconds.
            Overrides config default. Defaults to None.

    Returns:
        T: The workflow execution result.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.get_workflow_handle abstractmethod async

get_workflow_handle(
    workflow_id: str, run_id: str | None = None
) -> Any

Get a handle to an existing workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, gets the latest run. Defaults to None.

None

Returns:

Type Description
Any

WorkflowHandle[T, Any]: Handle to the workflow execution.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def get_workflow_handle(self, workflow_id: str, run_id: str | None = None) -> Any:  # WorkflowHandle (presumably temporalio.client.WorkflowHandle — confirm)
    """Get a handle to an existing workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, gets the latest run. Defaults to None.

    Returns:
        WorkflowHandle[T, Any]: Handle to the workflow execution.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.cancel_workflow abstractmethod async

cancel_workflow(
    workflow_id: str,
    run_id: str | None = None,
    reason: str | None = None,
) -> None

Cancel a running workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, cancels the latest run. Defaults to None.

None
reason str

Reason for cancellation. Defaults to None.

None

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def cancel_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
    """Cancel a running workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, cancels the latest run. Defaults to None.
        reason (str, optional): Reason for cancellation. Defaults to None.

    Note:
        NOTE(review): cancellation in Temporal is presumably cooperative (the
        workflow observes the request and may run cleanup) — confirm against
        the concrete adapter implementation.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.terminate_workflow abstractmethod async

terminate_workflow(
    workflow_id: str,
    run_id: str | None = None,
    reason: str | None = None,
) -> None

Terminate a running workflow execution immediately.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, terminates the latest run. Defaults to None.

None
reason str

Reason for termination. Defaults to None.

None

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def terminate_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
    """Terminate a running workflow execution immediately.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, terminates the latest run. Defaults to None.
        reason (str, optional): Reason for termination. Defaults to None.

    Note:
        NOTE(review): unlike cancel_workflow, termination presumably stops the
        workflow without giving it a chance to run cleanup logic — confirm
        against the concrete adapter implementation.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.signal_workflow abstractmethod async

signal_workflow(
    workflow_id: str,
    signal_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> None

Send a signal to a running workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
signal_name str

The name of the signal to send.

required
arg Any

Argument to pass with the signal. Defaults to None.

None
run_id str

The specific run identifier within the workflow. If None, signals the latest run. Defaults to None.

None

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def signal_workflow(
    self,
    workflow_id: str,
    signal_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> None:  # signals return no result to the caller
    """Send a signal to a running workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        signal_name (str): The name of the signal to send.
        arg (Any, optional): Argument to pass with the signal. Defaults to None.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, signals the latest run. Defaults to None.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.query_workflow abstractmethod async

query_workflow(
    workflow_id: str,
    query_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> Any

Query a running workflow execution for information.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
query_name str

The name of the query to execute.

required
arg Any

Argument to pass with the query. Defaults to None.

None
run_id str

The specific run identifier within the workflow. If None, queries the latest run. Defaults to None.

None

Returns:

Name Type Description
Any Any

The query result from the workflow.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def query_workflow(
    self,
    workflow_id: str,
    query_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> Any:  # result type depends on the workflow's query handler
    """Query a running workflow execution for information.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        query_name (str): The name of the query to execute.
        arg (Any, optional): Argument to pass with the query. Defaults to None.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, queries the latest run. Defaults to None.

    Returns:
        Any: The query result from the workflow.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.list_workflows abstractmethod async

list_workflows(
    query: str | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
) -> WorkflowListResponse

List workflow executions matching the given criteria.

Parameters:

Name Type Description Default
query str

List filter query in Temporal SQL syntax. Defaults to None (no filter).

None
page_size int

Maximum number of results per page. Defaults to None (server default).

None
next_page_token bytes

Token for pagination. Defaults to None (first page).

None

Returns:

Name Type Description
WorkflowListResponse WorkflowListResponse

List of workflow executions with pagination info.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def list_workflows(
    self,
    query: str | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
) -> WorkflowListResponse:  # project-defined response model (presumably imported at module top — confirm)
    """List workflow executions matching the given criteria.

    Args:
        query (str, optional): List filter query in Temporal SQL syntax.
            Defaults to None (no filter).
        page_size (int, optional): Maximum number of results per page.
            Defaults to None (server default).
        next_page_token (bytes, optional): Token for pagination.
            Defaults to None (first page).

    Returns:
        WorkflowListResponse: List of workflow executions with pagination info.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.describe_workflow abstractmethod async

describe_workflow(
    workflow_id: str, run_id: str | None = None
) -> WorkflowDescription

Get detailed information about a workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, describes the latest run. Defaults to None.

None

Returns:

Name Type Description
WorkflowDescription WorkflowDescription

Detailed workflow execution information.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def describe_workflow(self, workflow_id: str, run_id: str | None = None) -> WorkflowDescription:  # project-defined model (confirm import)
    """Get detailed information about a workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, describes the latest run. Defaults to None.

    Returns:
        WorkflowDescription: Detailed workflow execution information.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.close abstractmethod async

close() -> None

Close the Temporal client connection.

Performs cleanup of resources and closes the connection to the Temporal server. Should be called when the adapter is no longer needed.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def close(self) -> None:  # call when the adapter is no longer needed
    """Close the Temporal client connection.

    Performs cleanup of resources and closes the connection to the Temporal server.
    Should be called when the adapter is no longer needed.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.create_schedule abstractmethod async

create_schedule(
    schedule_id: str,
    workflow_class: Any,
    spec: ScheduleSpec,
    task_queue: str,
) -> None

Create a new schedule.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def create_schedule(
    self,
    schedule_id: str,
    workflow_class: Any,
    spec: ScheduleSpec,
    task_queue: str,
) -> None:
    """Create a new schedule.

    Args:
        schedule_id (str): Unique identifier for the schedule.
        workflow_class (Any): The workflow class to run on the schedule.
        spec (ScheduleSpec): Schedule specification describing when runs occur.
        task_queue (str): Task queue on which scheduled workflows execute.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.TemporalPort.stop_schedule abstractmethod async

stop_schedule(schedule_id: str) -> None

Stop a schedule.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def stop_schedule(self, schedule_id: str) -> None:
    """Stop a schedule.

    Args:
        schedule_id (str): Unique identifier of the schedule to stop.

    Raises:
        NotImplementedError: If not implemented by the subclass.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.WorkerPort

Interface for Temporal worker operations providing a standardized access pattern.

This interface defines the contract for Temporal worker management, ensuring consistent implementation of worker lifecycle operations. Workers are responsible for executing workflows and activities.

Implementing classes should provide concrete implementations for all methods, typically by wrapping a Temporal worker.

Source code in archipy/adapters/temporal/ports.py
class WorkerPort:
    """Interface for Temporal worker operations providing a standardized access pattern.

    This interface defines the contract for Temporal worker management, ensuring consistent
    implementation of worker lifecycle operations. Workers are responsible for executing
    workflows and activities.

    Implementing classes should provide concrete implementations for all
    methods, typically by wrapping a Temporal worker.
    """

    @abstractmethod
    async def start_worker(
        self,
        task_queue: str,
        workflows: list[type] | None = None,
        activities: list[Callable[..., Any]] | None = None,
        build_id: str | None = None,
        identity: str | None = None,
        max_concurrent_workflow_tasks: int | None = None,
        max_concurrent_activities: int | None = None,
    ) -> WorkerHandle:
        """Start a Temporal worker for the specified task queue.

        Args:
            task_queue (str): The task queue this worker will poll from.
            workflows (list[type], optional): List of workflow classes to register.
                Defaults to None.
            activities (list[Callable], optional): List of activity callables to register.
                Defaults to None.
            build_id (str, optional): Build identifier for worker versioning.
                Defaults to None.
            identity (str, optional): Unique worker identity. If None, auto-generated.
                Defaults to None.
            max_concurrent_workflow_tasks (int, optional): Maximum concurrent workflow tasks.
                Defaults to None (server default).
            max_concurrent_activities (int, optional): Maximum concurrent activity tasks.
                Defaults to None (server default).

        Returns:
            WorkerHandle: Handle to the started worker.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def stop_worker(self, worker_handle: WorkerHandle) -> None:
        """Stop a running Temporal worker.

        Args:
            worker_handle (WorkerHandle): Handle to the worker to stop.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def shutdown_all_workers(self) -> None:
        """Shutdown all workers managed by this port.

        Performs graceful shutdown of all active workers, waiting for current
        tasks to complete before terminating.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError

archipy.adapters.temporal.ports.WorkerPort.start_worker abstractmethod async

start_worker(
    task_queue: str,
    workflows: list[type] | None = None,
    activities: list[Callable[..., Any]] | None = None,
    build_id: str | None = None,
    identity: str | None = None,
    max_concurrent_workflow_tasks: int | None = None,
    max_concurrent_activities: int | None = None,
) -> WorkerHandle

Start a Temporal worker for the specified task queue.

Parameters:

Name Type Description Default
task_queue str

The task queue this worker will poll from.

required
workflows list[type]

List of workflow classes to register. Defaults to None.

None
activities list[Callable]

List of activity callables to register. Defaults to None.

None
build_id str

Build identifier for worker versioning. Defaults to None.

None
identity str

Unique worker identity. If None, auto-generated. Defaults to None.

None
max_concurrent_workflow_tasks int

Maximum concurrent workflow tasks. Defaults to None (server default).

None
max_concurrent_activities int

Maximum concurrent activity tasks. Defaults to None (server default).

None

Returns:

Name Type Description
WorkerHandle WorkerHandle

Handle to the started worker.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def start_worker(
    self,
    task_queue: str,
    workflows: list[type] | None = None,
    activities: list[Callable[..., Any]] | None = None,
    build_id: str | None = None,
    identity: str | None = None,
    max_concurrent_workflow_tasks: int | None = None,
    max_concurrent_activities: int | None = None,
) -> WorkerHandle:
    """Launch a Temporal worker that polls the given task queue.

    Args:
        task_queue (str): Queue the new worker polls for tasks.
        workflows (list[type], optional): Workflow classes to register with the
            worker. Defaults to None.
        activities (list[Callable], optional): Activity callables to register
            with the worker. Defaults to None.
        build_id (str, optional): Build identifier used for worker versioning.
            Defaults to None.
        identity (str, optional): Worker identity; generated automatically when
            omitted. Defaults to None.
        max_concurrent_workflow_tasks (int, optional): Cap on concurrent
            workflow tasks; the server default applies when None.
        max_concurrent_activities (int, optional): Cap on concurrent activity
            tasks; the server default applies when None.

    Returns:
        WorkerHandle: Handle referencing the newly started worker.

    Raises:
        NotImplementedError: Always, until a subclass supplies an implementation.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.WorkerPort.stop_worker abstractmethod async

stop_worker(worker_handle: WorkerHandle) -> None

Stop a running Temporal worker.

Parameters:

Name Type Description Default
worker_handle WorkerHandle

Handle to the worker to stop.

required

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def stop_worker(self, worker_handle: WorkerHandle) -> None:
    """Terminate the worker identified by the given handle.

    Args:
        worker_handle (WorkerHandle): Handle of the worker that should be stopped.

    Raises:
        NotImplementedError: Always, until a subclass supplies an implementation.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.WorkerPort.shutdown_all_workers abstractmethod async

shutdown_all_workers() -> None

Shutdown all workers managed by this port.

Performs graceful shutdown of all active workers, waiting for current tasks to complete before terminating.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def shutdown_all_workers(self) -> None:
    """Gracefully terminate every worker this port manages.

    Each active worker is allowed to finish its in-flight tasks before
    being shut down.

    Raises:
        NotImplementedError: Always, until a subclass supplies an implementation.
    """
    raise NotImplementedError

archipy.adapters.temporal.ports.WorkflowHandle

Type stub for workflow handle.

Source code in archipy/adapters/temporal/ports.py
class WorkflowHandle:
    """Placeholder type representing a handle to a running workflow."""

archipy.adapters.temporal.ports.WorkflowListResponse

Type stub for workflow list response.

Source code in archipy/adapters/temporal/ports.py
class WorkflowListResponse:
    """Placeholder type representing the response of a workflow listing call."""

archipy.adapters.temporal.ports.WorkflowDescription

Type stub for workflow description.

Source code in archipy/adapters/temporal/ports.py
class WorkflowDescription:
    """Placeholder type representing the description of a workflow execution."""

archipy.adapters.temporal.ports.WorkerHandle

Bases: ABC

Base type for worker handle.

This is an abstract base class that concrete implementations should extend. It provides a common interface for worker handle operations.

Source code in archipy/adapters/temporal/ports.py
class WorkerHandle(ABC):
    """Base type for worker handle.

    This is an abstract base class that concrete implementations should extend.
    It provides a common interface for worker handle operations.

    Attributes:
        worker_id (str): Identifier of the worker this handle refers to.
        task_queue (str): Task queue the worker was started against.
    """

    # Identifier of the worker; concrete implementations are expected to set this.
    worker_id: str
    # Task queue the worker polls; concrete implementations are expected to set this.
    task_queue: str

    @abstractmethod
    async def stop(self, grace_period: int = 30) -> None:
        """Stop the worker gracefully.

        Args:
            grace_period: Maximum time in seconds to wait for graceful shutdown.
        """
        ...

archipy.adapters.temporal.ports.WorkerHandle.worker_id instance-attribute

worker_id: str

archipy.adapters.temporal.ports.WorkerHandle.task_queue instance-attribute

task_queue: str

archipy.adapters.temporal.ports.WorkerHandle.stop abstractmethod async

stop(grace_period: int = 30) -> None

Stop the worker gracefully.

Parameters:

Name Type Description Default
grace_period int

Maximum time in seconds to wait for graceful shutdown.

30
Source code in archipy/adapters/temporal/ports.py
@abstractmethod
async def stop(self, grace_period: int = 30) -> None:
    """Shut the worker down, waiting up to ``grace_period`` seconds.

    Args:
        grace_period: Upper bound, in seconds, on the graceful-shutdown wait.
    """
    ...

(mkdocstrings rendering options: show_root_toc_entry: false, heading_level: 3)

Base

Base classes for Temporal workflow and activity implementations.

Base classes for Temporal workflows and activities.

This module provides base classes and utilities for implementing Temporal workflows and activities within the ArchiPy architecture, including integration with existing adapters and standardized patterns.

archipy.adapters.temporal.base.T module-attribute

T = TypeVar('T')

archipy.adapters.temporal.base.R module-attribute

R = TypeVar('R')

archipy.adapters.temporal.base.BaseWorkflow

Base class for all Temporal workflows in ArchiPy.

Provides common functionality and patterns for workflow implementations, including standardized logging, error handling, and integration with ArchiPy services through activities.

Class Type Parameters:

- T: Type of the workflow input parameter (required).
- R: Type of the workflow return value (required).
Source code in archipy/adapters/temporal/base.py
class BaseWorkflow[T, R]:
    """Base class for all Temporal workflows in ArchiPy.

    Provides common functionality and patterns for workflow implementations,
    including standardized logging, error handling, and integration with
    ArchiPy services through activities.

    Type Parameters:
        T: Type of the workflow input parameter.
        R: Type of the workflow return value.
    """

    @workflow.run
    async def run(self, workflow_input: T) -> R:
        """Main workflow execution method.

        This method must be implemented by concrete workflow classes to define
        the workflow logic. It should orchestrate activities and child workflows
        to accomplish the business process.

        Args:
            workflow_input (T): The input data for the workflow.

        Returns:
            R: The result of the workflow execution.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError("Workflow must implement run method")

    async def _execute_activity_with_retry(
        self,
        activity_func: Any,
        arg: Any,
        start_to_close_timeout: timedelta | None = None,
        heartbeat_timeout: timedelta | None = None,
        retry_policy: RetryPolicy | None = None,
        task_queue: str | None = None,
    ) -> Any:
        """Execute an activity with standardized retry configuration.

        Args:
            activity_func (Any): The activity function to execute.
            arg (Any): Argument to pass to the activity.
            start_to_close_timeout (timedelta, optional): Maximum execution time for
                the activity. If None, falls back to the configured
                ACTIVITY_START_TO_CLOSE_TIMEOUT. Defaults to None.
            heartbeat_timeout (timedelta, optional): Heartbeat timeout for long-running
                activities. If None, falls back to the configured
                ACTIVITY_HEARTBEAT_TIMEOUT. Defaults to None.
            retry_policy (RetryPolicy, optional): Custom retry policy. If None, a
                policy is built from the RETRY_* configuration values.
                Defaults to None.
            task_queue (str, optional): Task queue for activity execution.
                If None, falls back to the configured TASK_QUEUE. Defaults to None.

        Returns:
            Any: The result of the activity execution.
        """
        # Get temporal config for default values.
        # NOTE(review): imported lazily here — presumably to avoid an import
        # cycle with archipy.configs at module load time; confirm before hoisting.
        from archipy.configs.base_config import BaseConfig

        temporal_config = BaseConfig.global_config().TEMPORAL

        # Use config defaults if not provided
        if start_to_close_timeout is None:
            start_to_close_timeout = timedelta(seconds=temporal_config.ACTIVITY_START_TO_CLOSE_TIMEOUT)
        if heartbeat_timeout is None:
            heartbeat_timeout = timedelta(seconds=temporal_config.ACTIVITY_HEARTBEAT_TIMEOUT)
        if retry_policy is None:
            retry_policy = RetryPolicy(
                maximum_attempts=temporal_config.RETRY_MAXIMUM_ATTEMPTS,
                backoff_coefficient=temporal_config.RETRY_BACKOFF_COEFFICIENT,
                maximum_interval=timedelta(seconds=temporal_config.RETRY_MAXIMUM_INTERVAL),
            )

        return await workflow.execute_activity(
            activity_func,
            arg,
            start_to_close_timeout=start_to_close_timeout,
            heartbeat_timeout=heartbeat_timeout,
            retry_policy=retry_policy,
            task_queue=task_queue or temporal_config.TASK_QUEUE,
        )

    async def _execute_child_workflow(
        self,
        child_workflow: Any,
        arg: Any,
        workflow_id: str | None = None,
        task_queue: str | None = None,
        execution_timeout: timedelta | None = None,
    ) -> Any:
        """Execute a child workflow with standardized configuration.

        Args:
            child_workflow (Any): The child workflow function to execute.
            arg (Any): Argument to pass to the child workflow.
            workflow_id (str, optional): Unique ID for the child workflow.
                If None, auto-generated. Defaults to None.
            task_queue (str, optional): Task queue for child workflow execution.
                If None, falls back to the configured TASK_QUEUE. Defaults to None.
            execution_timeout (timedelta, optional): Maximum execution time for the
                child workflow. If None, falls back to the configured
                WORKFLOW_EXECUTION_TIMEOUT. Defaults to None.

        Returns:
            Any: The result of the child workflow execution.
        """
        # Get temporal config for default values (lazy import, same as above).
        from archipy.configs.base_config import BaseConfig

        temporal_config = BaseConfig.global_config().TEMPORAL

        # Use config defaults if not provided
        if execution_timeout is None:
            execution_timeout = timedelta(seconds=temporal_config.WORKFLOW_EXECUTION_TIMEOUT)

        return await workflow.execute_child_workflow(
            child_workflow,
            arg,
            id=workflow_id,
            task_queue=task_queue or temporal_config.TASK_QUEUE,
            execution_timeout=execution_timeout,
        )

    def _log_workflow_event(self, event: str, details: dict[str, Any] | None = None) -> None:
        """Log workflow events with consistent formatting.

        Args:
            event (str): The event description.
            details (dict[str, Any], optional): Additional event details.
                Note that caller-supplied keys override the base
                workflow_id/workflow_type/event keys. Defaults to None.
        """
        log_data = {
            "workflow_id": workflow.info().workflow_id,
            "workflow_type": workflow.info().workflow_type,
            "event": event,
        }

        if details:
            log_data.update(details)

        workflow.logger.info("Workflow event", extra=log_data)

archipy.adapters.temporal.base.BaseWorkflow.run async

run(workflow_input: T) -> R

Main workflow execution method.

This method must be implemented by concrete workflow classes to define the workflow logic. It should orchestrate activities and child workflows to accomplish the business process.

Parameters:

Name Type Description Default
workflow_input T

The input data for the workflow.

required

Returns:

Name Type Description
R R

The result of the workflow execution.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/temporal/base.py
@workflow.run
async def run(self, workflow_input: T) -> R:
    """Entry point for workflow execution.

    Concrete workflow classes override this to orchestrate the activities
    and child workflows that carry out the business process.

    Args:
        workflow_input (T): The input data for the workflow.

    Returns:
        R: The result of the workflow execution.

    Raises:
        NotImplementedError: Always, until a subclass supplies an implementation.
    """
    raise NotImplementedError("Workflow must implement run method")

archipy.adapters.temporal.base.BaseActivity

Base class for all Temporal activities in ArchiPy.

Provides common functionality for activity implementations, including integration with your logic layer, standardized error handling, and execution hooks for cross-cutting concerns.

Class Type Parameters:

- T: Type of the activity input parameter (required).
- R: Type of the activity return value (required).
Source code in archipy/adapters/temporal/base.py
class BaseActivity[T, R]:
    """Base class for all Temporal activities in ArchiPy.

    Provides common functionality for activity implementations, including
    integration with your logic layer, standardized error handling, and
    execution hooks for cross-cutting concerns.

    Type Parameters:
        T: Type of the activity input parameter.
        R: Type of the activity return value.
    """

    def __init__(self, logic: Any | None = None) -> None:
        """Initialize the activity with a logic instance.

        Args:
            logic (Any, optional): Your business logic instance (object) that contains
                a repository with access to adapters. If None, subclass should override _get_logic().
                Defaults to None.
        """
        self._logic = logic

    def _get_logic(self) -> Any:
        """Get the logic instance for this activity.

        Override this method in subclasses to provide your specific logic instance,
        or pass it via constructor using dependency injection. Your logic instance
        should have a repository that manages adapter access.

        Returns:
            Any: Your business logic instance.

        Raises:
            NotImplementedError: If not implemented by the subclass and no logic provided.
        """
        if self._logic is not None:
            return self._logic
        raise NotImplementedError("Activity must provide a logic instance via constructor or override _get_logic()")

    @activity.defn
    async def execute(self, activity_input: T) -> R:
        """Main activity execution method with hooks.

        This method provides a template for activity execution with pre/post hooks
        for common concerns like caching, validation, and monitoring.

        Args:
            activity_input (T): The input data for the activity.

        Returns:
            R: The result of the activity execution.

        Raises:
            Exception: Any exception that occurs during activity execution.
        """
        try:
            # Pre-execution hook
            await self._before_execute(activity_input)

            # Check cache if enabled
            if self._is_cacheable():
                cache_key = self._get_cache_key(activity_input)
                cached_result = await self._get_from_cache(cache_key)
                if cached_result is not None:
                    activity.logger.info("Using cached result", extra={"cache_key": cache_key})
                    return cached_result

            # Main business logic execution
            result = await self._do_execute(activity_input)

            # Cache result if enabled
            if self._is_cacheable() and result is not None:
                cache_key = self._get_cache_key(activity_input)
                await self._store_in_cache(cache_key, result)

            # Post-execution hook
            await self._after_execute(activity_input, result)

            return result

        except Exception as error:
            await self._handle_error(activity_input, error)
            raise

    @abstractmethod
    async def _do_execute(self, activity_input: T) -> R:
        """Execute the main activity business logic.

        This method must be implemented by concrete activity classes to define
        the specific business logic for the activity.

        Args:
            activity_input (T): The input data for the activity.

        Returns:
            R: The result of the activity execution.

        Raises:
            NotImplementedError: If not implemented by the subclass.
        """
        raise NotImplementedError("Activity must implement _do_execute method")

    async def _before_execute(self, activity_input: T) -> None:
        """Pre-execution hook for common setup tasks.

        Override this method to perform tasks before the main activity logic,
        such as validation, setup, or preparation.

        Args:
            activity_input (T): The input data for the activity.
        """
        self._log_activity_event("execution_started", {"input_type": type(activity_input).__name__})

    async def _after_execute(self, activity_input: T, result: R) -> None:
        """Post-execution hook for cleanup and monitoring.

        Override this method to perform tasks after successful activity execution,
        such as cleanup, metrics emission, or notifications.

        Args:
            activity_input (T): The input data that was processed.
            result (R): The result of the activity execution.
        """
        self._log_activity_event(
            "execution_completed",
            {
                "input_type": type(activity_input).__name__,
                "result_type": type(result).__name__,
            },
        )

    async def _handle_error(self, activity_input: T, error: Exception) -> None:
        """Handle activity execution errors.

        Override this method to implement custom error handling logic,
        such as error reporting, cleanup, or compensation actions.

        Args:
            activity_input (T): The input data that was being processed.
            error (Exception): The exception that occurred.
        """
        self._log_activity_event(
            "execution_failed",
            {
                "input_type": type(activity_input).__name__,
                "error_type": type(error).__name__,
                "error_message": str(error),
            },
        )

    def _is_cacheable(self) -> bool:
        """Determine if this activity's results should be cached.

        Override this method to enable caching for specific activities.

        Returns:
            bool: True if results should be cached, False otherwise.
        """
        return False

    def _get_cache_key(self, activity_input: T) -> str:
        """Generate a cache key for the given input.

        Override this method to customize cache key generation for activities
        that support caching.

        Args:
            activity_input (T): The activity input data.

        Returns:
            str: The cache key for storing/retrieving results.
        """
        return f"{self.__class__.__name__}:{hash(str(activity_input))}"

    async def _get_from_cache(self, cache_key: str) -> R | None:
        """Retrieve a result from cache using your logic instance.

        Override this method to implement caching using your repository pattern.
        By default, this returns None (no caching).

        Args:
            cache_key (str): The cache key to look up.

        Returns:
            R | None: The cached result if found, None otherwise.
        """
        # Override this method to use your logic instance's repository
        return None

    async def _store_in_cache(self, cache_key: str, result: R, ttl: int = 3600) -> None:
        """Store a result in cache using your logic instance.

        Override this method to implement caching using your repository pattern.
        By default, this does nothing.

        Args:
            cache_key (str): The cache key for storage.
            result (R): The result to cache.
            ttl (int): Time-to-live in seconds. Defaults to 3600 (1 hour).
        """
        # Override this method to use your logic instance's repository

    def _log_activity_event(self, event: str, details: dict[str, Any] | None = None) -> None:
        """Log activity events with consistent formatting.

        Args:
            event (str): The event description.
            details (dict[str, Any], optional): Additional event details.
                Defaults to None.
        """
        log_data = {
            "activity_type": self.__class__.__name__,
            "event": event,
        }

        if details:
            log_data.update(details)

        activity.logger.info("Activity event", extra=log_data)

archipy.adapters.temporal.base.BaseActivity.execute async

execute(activity_input: T) -> R

Main activity execution method with hooks.

This method provides a template for activity execution with pre/post hooks for common concerns like caching, validation, and monitoring.

Parameters:

Name Type Description Default
activity_input T

The input data for the activity.

required

Returns:

Name Type Description
R R

The result of the activity execution.

Raises:

Type Description
Exception

Any exception that occurs during activity execution.

Source code in archipy/adapters/temporal/base.py
@activity.defn
async def execute(self, activity_input: T) -> R:
    """Run the activity through its pre/post-hook pipeline.

    Wraps the concrete business logic (``_do_execute``) with setup, optional
    result caching, teardown, and error-reporting hooks.

    Args:
        activity_input (T): The input data for the activity.

    Returns:
        R: The result of the activity execution.

    Raises:
        Exception: Propagates any exception raised during execution.
    """
    try:
        await self._before_execute(activity_input)

        # Serve a cached result when this activity opts into caching.
        if self._is_cacheable():
            key = self._get_cache_key(activity_input)
            hit = await self._get_from_cache(key)
            if hit is not None:
                activity.logger.info("Using cached result", extra={"cache_key": key})
                return hit

        # Run the concrete business logic.
        outcome = await self._do_execute(activity_input)

        # Persist the fresh result for future invocations when caching is on.
        if self._is_cacheable() and outcome is not None:
            key = self._get_cache_key(activity_input)
            await self._store_in_cache(key, outcome)

        await self._after_execute(activity_input, outcome)

        return outcome

    except Exception as exc:
        await self._handle_error(activity_input, exc)
        raise

archipy.adapters.temporal.base.LogicIntegratedActivity

Bases: BaseActivity[T, R]

Activity base class that enforces the logic layer pattern.

This class provides helper methods that delegate to your logic instance, ensuring all business operations go through your established architecture with a single repository managing adapter access.

Source code in archipy/adapters/temporal/base.py
class LogicIntegratedActivity(BaseActivity[T, R]):
    """Activity base class that routes all operations through the logic layer.

    The helpers below resolve methods by name on the configured logic
    instance, so every business operation flows through your established
    architecture, with a single repository mediating adapter access.
    """

    async def _execute_with_logic(self, operation_func: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke a named method on the configured logic instance.

        Args:
            operation_func (str): Name of the logic method to invoke.
            *args (Any): Positional arguments forwarded to the method.
            **kwargs (Any): Keyword arguments forwarded to the method.

        Returns:
            Any: Whatever the logic method returns.

        Example:
            # Call logic.get_user_by_id(user_id)
            user = await self._execute_with_logic("get_user_by_id", user_id)
        """
        target = getattr(self._get_logic(), operation_func)
        return await target(*args, **kwargs)

    async def _execute_with_atomic(self, operation_func: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke a logic method that carries an @atomic decorator.

        The named method is expected to be decorated for transaction
        management, so the transaction boundary is handled by the method
        itself rather than by this helper.

        Args:
            operation_func (str): Name of the @atomic-decorated logic method
                to invoke.
            *args (Any): Positional arguments forwarded to the method.
            **kwargs (Any): Keyword arguments forwarded to the method.

        Returns:
            Any: Whatever the atomic operation returns.

        Example:
            # Call logic.process_order(order_data) - method decorated with @atomic
            result = await self._execute_with_atomic("process_order", order_data)
        """
        target = getattr(self._get_logic(), operation_func)
        # The decorator on the target method opens/commits the transaction.
        return await target(*args, **kwargs)

    async def _call_atomic_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        """Alias of ``_execute_with_atomic`` with a clearer name.

        Args:
            method_name (str): Name of the atomic method to invoke.
            *args (Any): Positional arguments forwarded to the method.
            **kwargs (Any): Keyword arguments forwarded to the method.

        Returns:
            Any: Whatever the atomic method returns.

        Example:
            # Direct call to @atomic decorated method
            result = await self._call_atomic_method("create_order_with_payment", order_data)
        """
        return await self._execute_with_atomic(method_name, *args, **kwargs)

archipy.adapters.temporal.base.LogicIntegratedActivity.execute async

execute(activity_input: T) -> R

Main activity execution method with hooks.

This method provides a template for activity execution with pre/post hooks for common concerns like caching, validation, and monitoring.

Parameters:

Name Type Description Default
activity_input T

The input data for the activity.

required

Returns:

Name Type Description
R R

The result of the activity execution.

Raises:

Type Description
Exception

Any exception that occurs during activity execution.

Source code in archipy/adapters/temporal/base.py
@activity.defn
async def execute(self, activity_input: T) -> R:
    """Run the activity through its pre/post-hook pipeline.

    Wraps the concrete business logic (``_do_execute``) with setup, optional
    result caching, teardown, and error-reporting hooks.

    Args:
        activity_input (T): The input data for the activity.

    Returns:
        R: The result of the activity execution.

    Raises:
        Exception: Propagates any exception raised during execution.
    """
    try:
        await self._before_execute(activity_input)

        # Serve a cached result when this activity opts into caching.
        if self._is_cacheable():
            key = self._get_cache_key(activity_input)
            hit = await self._get_from_cache(key)
            if hit is not None:
                activity.logger.info("Using cached result", extra={"cache_key": key})
                return hit

        # Run the concrete business logic.
        outcome = await self._do_execute(activity_input)

        # Persist the fresh result for future invocations when caching is on.
        if self._is_cacheable() and outcome is not None:
            key = self._get_cache_key(activity_input)
            await self._store_in_cache(key, outcome)

        await self._after_execute(activity_input, outcome)

        return outcome

    except Exception as exc:
        await self._handle_error(activity_input, exc)
        raise

archipy.adapters.temporal.base.AtomicActivity

Bases: BaseActivity[T, R]

Activity base class with built-in atomic transaction support.

This class extends BaseActivity to provide direct atomic transaction support within activity execution, ensuring database consistency during activity operations.

Parameters:

Name Type Description Default
logic Any

Business logic instance with repository access. Defaults to None.

None
db_type str

Database type for atomic operations. Defaults to "postgres".

'postgres'
Source code in archipy/adapters/temporal/base.py
class AtomicActivity(BaseActivity[T, R]):
    """Activity base class with built-in atomic transaction support.

    This class extends BaseActivity to provide direct atomic transaction support
    within activity execution, ensuring database consistency during activity operations.

    Args:
        logic (Any, optional): Business logic instance with repository access. Defaults to None.
        db_type (str, optional): Database type for atomic operations. Defaults to "postgres".
    """

    def __init__(self, logic: Any | None = None, db_type: str = "postgres") -> None:
        """Initialize the atomic activity.

        Args:
            logic (Any, optional): Business logic instance. Defaults to None.
            db_type (str): Database type ("postgres", "sqlite", "starrocks"). Defaults to "postgres".

        Raises:
            ValueError: If an invalid db_type is provided.
        """
        super().__init__(logic)
        # Fail fast on an unsupported backend so the error surfaces at
        # construction rather than on the first _get_atomic_decorator() call.
        if db_type not in ("postgres", "sqlite", "starrocks"):
            raise ValueError(f"Invalid db_type: {db_type}. Must be one of: postgres, sqlite, starrocks")
        # Selected database backend; used to pick the matching atomic decorator.
        self.db_type = db_type

    def _get_atomic_decorator(self) -> Callable[..., Any]:
        """Get the appropriate async atomic decorator for the configured database type.

        Returns:
            Callable: The async atomic decorator function for the configured database.

        Raises:
            ImportError: If SQLAlchemy is not installed and atomic decorators are needed.
        """
        # Lazy import to avoid requiring SQLAlchemy when using temporalio without sqlalchemy extra
        from archipy.helpers.decorators import (
            async_postgres_sqlalchemy_atomic_decorator,
            async_sqlite_sqlalchemy_atomic_decorator,
            async_starrocks_sqlalchemy_atomic_decorator,
        )

        # db_type was validated in __init__, so this lookup cannot raise KeyError.
        decorators_map = {
            "postgres": async_postgres_sqlalchemy_atomic_decorator,
            "sqlite": async_sqlite_sqlalchemy_atomic_decorator,
            "starrocks": async_starrocks_sqlalchemy_atomic_decorator,
        }
        return decorators_map[self.db_type]

    @activity.defn
    async def execute_atomic(self, activity_input: T) -> R:
        """Execute the activity within a database transaction.

        This method wraps the entire activity execution (including pre/post hooks)
        within a database transaction, ensuring atomicity of all database operations.

        Args:
            activity_input (T): The input data for the activity.

        Returns:
            R: The result of the activity execution.

        Raises:
            Exception: Any exception that occurs during activity execution.
        """
        atomic_decorator = self._get_atomic_decorator()

        @atomic_decorator
        async def _atomic_execute() -> R:
            # Two-argument super() is required here: zero-argument super()
            # does not work inside a nested function.
            return await super(AtomicActivity, self).execute(activity_input)

        return await _atomic_execute()

    async def _do_execute_atomic(self, activity_input: T) -> R:
        """Execute main business logic within a database transaction.

        This method provides atomic transaction support for the core business logic only,
        excluding pre/post hooks from the transaction scope.

        Args:
            activity_input (T): The input data for the activity.

        Returns:
            R: The result of the business logic execution.
        """
        atomic_decorator = self._get_atomic_decorator()

        @atomic_decorator
        async def _atomic_do_execute() -> R:
            return await self._do_execute(activity_input)

        return await _atomic_do_execute()

    async def execute_custom_atomic_operation(self, operation: Callable[[], Any]) -> Any:
        """Execute a custom operation within a database transaction.

        This utility method allows executing custom logic within the activity's
        configured atomic transaction context.

        Args:
            operation (Callable[[], Any]): The zero-argument async operation to
                execute atomically.

        Returns:
            Any: The result of the operation.

        Example:
            ```python
            async def custom_logic():
                # Custom database operations
                return await some_database_work()


            result = await self.execute_custom_atomic_operation(custom_logic)
            ```
        """
        atomic_decorator = self._get_atomic_decorator()

        @atomic_decorator
        async def _execute_operation() -> Any:
            return await operation()

        return await _execute_operation()

    def with_db_type(self, db_type: str) -> AtomicActivity[T, R]:
        """Create a new instance with a different database type.

        Args:
            db_type (str): The new database type ("postgres", "sqlite", or "starrocks").

        Returns:
            AtomicActivity[T, R]: New activity instance with the specified database type.

        Raises:
            NotImplementedError: Always. AtomicActivity has an abstract method and
                cannot be instantiated directly; subclasses that support
                re-targeting must override this method.
        """
        # Cannot return concrete type due to abstract method
        raise NotImplementedError("AtomicActivity cannot be instantiated directly")

archipy.adapters.temporal.base.AtomicActivity.db_type instance-attribute

db_type = db_type

archipy.adapters.temporal.base.AtomicActivity.execute_atomic async

execute_atomic(activity_input: T) -> R

Execute the activity within a database transaction.

This method wraps the entire activity execution (including pre/post hooks) within a database transaction, ensuring atomicity of all database operations.

Parameters:

Name Type Description Default
activity_input T

The input data for the activity.

required

Returns:

Name Type Description
R R

The result of the activity execution.

Raises:

Type Description
Exception

Any exception that occurs during activity execution.

Source code in archipy/adapters/temporal/base.py
@activity.defn
async def execute_atomic(self, activity_input: T) -> R:
    """Execute the activity within a database transaction.

    Wraps the full ``execute`` template (including pre/post hooks) in the
    atomic decorator selected for the configured database, so every database
    operation performed during the activity commits or rolls back as a unit.

    Args:
        activity_input (T): The input data for the activity.

    Returns:
        R: The result of the activity execution.

    Raises:
        Exception: Propagates any exception raised during activity execution.
    """
    wrap_atomic = self._get_atomic_decorator()

    @wrap_atomic
    async def _run_in_transaction() -> R:
        # Two-argument super() is required: zero-argument super() does not
        # work inside a nested function.
        return await super(AtomicActivity, self).execute(activity_input)

    return await _run_in_transaction()

archipy.adapters.temporal.base.AtomicActivity.execute_custom_atomic_operation async

execute_custom_atomic_operation(
    operation: Callable[[], Any],
) -> Any

Execute a custom operation within a database transaction.

This utility method allows executing custom logic within the activity's configured atomic transaction context.

Parameters:

Name Type Description Default
operation Callable[[], Any]

The operation to execute atomically.

required

Returns:

Name Type Description
Any Any

The result of the operation.

Example
async def custom_logic():
    # Custom database operations
    return await some_database_work()


result = await self.execute_custom_atomic_operation(custom_logic)
Source code in archipy/adapters/temporal/base.py
async def execute_custom_atomic_operation(self, operation: Callable[[], Any]) -> Any:
    """Execute a custom operation within a database transaction.

    Utility for running arbitrary async logic inside the activity's
    configured atomic transaction context.

    Args:
        operation (Callable[[], Any]): Zero-argument async callable to run atomically.

    Returns:
        Any: The result of the operation.

    Example:
        ```python
        async def custom_logic():
            # Custom database operations
            return await some_database_work()


        result = await self.execute_custom_atomic_operation(custom_logic)
        ```
    """
    wrap_atomic = self._get_atomic_decorator()

    @wrap_atomic
    async def _run_operation() -> Any:
        return await operation()

    return await _run_operation()

archipy.adapters.temporal.base.AtomicActivity.with_db_type

with_db_type(db_type: str) -> AtomicActivity[T, R]

Create a new instance with a different database type.

Parameters:

Name Type Description Default
db_type str

The new database type ("postgres", "sqlite", or "starrocks").

required

Returns:

Type Description
AtomicActivity[T, R]

AtomicActivity[T, R]: New activity instance with the specified database type.

Raises:

Type Description
NotImplementedError

Always raised; AtomicActivity cannot be instantiated directly, so subclasses must override this method.

Source code in archipy/adapters/temporal/base.py
def with_db_type(self, db_type: str) -> AtomicActivity[T, R]:
    """Create a new instance with a different database type.

    Args:
        db_type (str): The new database type ("postgres", "sqlite", or "starrocks").

    Returns:
        AtomicActivity[T, R]: New activity instance with the specified database type.

    Raises:
        NotImplementedError: Always. AtomicActivity has an abstract method and
            cannot be instantiated directly, so a concrete copy cannot be
            constructed here; subclasses that support re-targeting must
            override this method.
    """
    # Cannot return concrete type due to abstract method
    raise NotImplementedError("AtomicActivity cannot be instantiated directly")

archipy.adapters.temporal.base.AtomicActivity.execute async

execute(activity_input: T) -> R

Main activity execution method with hooks.

This method provides a template for activity execution with pre/post hooks for common concerns like caching, validation, and monitoring.

Parameters:

Name Type Description Default
activity_input T

The input data for the activity.

required

Returns:

Name Type Description
R R

The result of the activity execution.

Raises:

Type Description
Exception

Any exception that occurs during activity execution.

Source code in archipy/adapters/temporal/base.py
@activity.defn
async def execute(self, activity_input: T) -> R:
    """Template execution method with hooks and optional result caching.

    Runs ``_before_execute``, returns a cached value when caching is enabled
    and a hit exists, otherwise delegates to ``_do_execute``, caches the fresh
    result, and finishes with ``_after_execute``. Exceptions are handed to
    ``_handle_error`` and re-raised.

    Args:
        activity_input (T): The input data for the activity.

    Returns:
        R: The result of the activity execution.

    Raises:
        Exception: Propagates any exception raised during activity execution.
    """
    try:
        # Pre-execution hook (validation, monitoring, etc.).
        await self._before_execute(activity_input)

        # Return a cached value when one is available.
        if self._is_cacheable():
            hit_key = self._get_cache_key(activity_input)
            cached = await self._get_from_cache(hit_key)
            if cached is not None:
                activity.logger.info("Using cached result", extra={"cache_key": hit_key})
                return cached

        # Core business logic.
        value = await self._do_execute(activity_input)

        # Store the fresh result for later executions.
        if self._is_cacheable() and value is not None:
            write_key = self._get_cache_key(activity_input)
            await self._store_in_cache(write_key, value)

        # Post-execution hook.
        await self._after_execute(activity_input, value)

        return value

    except Exception as failure:
        await self._handle_error(activity_input, failure)
        raise

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete Temporal adapter implementing workflow and activity client operations.

Temporal adapter implementation for workflow orchestration.

This module provides concrete implementations of the Temporal port interfaces, integrating with the Temporal workflow engine while following ArchiPy patterns and conventions.

archipy.adapters.temporal.adapters.T module-attribute

T = TypeVar('T')

archipy.adapters.temporal.adapters.TemporalAdapter

Bases: TemporalPort

Temporal workflow adapter implementing the TemporalPort interface.

This adapter provides a standardized interface for interacting with Temporal workflow orchestration services, following ArchiPy architecture patterns. It handles client connections, TLS configuration, and workflow lifecycle management.

Parameters:

Name Type Description Default
temporal_config TemporalConfig

Configuration settings for Temporal. If None, retrieves from global config. Defaults to None.

None
Source code in archipy/adapters/temporal/adapters.py
class TemporalAdapter(TemporalPort):
    """Temporal workflow adapter implementing the TemporalPort interface.

    This adapter provides a standardized interface for interacting with Temporal
    workflow orchestration services, following ArchiPy architecture patterns.
    It handles client connections, TLS configuration, and workflow lifecycle
    management.

    Args:
        temporal_config (TemporalConfig, optional): Configuration settings for Temporal.
            If None, retrieves from global config. Defaults to None.
    """

    def __init__(self, temporal_config: TemporalConfig | None = None) -> None:
        """Initialize the TemporalAdapter with configuration settings.

        Args:
            temporal_config (TemporalConfig, optional): Configuration settings for Temporal.
                If None, retrieves from global config. Defaults to None.
        """
        # Get temporal config from the global config or use provided one
        if temporal_config is None:
            global_config = BaseConfig.global_config()
            if hasattr(global_config, "TEMPORAL"):
                self.config = global_config.TEMPORAL
            else:
                # Create a default config if none exists
                from archipy.configs.config_template import TemporalConfig

                self.config = TemporalConfig()
        else:
            self.config = temporal_config
        # Client is created lazily on first get_client() call.
        self._client: Client | None = None

    async def get_client(self) -> Client:
        """Get or create the Temporal client connection.

        The connection is created lazily on first use and cached on the
        instance for subsequent calls.

        Returns:
            Client: The Temporal client instance.

        Raises:
            BaseError: If unable to connect to the Temporal server; the
                original connection error is attached in ``additional_data``.
        """
        if self._client is None:
            try:
                # Build connection kwargs, only including tls if configured
                connect_kwargs: dict[str, Any] = {
                    "namespace": self.config.NAMESPACE,
                }
                if self._has_tls_config():
                    tls_config = self._build_tls_config()
                    connect_kwargs["tls"] = tls_config

                # Configure Runtime with Prometheus telemetry if enabled
                if self.config.ENABLE_METRICS:
                    runtime_manager = TemporalRuntimeManager()
                    runtime = runtime_manager.get_runtime(
                        prometheus_enabled=True,
                        prometheus_port=self.config.METRICS_PORT,
                    )
                    if runtime is not None:
                        connect_kwargs["runtime"] = runtime

                self._client = await Client.connect(
                    f"{self.config.HOST}:{self.config.PORT}",
                    **connect_kwargs,
                )
            except Exception as error:
                raise BaseError(
                    additional_data={
                        "server": f"{self.config.HOST}:{self.config.PORT}",
                        "namespace": self.config.NAMESPACE,
                        "original_error": str(error),
                    },
                ) from error

        return self._client

    def _has_tls_config(self) -> bool:
        """Check if TLS configuration is provided.

        Returns:
            bool: True if all three TLS settings (CA cert, client cert,
                client key) are set, False otherwise.
        """
        return all(
            [
                self.config.TLS_CA_CERT,
                self.config.TLS_CLIENT_CERT,
                self.config.TLS_CLIENT_KEY,
            ],
        )

    def _build_tls_config(self) -> TLSConfig:
        """Build TLS configuration for secure connections.

        Reads the certificate/key files from the paths in the config.

        Returns:
            TLSConfig: The TLS configuration object.

        Raises:
            InvalidArgumentError: If TLS configuration is incomplete or a
                certificate/key file cannot be read.
        """
        if not self._has_tls_config():
            raise InvalidArgumentError(
                additional_data={
                    "ca_cert": bool(self.config.TLS_CA_CERT),
                    "client_cert": bool(self.config.TLS_CLIENT_CERT),
                    "client_key": bool(self.config.TLS_CLIENT_KEY),
                },
            )

        try:
            # Narrow the Optional type for static checkers before the read.
            if self.config.TLS_CA_CERT is None:
                raise InvalidArgumentError(additional_data={"error": "TLS_CA_CERT is required but not set"})
            ca_cert_path: str = self.config.TLS_CA_CERT
            ca_cert_data = Path(ca_cert_path).read_bytes()

            client_cert_data = None
            client_key_data = None

            if self.config.TLS_CLIENT_CERT:
                client_cert_path: str = self.config.TLS_CLIENT_CERT
                client_cert_data = Path(client_cert_path).read_bytes()

            if self.config.TLS_CLIENT_KEY:
                client_key_path: str = self.config.TLS_CLIENT_KEY
                client_key_data = Path(client_key_path).read_bytes()

            return TLSConfig(
                server_root_ca_cert=ca_cert_data,
                client_cert=client_cert_data,
                client_private_key=client_key_data,
            )
        except OSError as error:
            raise InvalidArgumentError(additional_data={"original_error": str(error)}) from error

    def _build_retry_policy(self) -> RetryPolicy:
        """Build default retry policy from configuration.

        Returns:
            RetryPolicy: The configured retry policy (max attempts, backoff
                coefficient, and maximum retry interval from config).
        """
        return RetryPolicy(
            maximum_attempts=self.config.RETRY_MAXIMUM_ATTEMPTS,
            backoff_coefficient=self.config.RETRY_BACKOFF_COEFFICIENT,
            maximum_interval=timedelta(seconds=self.config.RETRY_MAXIMUM_INTERVAL),
        )

    @override
    async def start_workflow(
        self,
        workflow: str | Callable,
        arg: Any = None,
        workflow_id: str | None = None,
        task_queue: str | None = None,
        execution_timeout: int | None = None,
        run_timeout: int | None = None,
        task_timeout: int | None = None,
        memo: dict[str, Any] | None = None,
        search_attributes: dict[str, Any] | None = None,
    ) -> WorkflowHandle[T, Any]:
        """Start a workflow execution asynchronously.

        Args:
            workflow (str | Callable): The workflow function or workflow type name.
            arg (Any, optional): Input argument for the workflow. Defaults to None.
            workflow_id (str, optional): Unique identifier for the workflow execution.
                If None, a UUID will be generated. Defaults to None.
            task_queue (str, optional): Task queue name for workflow execution.
                If None, uses the default task queue. Defaults to None.
            execution_timeout (int, optional): Maximum workflow execution time in seconds.
                Overrides config default. Defaults to None.
            run_timeout (int, optional): Maximum single workflow run time in seconds.
                Overrides config default. Defaults to None.
            task_timeout (int, optional): Maximum workflow task processing time in seconds.
                Overrides config default. Defaults to None.
            memo (dict[str, Any], optional): Non-indexed metadata for the workflow.
                Defaults to None.
            search_attributes (dict[str, Any], optional): Indexed metadata for workflow search.
                Defaults to None.

        Returns:
            WorkflowHandle[T, Any]: Handle to the started workflow execution.
        """
        client = await self.get_client()

        workflow_id = workflow_id or str(uuid4())
        task_queue = task_queue or self.config.TASK_QUEUE

        # Build positional args: only include arg if it's not None,
        # so workflows that take no parameters aren't given an extra argument.
        positional_args: list[Any] = [] if arg is None else [arg]

        return await client.start_workflow(
            workflow,
            *positional_args,
            id=workflow_id,
            task_queue=task_queue,
            execution_timeout=timedelta(seconds=execution_timeout or self.config.WORKFLOW_EXECUTION_TIMEOUT),
            run_timeout=timedelta(seconds=run_timeout or self.config.WORKFLOW_RUN_TIMEOUT),
            task_timeout=timedelta(seconds=task_timeout or self.config.WORKFLOW_TASK_TIMEOUT),
            retry_policy=self._build_retry_policy(),
            memo=memo,
            search_attributes=search_attributes,
        )

    @override
    async def execute_workflow(
        self,
        workflow: str | Callable,
        arg: Any = None,
        workflow_id: str | None = None,
        task_queue: str | None = None,
        execution_timeout: int | None = None,
        run_timeout: int | None = None,
        task_timeout: int | None = None,
    ) -> T:
        """Execute a workflow and wait for its completion.

        Args:
            workflow (str | Callable): The workflow function or workflow type name.
            arg (Any, optional): Input argument for the workflow. Defaults to None.
            workflow_id (str, optional): Unique identifier for the workflow execution.
                If None, a UUID will be generated. Defaults to None.
            task_queue (str, optional): Task queue name for workflow execution.
                If None, uses the default task queue. Defaults to None.
            execution_timeout (int, optional): Maximum workflow execution time in seconds.
                Overrides config default. Defaults to None.
            run_timeout (int, optional): Maximum single workflow run time in seconds.
                Overrides config default. Defaults to None.
            task_timeout (int, optional): Maximum workflow task processing time in seconds.
                Overrides config default. Defaults to None.

        Returns:
            T: The workflow execution result.
        """
        client = await self.get_client()

        workflow_id = workflow_id or str(uuid4())
        task_queue = task_queue or self.config.TASK_QUEUE

        # Build positional args: only include arg if it's not None,
        # so workflows that take no parameters aren't given an extra argument.
        positional_args: list[Any] = [] if arg is None else [arg]

        return await client.execute_workflow(
            workflow,
            *positional_args,
            id=workflow_id,
            task_queue=task_queue,
            execution_timeout=timedelta(seconds=execution_timeout or self.config.WORKFLOW_EXECUTION_TIMEOUT),
            run_timeout=timedelta(seconds=run_timeout or self.config.WORKFLOW_RUN_TIMEOUT),
            task_timeout=timedelta(seconds=task_timeout or self.config.WORKFLOW_TASK_TIMEOUT),
            retry_policy=self._build_retry_policy(),
        )

    @override
    async def get_workflow_handle(self, workflow_id: str, run_id: str | None = None) -> WorkflowHandle[T, Any]:
        """Get a handle to an existing workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, gets the latest run. Defaults to None.

        Returns:
            WorkflowHandle[T, Any]: Handle to the workflow execution.
        """
        client = await self.get_client()
        return client.get_workflow_handle(workflow_id, run_id=run_id)

    @override
    async def cancel_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
        """Cancel a running workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, cancels the latest run. Defaults to None.
            reason (str, optional): Reason for cancellation. Defaults to None.
                NOTE(review): currently accepted for interface symmetry but not
                forwarded — ``handle.cancel()`` is invoked without it.
        """
        handle = await self.get_workflow_handle(workflow_id, run_id)
        await handle.cancel()

    @override
    async def terminate_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
        """Terminate a running workflow execution immediately.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, terminates the latest run. Defaults to None.
            reason (str, optional): Reason for termination. Defaults to None.
        """
        handle = await self.get_workflow_handle(workflow_id, run_id)
        await handle.terminate(reason=reason)

    @override
    async def signal_workflow(
        self,
        workflow_id: str,
        signal_name: str,
        arg: Any = None,
        run_id: str | None = None,
    ) -> None:
        """Send a signal to a running workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            signal_name (str): The name of the signal to send.
            arg (Any, optional): Argument to pass with the signal. Defaults to None.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, signals the latest run. Defaults to None.
        """
        handle = await self.get_workflow_handle(workflow_id, run_id)
        # Only pass arg if it's not None, so signal handlers with no parameters work correctly.
        positional_args: list[Any] = [] if arg is None else [arg]
        await handle.signal(signal_name, *positional_args)

    @override
    async def query_workflow(
        self,
        workflow_id: str,
        query_name: str,
        arg: Any = None,
        run_id: str | None = None,
    ) -> Any:
        """Query a running workflow execution for information.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            query_name (str): The name of the query to execute.
            arg (Any, optional): Argument to pass with the query. Defaults to None.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, queries the latest run. Defaults to None.

        Returns:
            Any: The query result from the workflow.
        """
        handle = await self.get_workflow_handle(workflow_id, run_id)
        # Only pass arg if it's not None, so query handlers with no parameters work correctly.
        positional_args: list[Any] = [] if arg is None else [arg]
        return await handle.query(query_name, *positional_args)

    @override
    async def list_workflows(
        self,
        query: str | None = None,
        page_size: int | None = None,
        next_page_token: bytes | None = None,
    ) -> Any:
        """List workflow executions matching the given criteria.

        Args:
            query (str, optional): List filter query in Temporal SQL syntax.
                Defaults to None (no filter).
            page_size (int, optional): Maximum number of results per page.
                Defaults to None (100 is used when unset).
            next_page_token (bytes, optional): Token for pagination.
                Defaults to None (first page).

        Returns:
            Any: A list of workflow executions (the async iterator is fully
                consumed before returning).
        """
        client = await self.get_client()
        # list_workflows returns an async iterator, not awaitable
        workflows_iter = client.list_workflows(
            query=query,
            page_size=page_size or 100,
            next_page_token=next_page_token,
        )
        # Convert to list for compatibility
        return [workflow async for workflow in workflows_iter]

    @override
    async def describe_workflow(self, workflow_id: str, run_id: str | None = None) -> Any:
        """Get detailed information about a workflow execution.

        Args:
            workflow_id (str): The unique identifier of the workflow execution.
            run_id (str, optional): The specific run identifier within the workflow.
                If None, describes the latest run. Defaults to None.

        Returns:
            Any: Detailed workflow execution information.
        """
        handle = await self.get_workflow_handle(workflow_id, run_id)
        return await handle.describe()

    @override
    async def close(self) -> None:
        """Close the Temporal client connection.

        Performs cleanup of resources and closes the connection to the Temporal server.
        Should be called when the adapter is no longer needed.
        """
        if self._client:
            # Temporal client doesn't have a close method, just clear the reference
            self._client = None

    @override
    async def create_schedule(
        self,
        schedule_id: str,
        workflow_class: Any,
        spec: ScheduleSpec,
        task_queue: str,
        workflow_id: str | None = None,
        schedule_policy: SchedulePolicy | None = None,
    ) -> None:
        """Create a schedule for a workflow.

        Args:
            schedule_id (str): Unique identifier for the schedule.
            workflow_class (Any): Workflow definition the schedule should start.
            spec (ScheduleSpec): Schedule timing specification.
            task_queue (str): Task queue the scheduled workflow runs on.
            workflow_id (str, optional): Id for scheduled workflow executions.
                Defaults to None (the schedule_id is reused).
            schedule_policy (SchedulePolicy, optional): Overlap/catchup policy.
                Defaults to None (SKIP overlapping runs).
        """
        client = await self.get_client()

        workflow_execution_id = workflow_id or schedule_id
        sched = Schedule(
            action=ScheduleActionStartWorkflow(
                workflow=workflow_class,
                id=workflow_execution_id,
                task_queue=task_queue,
            ),
            spec=spec,
            # Default policy skips a run when the previous one is still in progress.
            policy=schedule_policy
            or SchedulePolicy(
                overlap=ScheduleOverlapPolicy.SKIP,
            ),
        )

        await client.create_schedule(schedule_id, sched)

    @override
    async def stop_schedule(self, schedule_id: str) -> None:
        """Stop a schedule.

        Note: the schedule is deleted, not paused.

        Args:
            schedule_id (str): Unique identifier of the schedule to delete.
        """
        client = await self.get_client()
        handle = client.get_schedule_handle(schedule_id)
        await handle.delete()

archipy.adapters.temporal.adapters.TemporalAdapter.config instance-attribute

config = TEMPORAL

archipy.adapters.temporal.adapters.TemporalAdapter.get_client async

get_client() -> Client

Get or create the Temporal client connection.

Returns:

Name Type Description
Client Client

The Temporal client instance.

Raises:

Type Description
BaseError

If unable to connect to the Temporal server (the original connection error is wrapped in additional_data).

Source code in archipy/adapters/temporal/adapters.py
async def get_client(self) -> Client:
    """Get or create the Temporal client connection.

    The client is created lazily on first use and cached on the adapter;
    subsequent calls return the same instance.

    Returns:
        Client: The Temporal client instance.

    Raises:
        BaseError: If unable to connect to the Temporal server. The server
            address, namespace, and original error are attached via
            ``additional_data``.
    """
    if self._client is None:
        try:
            # Build connection kwargs, only including tls if configured
            connect_kwargs: dict[str, Any] = {
                "namespace": self.config.NAMESPACE,
            }
            if self._has_tls_config():
                tls_config = self._build_tls_config()
                connect_kwargs["tls"] = tls_config

            # Configure Runtime with Prometheus telemetry if enabled
            if self.config.ENABLE_METRICS:
                runtime_manager = TemporalRuntimeManager()
                runtime = runtime_manager.get_runtime(
                    prometheus_enabled=True,
                    prometheus_port=self.config.METRICS_PORT,
                )
                # get_runtime returns None when Runtime creation failed;
                # omit the kwarg so the SDK falls back to its default Runtime.
                if runtime is not None:
                    connect_kwargs["runtime"] = runtime

            self._client = await Client.connect(
                f"{self.config.HOST}:{self.config.PORT}",
                **connect_kwargs,
            )
        except Exception as error:
            # Wrap any connection failure in the project's BaseError,
            # preserving the original exception as the cause.
            raise BaseError(
                additional_data={
                    "server": f"{self.config.HOST}:{self.config.PORT}",
                    "namespace": self.config.NAMESPACE,
                    "original_error": str(error),
                },
            ) from error

    return self._client

archipy.adapters.temporal.adapters.TemporalAdapter.start_workflow async

start_workflow(
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
    memo: dict[str, Any] | None = None,
    search_attributes: dict[str, Any] | None = None,
) -> WorkflowHandle[T, Any]

Start a workflow execution asynchronously.

Parameters:

Name Type Description Default
workflow str | Callable

The workflow function or workflow type name.

required
arg Any

Input argument for the workflow. Defaults to None.

None
workflow_id str

Unique identifier for the workflow execution. If None, a UUID will be generated. Defaults to None.

None
task_queue str

Task queue name for workflow execution. If None, uses the default task queue. Defaults to None.

None
execution_timeout int

Maximum workflow execution time in seconds. Overrides config default. Defaults to None.

None
run_timeout int

Maximum single workflow run time in seconds. Overrides config default. Defaults to None.

None
task_timeout int

Maximum workflow task processing time in seconds. Overrides config default. Defaults to None.

None
memo dict[str, Any]

Non-indexed metadata for the workflow. Defaults to None.

None
search_attributes dict[str, Any]

Indexed metadata for workflow search. Defaults to None.

None

Returns:

Type Description
WorkflowHandle[T, Any]

WorkflowHandle[T, Any]: Handle to the started workflow execution.

Source code in archipy/adapters/temporal/adapters.py
@override
async def start_workflow(
    self,
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
    memo: dict[str, Any] | None = None,
    search_attributes: dict[str, Any] | None = None,
) -> WorkflowHandle[T, Any]:
    """Start a workflow execution without waiting for it to finish.

    Args:
        workflow (str | Callable): The workflow function or workflow type name.
        arg (Any, optional): Input argument for the workflow. Defaults to None.
        workflow_id (str, optional): Unique identifier for the execution; a
            fresh UUID is generated when omitted. Defaults to None.
        task_queue (str, optional): Task queue to run on; falls back to the
            configured default. Defaults to None.
        execution_timeout (int, optional): Max total execution time in seconds;
            overrides the config default. Defaults to None.
        run_timeout (int, optional): Max single-run time in seconds; overrides
            the config default. Defaults to None.
        task_timeout (int, optional): Max workflow task time in seconds;
            overrides the config default. Defaults to None.
        memo (dict[str, Any], optional): Non-indexed workflow metadata.
            Defaults to None.
        search_attributes (dict[str, Any], optional): Indexed metadata for
            workflow search. Defaults to None.

    Returns:
        WorkflowHandle[T, Any]: Handle to the started workflow execution.
    """
    client = await self.get_client()

    # A None arg means "workflow takes no input": pass no positional at all.
    workflow_args: tuple[Any, ...] = () if arg is None else (arg,)

    return await client.start_workflow(
        workflow,
        *workflow_args,
        id=workflow_id or str(uuid4()),
        task_queue=task_queue or self.config.TASK_QUEUE,
        execution_timeout=timedelta(seconds=execution_timeout or self.config.WORKFLOW_EXECUTION_TIMEOUT),
        run_timeout=timedelta(seconds=run_timeout or self.config.WORKFLOW_RUN_TIMEOUT),
        task_timeout=timedelta(seconds=task_timeout or self.config.WORKFLOW_TASK_TIMEOUT),
        retry_policy=self._build_retry_policy(),
        memo=memo,
        search_attributes=search_attributes,
    )

archipy.adapters.temporal.adapters.TemporalAdapter.execute_workflow async

execute_workflow(
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
) -> T

Execute a workflow and wait for its completion.

Parameters:

Name Type Description Default
workflow str | Callable

The workflow function or workflow type name.

required
arg Any

Input argument for the workflow. Defaults to None.

None
workflow_id str

Unique identifier for the workflow execution. If None, a UUID will be generated. Defaults to None.

None
task_queue str

Task queue name for workflow execution. If None, uses the default task queue. Defaults to None.

None
execution_timeout int

Maximum workflow execution time in seconds. Overrides config default. Defaults to None.

None
run_timeout int

Maximum single workflow run time in seconds. Overrides config default. Defaults to None.

None
task_timeout int

Maximum workflow task processing time in seconds. Overrides config default. Defaults to None.

None

Returns:

Name Type Description
T T

The workflow execution result.

Source code in archipy/adapters/temporal/adapters.py
@override
async def execute_workflow(
    self,
    workflow: str | Callable,
    arg: Any = None,
    workflow_id: str | None = None,
    task_queue: str | None = None,
    execution_timeout: int | None = None,
    run_timeout: int | None = None,
    task_timeout: int | None = None,
) -> T:
    """Run a workflow to completion and return its result.

    Args:
        workflow (str | Callable): The workflow function or workflow type name.
        arg (Any, optional): Input argument for the workflow. Defaults to None.
        workflow_id (str, optional): Unique identifier for the execution; a
            fresh UUID is generated when omitted. Defaults to None.
        task_queue (str, optional): Task queue to run on; falls back to the
            configured default. Defaults to None.
        execution_timeout (int, optional): Max total execution time in seconds;
            overrides the config default. Defaults to None.
        run_timeout (int, optional): Max single-run time in seconds; overrides
            the config default. Defaults to None.
        task_timeout (int, optional): Max workflow task time in seconds;
            overrides the config default. Defaults to None.

    Returns:
        T: The workflow execution result.
    """
    client = await self.get_client()

    # A None arg means "workflow takes no input": pass no positional at all.
    workflow_args: tuple[Any, ...] = () if arg is None else (arg,)

    return await client.execute_workflow(
        workflow,
        *workflow_args,
        id=workflow_id or str(uuid4()),
        task_queue=task_queue or self.config.TASK_QUEUE,
        execution_timeout=timedelta(seconds=execution_timeout or self.config.WORKFLOW_EXECUTION_TIMEOUT),
        run_timeout=timedelta(seconds=run_timeout or self.config.WORKFLOW_RUN_TIMEOUT),
        task_timeout=timedelta(seconds=task_timeout or self.config.WORKFLOW_TASK_TIMEOUT),
        retry_policy=self._build_retry_policy(),
    )

archipy.adapters.temporal.adapters.TemporalAdapter.get_workflow_handle async

get_workflow_handle(
    workflow_id: str, run_id: str | None = None
) -> WorkflowHandle[T, Any]

Get a handle to an existing workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, gets the latest run. Defaults to None.

None

Returns:

Type Description
WorkflowHandle[T, Any]

WorkflowHandle[T, Any]: Handle to the workflow execution.

Source code in archipy/adapters/temporal/adapters.py
@override
async def get_workflow_handle(self, workflow_id: str, run_id: str | None = None) -> WorkflowHandle[T, Any]:
    """Return a handle bound to an existing workflow execution.

    Args:
        workflow_id (str): Identifier of the workflow execution.
        run_id (str, optional): Specific run to target; the latest run is
            used when omitted. Defaults to None.

    Returns:
        WorkflowHandle[T, Any]: Handle to the workflow execution.
    """
    temporal_client = await self.get_client()
    return temporal_client.get_workflow_handle(workflow_id, run_id=run_id)

archipy.adapters.temporal.adapters.TemporalAdapter.cancel_workflow async

cancel_workflow(
    workflow_id: str,
    run_id: str | None = None,
    reason: str | None = None,
) -> None

Cancel a running workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, cancels the latest run. Defaults to None.

None
reason str

Reason for cancellation. Defaults to None.

None
Source code in archipy/adapters/temporal/adapters.py
@override
async def cancel_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
    """Cancel a running workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, cancels the latest run. Defaults to None.
        reason (str, optional): Reason for cancellation. Defaults to None.

    Note:
        NOTE(review): ``reason`` is accepted for signature symmetry with
        ``terminate_workflow`` but is not forwarded — ``handle.cancel()``
        is called without it here. Confirm whether the SDK supports a
        cancellation reason before relying on this parameter.
    """
    handle = await self.get_workflow_handle(workflow_id, run_id)
    await handle.cancel()

archipy.adapters.temporal.adapters.TemporalAdapter.terminate_workflow async

terminate_workflow(
    workflow_id: str,
    run_id: str | None = None,
    reason: str | None = None,
) -> None

Terminate a running workflow execution immediately.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, terminates the latest run. Defaults to None.

None
reason str

Reason for termination. Defaults to None.

None
Source code in archipy/adapters/temporal/adapters.py
@override
async def terminate_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = None) -> None:
    """Terminate a running workflow execution immediately.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, terminates the latest run. Defaults to None.
        reason (str, optional): Reason for termination. Defaults to None.
    """
    handle = await self.get_workflow_handle(workflow_id, run_id)
    await handle.terminate(reason=reason)

archipy.adapters.temporal.adapters.TemporalAdapter.signal_workflow async

signal_workflow(
    workflow_id: str,
    signal_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> None

Send a signal to a running workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
signal_name str

The name of the signal to send.

required
arg Any

Argument to pass with the signal. Defaults to None.

None
run_id str

The specific run identifier within the workflow. If None, signals the latest run. Defaults to None.

None
Source code in archipy/adapters/temporal/adapters.py
@override
async def signal_workflow(
    self,
    workflow_id: str,
    signal_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> None:
    """Send a signal to a running workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        signal_name (str): The name of the signal to send.
        arg (Any, optional): Argument to pass with the signal. Defaults to None.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, signals the latest run. Defaults to None.
    """
    handle = await self.get_workflow_handle(workflow_id, run_id)
    # Only pass arg if it's not None, so signal handlers with no parameters work correctly.
    positional_args: list[Any] = [] if arg is None else [arg]
    await handle.signal(signal_name, *positional_args)

archipy.adapters.temporal.adapters.TemporalAdapter.query_workflow async

query_workflow(
    workflow_id: str,
    query_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> Any

Query a running workflow execution for information.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
query_name str

The name of the query to execute.

required
arg Any

Argument to pass with the query. Defaults to None.

None
run_id str

The specific run identifier within the workflow. If None, queries the latest run. Defaults to None.

None

Returns:

Name Type Description
Any Any

The query result from the workflow.

Source code in archipy/adapters/temporal/adapters.py
@override
async def query_workflow(
    self,
    workflow_id: str,
    query_name: str,
    arg: Any = None,
    run_id: str | None = None,
) -> Any:
    """Query a running workflow execution for information.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        query_name (str): The name of the query to execute.
        arg (Any, optional): Argument to pass with the query. Defaults to None.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, queries the latest run. Defaults to None.

    Returns:
        Any: The query result from the workflow.
    """
    handle = await self.get_workflow_handle(workflow_id, run_id)
    # Only pass arg if it's not None, so query handlers with no parameters work correctly.
    positional_args: list[Any] = [] if arg is None else [arg]
    return await handle.query(query_name, *positional_args)

archipy.adapters.temporal.adapters.TemporalAdapter.list_workflows async

list_workflows(
    query: str | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
) -> Any

List workflow executions matching the given criteria.

Parameters:

Name Type Description Default
query str

List filter query in Temporal SQL syntax. Defaults to None (no filter).

None
page_size int

Maximum number of results per page. Defaults to None (server default).

None
next_page_token bytes

Token for pagination. Defaults to None (first page).

None

Returns:

Name Type Description
Any Any

List of workflow executions with pagination info.

Source code in archipy/adapters/temporal/adapters.py
@override
async def list_workflows(
    self,
    query: str | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
) -> Any:
    """List workflow executions matching the given criteria.

    Args:
        query (str, optional): List filter query in Temporal SQL syntax.
            Defaults to None (no filter).
        page_size (int, optional): Maximum number of results per page.
            Defaults to None (server default).
        next_page_token (bytes, optional): Token for pagination.
            Defaults to None (first page).

    Returns:
        Any: List of workflow executions with pagination info.
    """
    client = await self.get_client()
    # list_workflows returns an async iterator, not awaitable
    workflows_iter = client.list_workflows(
        query=query,
        page_size=page_size or 100,
        next_page_token=next_page_token,
    )
    # Convert to list for compatibility
    return [workflow async for workflow in workflows_iter]

archipy.adapters.temporal.adapters.TemporalAdapter.describe_workflow async

describe_workflow(
    workflow_id: str, run_id: str | None = None
) -> Any

Get detailed information about a workflow execution.

Parameters:

Name Type Description Default
workflow_id str

The unique identifier of the workflow execution.

required
run_id str

The specific run identifier within the workflow. If None, describes the latest run. Defaults to None.

None

Returns:

Name Type Description
Any Any

Detailed workflow execution information.

Source code in archipy/adapters/temporal/adapters.py
@override
async def describe_workflow(self, workflow_id: str, run_id: str | None = None) -> Any:
    """Get detailed information about a workflow execution.

    Args:
        workflow_id (str): The unique identifier of the workflow execution.
        run_id (str, optional): The specific run identifier within the workflow.
            If None, describes the latest run. Defaults to None.

    Returns:
        Any: Detailed workflow execution information.
    """
    handle = await self.get_workflow_handle(workflow_id, run_id)
    return await handle.describe()

archipy.adapters.temporal.adapters.TemporalAdapter.close async

close() -> None

Close the Temporal client connection.

Releases the cached client reference. The Temporal Python client exposes no explicit close method, so the underlying connection is reclaimed when the reference is dropped. Should be called when the adapter is no longer needed.

Source code in archipy/adapters/temporal/adapters.py
@override
async def close(self) -> None:
    """Close the Temporal client connection.

    Performs cleanup of resources and closes the connection to the Temporal server.
    Should be called when the adapter is no longer needed.
    """
    if self._client:
        # Temporal client doesn't have a close method, just clear the reference
        self._client = None

archipy.adapters.temporal.adapters.TemporalAdapter.create_schedule async

create_schedule(
    schedule_id: str,
    workflow_class: Any,
    spec: ScheduleSpec,
    task_queue: str,
    workflow_id: str | None = None,
    schedule_policy: SchedulePolicy | None = None,
) -> None

Create a schedule for a workflow.

Source code in archipy/adapters/temporal/adapters.py
@override
async def create_schedule(
    self,
    schedule_id: str,
    workflow_class: Any,
    spec: ScheduleSpec,
    task_queue: str,
    workflow_id: str | None = None,
    schedule_policy: SchedulePolicy | None = None,
) -> None:
    """Register a schedule that starts the given workflow per ``spec``.

    Args:
        schedule_id (str): Unique identifier for the schedule.
        workflow_class (Any): Workflow class or type name to start on each trigger.
        spec (ScheduleSpec): Schedule specification describing when runs fire.
        task_queue (str): Task queue the scheduled workflow executes on.
        workflow_id (str, optional): Workflow ID for started runs; defaults to
            ``schedule_id`` when omitted. Defaults to None.
        schedule_policy (SchedulePolicy, optional): Overlap policy; defaults to
            skipping runs that would overlap. Defaults to None.
    """
    client = await self.get_client()

    start_action = ScheduleActionStartWorkflow(
        workflow=workflow_class,
        # Reuse the schedule id as the workflow id unless overridden.
        id=workflow_id or schedule_id,
        task_queue=task_queue,
    )
    policy = schedule_policy or SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP)

    await client.create_schedule(
        schedule_id,
        Schedule(action=start_action, spec=spec, policy=policy),
    )

archipy.adapters.temporal.adapters.TemporalAdapter.stop_schedule async

stop_schedule(schedule_id: str) -> None

Stop a schedule by deleting it from the Temporal server; the schedule cannot be resumed afterwards.

Source code in archipy/adapters/temporal/adapters.py
@override
async def stop_schedule(self, schedule_id: str) -> None:
    """Stop a schedule."""
    client = await self.get_client()
    handle = client.get_schedule_handle(schedule_id)
    await handle.delete()

options: show_root_toc_entry: false heading_level: 3

Runtime

Temporal runtime configuration and initialization utilities.

Temporal Runtime singleton for managing Runtime instances with telemetry.

This module provides a singleton class for creating and managing Temporal Runtime instances with Prometheus metrics integration.

archipy.adapters.temporal.runtime.logger module-attribute

logger = getLogger(__name__)

archipy.adapters.temporal.runtime.TemporalRuntimeManager

Singleton manager for Temporal Runtime instances with telemetry configuration.

This class ensures only one Runtime instance is created and reused across all Temporal clients and workers. Once created with metrics enabled, the Runtime cannot be changed (Temporal SDK limitation).

Example
from archipy.adapters.temporal.runtime import TemporalRuntimeManager

# Get the singleton manager
manager = TemporalRuntimeManager()

# Get Runtime with Prometheus enabled
runtime = manager.get_runtime(prometheus_enabled=True, prometheus_port=18201)
Source code in archipy/adapters/temporal/runtime.py
class TemporalRuntimeManager(metaclass=Singleton, thread_safe=True):
    """Singleton manager for Temporal Runtime instances with telemetry configuration.

    This class ensures only one Runtime instance is created and reused across all
    Temporal clients and workers. Once created with metrics enabled, the Runtime
    cannot be changed (Temporal SDK limitation).

    Example:
        ```python
        from archipy.adapters.temporal.runtime import TemporalRuntimeManager

        # Get the singleton manager
        manager = TemporalRuntimeManager()

        # Get Runtime with Prometheus enabled
        runtime = manager.get_runtime(prometheus_enabled=True, prometheus_port=18201)
        ```
    """

    def __init__(self) -> None:
        """Initialize the TemporalRuntimeManager singleton."""
        # Cached Runtime; created at most once per process (see get_runtime).
        self._runtime: Runtime | None = None

    def get_runtime(self, prometheus_enabled: bool = False, prometheus_port: int = 18201) -> Runtime | None:
        """Get or create a Runtime with Prometheus telemetry.

        Args:
            prometheus_enabled (bool): Whether to enable Prometheus metrics collection.
            prometheus_port (int): Port for the Prometheus metrics endpoint.

        Returns:
            Runtime | None: The configured Runtime instance if metrics are enabled,
                None otherwise (uses default Runtime).

        Note:
            Once a Runtime is created with metrics enabled, it cannot be disabled
            or recreated on a different port due to Temporal SDK limitations.
            Subsequent calls will return the existing Runtime regardless of parameters.
        """
        if not prometheus_enabled:
            logger.debug("Prometheus metrics disabled for Temporal, using default runtime")
            return None

        # If Runtime already created, return it (can't change once bound to port)
        if self._runtime is not None:
            logger.debug("Returning existing Temporal Runtime instance")
            return self._runtime

        logger.info("Creating Temporal Runtime with Prometheus metrics on port %d", prometheus_port)

        try:
            self._runtime = Runtime(
                telemetry=TelemetryConfig(
                    metrics=PrometheusConfig(bind_address=f"0.0.0.0:{prometheus_port}"),
                ),
            )
            logger.info("Temporal Runtime created successfully with Prometheus telemetry")
        except Exception:
            # Creation failure is non-fatal: callers get None and the SDK
            # falls back to its default Runtime (no metrics).
            logger.exception("Failed to create Temporal Runtime with Prometheus config")
            # Return None so Temporal uses default Runtime
            return None

        return self._runtime

    def reset_runtime(self) -> None:
        """Reset the Runtime instance.

        Warning:
            This does NOT actually close the Runtime or release the port binding.
            The Temporal SDK does not support Runtime cleanup. This method only
            resets internal references for testing purposes. The port will remain
            bound until the process exits.
        """
        logger.warning("Resetting Temporal Runtime reference (port remains bound until process exit)")
        self._runtime = None

archipy.adapters.temporal.runtime.TemporalRuntimeManager.get_runtime

get_runtime(
    prometheus_enabled: bool = False,
    prometheus_port: int = 18201,
) -> Runtime | None

Get or create a Runtime with Prometheus telemetry.

Parameters:

Name Type Description Default
prometheus_enabled bool

Whether to enable Prometheus metrics collection.

False
prometheus_port int

Port for the Prometheus metrics endpoint.

18201

Returns:

Type Description
Runtime | None

Runtime | None: The configured Runtime instance if metrics are enabled, None otherwise (uses default Runtime).

Note

Once a Runtime is created with metrics enabled, it cannot be disabled or recreated on a different port due to Temporal SDK limitations. Subsequent calls will return the existing Runtime regardless of parameters.

Source code in archipy/adapters/temporal/runtime.py
def get_runtime(self, prometheus_enabled: bool = False, prometheus_port: int = 18201) -> Runtime | None:
    """Get or create a Runtime with Prometheus telemetry.

    Args:
        prometheus_enabled (bool): Whether to enable Prometheus metrics collection.
        prometheus_port (int): Port for the Prometheus metrics endpoint.

    Returns:
        Runtime | None: The configured Runtime instance if metrics are enabled,
            None otherwise (the SDK default Runtime is used).

    Note:
        Once a Runtime is created with metrics enabled it cannot be disabled or
        rebound to another port (Temporal SDK limitation); later calls return
        the existing instance regardless of parameters.
    """
    if not prometheus_enabled:
        logger.debug("Prometheus metrics disabled for Temporal, using default runtime")
        return None

    # A Runtime stays bound to its port for the process lifetime; reuse it.
    if self._runtime is not None:
        logger.debug("Returning existing Temporal Runtime instance")
        return self._runtime

    logger.info("Creating Temporal Runtime with Prometheus metrics on port %d", prometheus_port)

    try:
        telemetry = TelemetryConfig(
            metrics=PrometheusConfig(bind_address=f"0.0.0.0:{prometheus_port}"),
        )
        self._runtime = Runtime(telemetry=telemetry)
        logger.info("Temporal Runtime created successfully with Prometheus telemetry")
    except Exception:
        logger.exception("Failed to create Temporal Runtime with Prometheus config")
        # Creation failed: hand back None so the SDK default Runtime is used.
        return None

    return self._runtime

archipy.adapters.temporal.runtime.TemporalRuntimeManager.reset_runtime

reset_runtime() -> None

Reset the Runtime instance.

Warning

This does NOT actually close the Runtime or release the port binding. The Temporal SDK does not support Runtime cleanup. This method only resets internal references for testing purposes. The port will remain bound until the process exits.

Source code in archipy/adapters/temporal/runtime.py
def reset_runtime(self) -> None:
    """Drop the cached Runtime reference.

    Warning:
        The underlying Runtime is not closed and its metrics port stays bound
        until the process exits — the Temporal SDK offers no Runtime cleanup.
        This only clears the manager's internal reference, primarily so tests
        can exercise creation paths again.
    """
    logger.warning("Resetting Temporal Runtime reference (port remains bound until process exit)")
    self._runtime = None

options: show_root_toc_entry: false heading_level: 3

Worker

Temporal worker setup for executing workflows and activities.

Worker management for Temporal workflow execution.

This module provides worker management functionality for Temporal workflow orchestration, including worker lifecycle management, task queue assignment, and integration with ArchiPy service adapters.

archipy.adapters.temporal.worker.WorkerHandle

Bases: WorkerHandle

Handle for managing a Temporal worker instance.

Provides methods to control and monitor a running Temporal worker, including starting, stopping, and querying worker status.

Attributes:

Name Type Description
worker_id str

Unique identifier for this worker instance.

task_queue str

The task queue this worker polls from.

workflows list[type]

List of workflow types registered with this worker.

activities list[Callable]

List of activity callables registered with this worker.

build_id str | None

Build identifier for worker versioning.

identity str | None

Worker identity for debugging and monitoring.

max_concurrent_workflow_tasks int

Maximum concurrent workflow tasks.

max_concurrent_activities int

Maximum concurrent activity tasks.

Source code in archipy/adapters/temporal/worker.py
class WorkerHandle(PortWorkerHandle):
    """Handle for managing a Temporal worker instance.

    Provides methods to control and monitor a running Temporal worker,
    including starting, stopping, and querying worker status.

    Attributes:
        worker_id (str): Unique identifier for this worker instance.
        task_queue (str): The task queue this worker polls from.
        workflows (list[type]): List of workflow types registered with this worker.
        activities (list[Callable]): List of activity callables registered with this worker.
        build_id (str | None): Build identifier for worker versioning.
        identity (str | None): Worker identity for debugging and monitoring.
        max_concurrent_workflow_tasks (int): Maximum concurrent workflow tasks.
        max_concurrent_activities (int): Maximum concurrent activity tasks.
    """

    def __init__(
        self,
        worker: Worker,
        worker_id: str,
        task_queue: str,
        workflows: list[type] | None = None,
        activities: list[Callable[..., Any]] | None = None,
        build_id: str | None = None,
        identity: str | None = None,
        max_concurrent_workflow_tasks: int | None = None,
        max_concurrent_activities: int | None = None,
    ) -> None:
        """Initialize the worker handle.

        Args:
            worker (Worker): The Temporal worker instance.
            worker_id (str): Unique identifier for this worker instance.
            task_queue (str): The task queue this worker polls from.
            workflows (list[type], optional): List of workflow types. Defaults to None.
            activities (list[Callable], optional): List of activity callables. Defaults to None.
            build_id (str, optional): Build identifier for worker versioning. Defaults to None.
            identity (str, optional): Worker identity. Defaults to None.
            max_concurrent_workflow_tasks (int, optional): Maximum concurrent workflow tasks.
                Defaults to None.
            max_concurrent_activities (int, optional): Maximum concurrent activity tasks.
                Defaults to None.
        """
        self._worker = worker
        self.worker_id = worker_id
        self.task_queue = task_queue
        self.workflows = workflows or []
        self.activities = activities or []
        self.build_id = build_id
        self.identity = identity
        self.max_concurrent_workflow_tasks = max_concurrent_workflow_tasks
        self.max_concurrent_activities = max_concurrent_activities
        self._running = False
        # Populated by start(). Initialized here so wait_until_stopped()
        # (and attribute inspection) cannot raise AttributeError when the
        # worker was never started.
        self._background_task: asyncio.Task[None] | None = None
        self._logger = logging.getLogger(__name__)

    async def start(self) -> None:
        """Start the worker to begin polling for tasks.

        Raises:
            WorkerConnectionError: If the worker fails to start.
        """
        try:
            self._logger.info(
                "Starting worker",
                extra={
                    "worker_id": self.worker_id,
                    "task_queue": self.task_queue,
                    "identity": self.identity,
                },
            )

            # Run the worker loop in the background; holding the task on
            # self keeps it from being garbage collected and lets
            # wait_until_stopped() await it later.
            self._background_task = asyncio.create_task(self._worker.run())
            self._running = True

            self._logger.info(
                "Worker started successfully",
                extra={
                    "worker_id": self.worker_id,
                    "task_queue": self.task_queue,
                },
            )

        except Exception as error:
            raise WorkerConnectionError(
                additional_data={
                    "message": f"Failed to start worker for task queue '{self.task_queue}'",
                    "task_queue": self.task_queue,
                    "worker_id": self.worker_id,
                    "error": str(error),
                },
            ) from error

    async def stop(self, grace_period: int = 30) -> None:
        """Stop the worker gracefully.

        Args:
            grace_period (int): Maximum time to wait for graceful shutdown in seconds.
                Defaults to 30.

        Raises:
            WorkerShutdownError: If the worker fails to stop gracefully.
        """
        if not self._running:
            return

        try:
            self._logger.info(
                "Stopping worker",
                extra={
                    "worker_id": self.worker_id,
                    "task_queue": self.task_queue,
                    "grace_period": grace_period,
                },
            )

            # Signal shutdown and wait for graceful completion
            await asyncio.wait_for(self._worker.shutdown(), timeout=grace_period)
            self._running = False

            self._logger.info(
                "Worker stopped successfully",
                extra={
                    "worker_id": self.worker_id,
                    "task_queue": self.task_queue,
                },
            )

        except TimeoutError as error:
            raise WorkerShutdownError(
                additional_data={
                    "message": f"Worker shutdown timeout after {grace_period} seconds",
                    "worker_identity": self.identity,
                    "task_queue": self.task_queue,
                    "worker_id": self.worker_id,
                    "grace_period": grace_period,
                },
            ) from error
        except Exception as error:
            raise WorkerShutdownError(
                additional_data={
                    "message": f"Failed to stop worker for task queue '{self.task_queue}'",
                    "worker_identity": self.identity,
                    "task_queue": self.task_queue,
                    "worker_id": self.worker_id,
                    "error": str(error),
                },
            ) from error

    async def wait_until_stopped(self) -> None:
        """Wait until the worker's background run task completes.

        Returns immediately if the worker was never started.
        """
        if self._background_task is not None:
            await self._background_task

    @property
    def is_running(self) -> bool:
        """Check if the worker is currently running.

        Returns:
            bool: True if the worker is running, False otherwise.
        """
        return self._running

    def get_stats(self) -> dict[str, Any]:
        """Get worker statistics and status information.

        Returns:
            dict[str, Any]: Worker statistics and status.
        """
        return {
            "worker_id": self.worker_id,
            "task_queue": self.task_queue,
            "identity": self.identity,
            "build_id": self.build_id,
            "is_running": self.is_running,
            "workflow_count": len(self.workflows),
            "activity_count": len(self.activities),
            "max_concurrent_workflow_tasks": self.max_concurrent_workflow_tasks,
            "max_concurrent_activities": self.max_concurrent_activities,
        }

archipy.adapters.temporal.worker.WorkerHandle.worker_id instance-attribute

worker_id = worker_id

archipy.adapters.temporal.worker.WorkerHandle.task_queue instance-attribute

task_queue = task_queue

archipy.adapters.temporal.worker.WorkerHandle.workflows instance-attribute

workflows = workflows or []

archipy.adapters.temporal.worker.WorkerHandle.activities instance-attribute

activities = activities or []

archipy.adapters.temporal.worker.WorkerHandle.build_id instance-attribute

build_id = build_id

archipy.adapters.temporal.worker.WorkerHandle.identity instance-attribute

identity = identity

archipy.adapters.temporal.worker.WorkerHandle.max_concurrent_workflow_tasks instance-attribute

max_concurrent_workflow_tasks = (
    max_concurrent_workflow_tasks
)

archipy.adapters.temporal.worker.WorkerHandle.max_concurrent_activities instance-attribute

max_concurrent_activities = max_concurrent_activities

archipy.adapters.temporal.worker.WorkerHandle.is_running property

is_running: bool

Check if the worker is currently running.

Returns:

Name Type Description
bool bool

True if the worker is running, False otherwise.

archipy.adapters.temporal.worker.WorkerHandle.start async

start() -> None

Start the worker to begin polling for tasks.

Raises:

Type Description
WorkerConnectionError

If the worker fails to start.

Source code in archipy/adapters/temporal/worker.py
async def start(self) -> None:
    """Begin polling the task queue for workflow and activity tasks.

    Raises:
        WorkerConnectionError: If the worker fails to start.
    """
    try:
        log_context = {
            "worker_id": self.worker_id,
            "task_queue": self.task_queue,
        }
        self._logger.info("Starting worker", extra={**log_context, "identity": self.identity})

        # Launch the worker loop in the background and keep a reference
        # on self so the task is not garbage collected mid-flight.
        self._background_task = asyncio.create_task(self._worker.run())
        self._running = True

        self._logger.info("Worker started successfully", extra=log_context)

    except Exception as error:
        raise WorkerConnectionError(
            additional_data={
                "message": f"Failed to start worker for task queue '{self.task_queue}'",
                "task_queue": self.task_queue,
                "worker_id": self.worker_id,
                "error": str(error),
            },
        ) from error

archipy.adapters.temporal.worker.WorkerHandle.stop async

stop(grace_period: int = 30) -> None

Stop the worker gracefully.

Parameters:

Name Type Description Default
grace_period int

Maximum time to wait for graceful shutdown in seconds. Defaults to 30.

30

Raises:

Type Description
WorkerShutdownError

If the worker fails to stop gracefully.

Source code in archipy/adapters/temporal/worker.py
async def stop(self, grace_period: int = 30) -> None:
    """Shut the worker down, waiting up to ``grace_period`` seconds.

    Args:
        grace_period (int): Maximum time to wait for graceful shutdown
            in seconds. Defaults to 30.

    Raises:
        WorkerShutdownError: If the worker fails to stop gracefully.
    """
    if not self._running:
        return  # nothing to stop

    identity_data = {
        "worker_identity": self.identity,
        "task_queue": self.task_queue,
        "worker_id": self.worker_id,
    }
    try:
        self._logger.info(
            "Stopping worker",
            extra={
                "worker_id": self.worker_id,
                "task_queue": self.task_queue,
                "grace_period": grace_period,
            },
        )

        # Ask the SDK for a graceful shutdown, bounded by the grace period.
        await asyncio.wait_for(self._worker.shutdown(), timeout=grace_period)
        self._running = False

        self._logger.info(
            "Worker stopped successfully",
            extra={"worker_id": self.worker_id, "task_queue": self.task_queue},
        )

    except TimeoutError as error:
        raise WorkerShutdownError(
            additional_data={
                "message": f"Worker shutdown timeout after {grace_period} seconds",
                **identity_data,
                "grace_period": grace_period,
            },
        ) from error
    except Exception as error:
        raise WorkerShutdownError(
            additional_data={
                "message": f"Failed to stop worker for task queue '{self.task_queue}'",
                **identity_data,
                "error": str(error),
            },
        ) from error

archipy.adapters.temporal.worker.WorkerHandle.wait_until_stopped async

wait_until_stopped() -> None

Wait until the worker is stopped.

Source code in archipy/adapters/temporal/worker.py
async def wait_until_stopped(self) -> None:
    """Block until the worker's background run task finishes."""
    run_task = self._background_task
    await run_task

archipy.adapters.temporal.worker.WorkerHandle.get_stats

get_stats() -> dict[str, Any]

Get worker statistics and status information.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Worker statistics and status.

Source code in archipy/adapters/temporal/worker.py
def get_stats(self) -> dict[str, Any]:
    """Return a snapshot of this worker's configuration and status.

    Returns:
        dict[str, Any]: Worker statistics and status.
    """
    snapshot: dict[str, Any] = {
        "worker_id": self.worker_id,
        "task_queue": self.task_queue,
        "identity": self.identity,
        "build_id": self.build_id,
        "is_running": self.is_running,
        "workflow_count": len(self.workflows),
        "activity_count": len(self.activities),
        "max_concurrent_workflow_tasks": self.max_concurrent_workflow_tasks,
        "max_concurrent_activities": self.max_concurrent_activities,
    }
    return snapshot

archipy.adapters.temporal.worker.TemporalWorkerManager

Bases: WorkerPort

Manager for Temporal worker lifecycle and operations.

This class provides a high-level interface for managing Temporal workers, including creation, configuration, and lifecycle management. It integrates with ArchiPy configuration and service patterns.

Parameters:

Name Type Description Default
temporal_config TemporalConfig

Configuration settings for Temporal. If None, retrieves from global config. Defaults to None.

None
Source code in archipy/adapters/temporal/worker.py
class TemporalWorkerManager(WorkerPort):
    """Manager for Temporal worker lifecycle and operations.

    This class provides a high-level interface for managing Temporal workers,
    including creation, configuration, and lifecycle management. It integrates
    with ArchiPy configuration and service patterns.

    Args:
        temporal_config (TemporalConfig, optional): Configuration settings for Temporal.
            If None, retrieves from global config. Defaults to None.
    """

    def __init__(self, temporal_config: TemporalConfig | None = None) -> None:
        """Initialize the worker manager.

        Args:
            temporal_config (TemporalConfig, optional): Configuration settings for Temporal.
                If None, retrieves from global config. Defaults to None.
        """
        # Get temporal config from the global config or use provided one
        if temporal_config is None:
            global_config = BaseConfig.global_config()
            if hasattr(global_config, "TEMPORAL"):
                self.config = global_config.TEMPORAL
            else:
                # Create a default config if none exists
                from archipy.configs.config_template import TemporalConfig

                self.config = TemporalConfig()
        else:
            self.config = temporal_config
        # Pass the *resolved* config so the adapter and this manager are
        # guaranteed to agree (previously the raw argument — possibly
        # None — was forwarded, letting the adapter resolve differently).
        self._temporal_adapter = TemporalAdapter(self.config)
        self._workers: dict[str, WorkerHandle] = {}
        self._logger = logging.getLogger(__name__)

    async def _get_client(self) -> Client:
        """Get the Temporal client from the adapter.

        Returns:
            Client: The Temporal client instance.
        """
        return await self._temporal_adapter.get_client()

    @override
    async def start_worker(
        self,
        task_queue: str,
        workflows: list[type] | None = None,
        activities: list[Callable[..., Any]] | None = None,
        build_id: str | None = None,
        identity: str | None = None,
        max_concurrent_workflow_tasks: int | None = None,
        max_concurrent_activities: int | None = None,
    ) -> WorkerHandle:
        """Start a Temporal worker for the specified task queue.

        Args:
            task_queue (str): The task queue this worker will poll from.
            workflows (list[type], optional): List of workflow classes to register.
                Defaults to None.
            activities (list[Callable], optional): List of activity callables to register.
                Defaults to None.
            build_id (str, optional): Build identifier for worker versioning.
                Defaults to None.
            identity (str, optional): Unique worker identity. If None, auto-generated.
                Defaults to None.
            max_concurrent_workflow_tasks (int, optional): Maximum concurrent workflow tasks.
                Defaults to None (server default).
            max_concurrent_activities (int, optional): Maximum concurrent activity tasks.
                Defaults to None (server default).

        Returns:
            WorkerHandle: Handle to the started worker.

        Raises:
            WorkerConnectionError: If the worker fails to start.
        """
        if not task_queue or not task_queue.strip():
            raise WorkerConnectionError(
                additional_data={
                    "message": "Task queue name cannot be empty",
                    "task_queue": task_queue,
                },
            )

        client = await self._get_client()
        worker_id = str(uuid4())
        worker_identity = identity or f"worker-{worker_id[:8]}"

        try:
            # Create the Temporal worker
            # Note: Worker inherits Runtime (including Prometheus config) from the Client
            worker = Worker(
                client,
                task_queue=task_queue,
                workflows=workflows or [],
                activities=activities or [],
                build_id=build_id,
                identity=worker_identity,
                max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
                max_concurrent_activities=max_concurrent_activities,
            )

            # Create worker handle
            worker_handle = WorkerHandle(
                worker=worker,
                worker_id=worker_id,
                task_queue=task_queue,
                workflows=workflows,
                activities=activities,
                build_id=build_id,
                identity=worker_identity,
                max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
                max_concurrent_activities=max_concurrent_activities,
            )

            # Start the worker
            await worker_handle.start()

            # Register the worker
            self._workers[worker_id] = worker_handle

            self._logger.info(
                "Worker created and started",
                extra={
                    "worker_id": worker_id,
                    "task_queue": task_queue,
                    "identity": worker_identity,
                    "workflow_count": len(workflows) if workflows else 0,
                    "activity_count": len(activities) if activities else 0,
                },
            )

            return worker_handle

        except Exception as error:
            raise WorkerConnectionError(
                additional_data={
                    "message": f"Failed to start worker for task queue '{task_queue}'",
                    "task_queue": task_queue,
                    "worker_id": worker_id,
                    "identity": worker_identity,
                    "error": str(error),
                },
            ) from error

    @override
    async def stop_worker(self, worker_handle: PortWorkerHandle) -> None:
        """Stop a running Temporal worker.

        Args:
            worker_handle (PortWorkerHandle): Handle to the worker to stop.

        Raises:
            WorkerShutdownError: If the worker fails to stop gracefully.
        """
        if worker_handle.worker_id not in self._workers:
            return  # Worker already stopped or not managed by this manager

        try:
            await worker_handle.stop()
            del self._workers[worker_handle.worker_id]

            self._logger.info(
                "Worker stopped and removed",
                extra={
                    "worker_id": worker_handle.worker_id,
                    "task_queue": worker_handle.task_queue,
                },
            )

        except Exception:
            # Remove from tracking even if shutdown failed
            if worker_handle.worker_id in self._workers:
                del self._workers[worker_handle.worker_id]
            raise

    @override
    async def shutdown_all_workers(self) -> None:
        """Shutdown all workers managed by this port.

        Performs graceful shutdown of all active workers, waiting for current
        tasks to complete before terminating.

        Raises:
            WorkerShutdownError: If any worker fails to shutdown gracefully.
        """
        if not self._workers:
            return

        # Capture the count now: stop_worker() removes successfully
        # stopped workers from self._workers as we go.
        total_workers = len(self._workers)
        self._logger.info(
            "Shutting down all workers",
            extra={
                "worker_count": total_workers,
            },
        )

        shutdown_errors = []
        workers_to_stop = list(self._workers.values())

        # Stop workers sequentially so one failure cannot mask another
        for worker_handle in workers_to_stop:
            try:
                await self.stop_worker(worker_handle)
            except Exception as error:
                shutdown_errors.append(
                    {
                        "worker_id": worker_handle.worker_id,
                        "task_queue": worker_handle.task_queue,
                        "error": str(error),
                    },
                )

        if shutdown_errors:
            raise WorkerShutdownError(
                additional_data={
                    "message": f"Failed to shutdown {len(shutdown_errors)} workers",
                    "worker_count": total_workers,
                    "failed_count": len(shutdown_errors),
                    "shutdown_errors": shutdown_errors,
                },
            )

        self._logger.info("All workers shut down successfully")

    def get_worker_stats(self) -> list[dict[str, Any]]:
        """Get statistics for all managed workers.

        Returns:
            list[dict[str, Any]]: List of worker statistics.
        """
        return [worker.get_stats() for worker in self._workers.values()]

    def get_worker_by_task_queue(self, task_queue: str) -> WorkerHandle | None:
        """Get a worker handle by task queue.

        Args:
            task_queue (str): The task queue to search for.

        Returns:
            WorkerHandle | None: Worker handle if found, None otherwise.
        """
        for worker in self._workers.values():
            if worker.task_queue == task_queue:
                return worker
        return None

    def list_workers(self) -> list[WorkerHandle]:
        """Get a list of all managed workers.

        Returns:
            list[WorkerHandle]: List of worker handles.
        """
        return list(self._workers.values())

    @property
    def worker_count(self) -> int:
        """Get the number of managed workers.

        Returns:
            int: Number of managed workers.
        """
        return len(self._workers)

    async def close(self) -> None:
        """Close the worker manager and all managed workers.

        Performs cleanup of all resources, including stopping all workers
        and closing the Temporal client connection.
        """
        await self.shutdown_all_workers()
        await self._temporal_adapter.close()
        self._logger.info("Worker manager closed")

archipy.adapters.temporal.worker.TemporalWorkerManager.config instance-attribute

config = TEMPORAL

archipy.adapters.temporal.worker.TemporalWorkerManager.worker_count property

worker_count: int

Get the number of managed workers.

Returns:

Name Type Description
int int

Number of managed workers.

archipy.adapters.temporal.worker.TemporalWorkerManager.start_worker async

start_worker(
    task_queue: str,
    workflows: list[type] | None = None,
    activities: list[Callable[..., Any]] | None = None,
    build_id: str | None = None,
    identity: str | None = None,
    max_concurrent_workflow_tasks: int | None = None,
    max_concurrent_activities: int | None = None,
) -> WorkerHandle

Start a Temporal worker for the specified task queue.

Parameters:

Name Type Description Default
task_queue str

The task queue this worker will poll from.

required
workflows list[type]

List of workflow classes to register. Defaults to None.

None
activities list[Callable]

List of activity callables to register. Defaults to None.

None
build_id str

Build identifier for worker versioning. Defaults to None.

None
identity str

Unique worker identity. If None, auto-generated. Defaults to None.

None
max_concurrent_workflow_tasks int

Maximum concurrent workflow tasks. Defaults to None (server default).

None
max_concurrent_activities int

Maximum concurrent activity tasks. Defaults to None (server default).

None

Returns:

Name Type Description
WorkerHandle WorkerHandle

Handle to the started worker.

Raises:

Type Description
WorkerConnectionError

If the worker fails to start.

Source code in archipy/adapters/temporal/worker.py
@override
async def start_worker(
    self,
    task_queue: str,
    workflows: list[type] | None = None,
    activities: list[Callable[..., Any]] | None = None,
    build_id: str | None = None,
    identity: str | None = None,
    max_concurrent_workflow_tasks: int | None = None,
    max_concurrent_activities: int | None = None,
) -> WorkerHandle:
    """Start a Temporal worker for the specified task queue.

    Args:
        task_queue (str): The task queue this worker will poll from.
        workflows (list[type], optional): List of workflow classes to register.
            Defaults to None.
        activities (list[Callable], optional): List of activity callables to register.
            Defaults to None.
        build_id (str, optional): Build identifier for worker versioning.
            Defaults to None.
        identity (str, optional): Unique worker identity. If None, auto-generated.
            Defaults to None.
        max_concurrent_workflow_tasks (int, optional): Maximum concurrent workflow tasks.
            Defaults to None (server default).
        max_concurrent_activities (int, optional): Maximum concurrent activity tasks.
            Defaults to None (server default).

    Returns:
        WorkerHandle: Handle to the started worker.

    Raises:
        WorkerConnectionError: If the worker fails to start.
    """
    # Reject blank or whitespace-only queue names before any I/O.
    if not task_queue or not task_queue.strip():
        raise WorkerConnectionError(
            additional_data={
                "message": "Task queue name cannot be empty",
                "task_queue": task_queue,
            },
        )

    client = await self._get_client()
    worker_id = str(uuid4())
    worker_identity = identity or f"worker-{worker_id[:8]}"

    try:
        # The Worker inherits its Runtime (including Prometheus config)
        # from the Client, so no extra runtime wiring is needed here.
        sdk_worker = Worker(
            client,
            task_queue=task_queue,
            workflows=workflows or [],
            activities=activities or [],
            build_id=build_id,
            identity=worker_identity,
            max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
            max_concurrent_activities=max_concurrent_activities,
        )

        # Wrap the SDK worker in our handle abstraction.
        handle = WorkerHandle(
            worker=sdk_worker,
            worker_id=worker_id,
            task_queue=task_queue,
            workflows=workflows,
            activities=activities,
            build_id=build_id,
            identity=worker_identity,
            max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
            max_concurrent_activities=max_concurrent_activities,
        )

        # Start polling, then register the handle for lifecycle tracking.
        await handle.start()
        self._workers[worker_id] = handle

        self._logger.info(
            "Worker created and started",
            extra={
                "worker_id": worker_id,
                "task_queue": task_queue,
                "identity": worker_identity,
                "workflow_count": len(workflows) if workflows else 0,
                "activity_count": len(activities) if activities else 0,
            },
        )

    except Exception as error:
        raise WorkerConnectionError(
            additional_data={
                "message": f"Failed to start worker for task queue '{task_queue}'",
                "task_queue": task_queue,
                "worker_id": worker_id,
                "identity": worker_identity,
                "error": str(error),
            },
        ) from error
    else:
        return handle

archipy.adapters.temporal.worker.TemporalWorkerManager.stop_worker async

stop_worker(worker_handle: WorkerHandle) -> None

Stop a running Temporal worker.

Parameters:

Name Type Description Default
worker_handle WorkerHandle

Handle to the worker to stop.

required

Raises:

Type Description
WorkerShutdownError

If the worker fails to stop gracefully.

Source code in archipy/adapters/temporal/worker.py
@override
async def stop_worker(self, worker_handle: PortWorkerHandle) -> None:
    """Stop a running Temporal worker.

    Args:
        worker_handle (PortWorkerHandle): Handle to the worker to stop.

    Raises:
        WorkerShutdownError: If the worker fails to stop gracefully.
    """
    if worker_handle.worker_id not in self._workers:
        return  # Worker already stopped or not managed by this manager

    try:
        await worker_handle.stop()
        del self._workers[worker_handle.worker_id]

        self._logger.info(
            "Worker stopped and removed",
            extra={
                "worker_id": worker_handle.worker_id,
                "task_queue": worker_handle.task_queue,
            },
        )

    except Exception:
        # Remove from tracking even if shutdown failed
        if worker_handle.worker_id in self._workers:
            del self._workers[worker_handle.worker_id]
        raise

archipy.adapters.temporal.worker.TemporalWorkerManager.shutdown_all_workers async

shutdown_all_workers() -> None

Shutdown all workers managed by this port.

Performs graceful shutdown of all active workers, waiting for current tasks to complete before terminating.

Raises:

Type Description
WorkerShutdownError

If any worker fails to shutdown gracefully.

Source code in archipy/adapters/temporal/worker.py
@override
async def shutdown_all_workers(self) -> None:
    """Shutdown all workers managed by this port.

    Performs graceful shutdown of all active workers, waiting for current
    tasks to complete before terminating.

    Raises:
        WorkerShutdownError: If any worker fails to shutdown gracefully.
    """
    if not self._workers:
        return

    self._logger.info(
        "Shutting down all workers",
        extra={
            "worker_count": len(self._workers),
        },
    )

    shutdown_errors = []
    workers_to_stop = list(self._workers.values())

    # Stop all workers concurrently
    for worker_handle in workers_to_stop:
        try:
            await self.stop_worker(worker_handle)
        except Exception as error:
            shutdown_errors.append(
                {
                    "worker_id": worker_handle.worker_id,
                    "task_queue": worker_handle.task_queue,
                    "error": str(error),
                },
            )

    if shutdown_errors:
        raise WorkerShutdownError(
            additional_data={
                "message": f"Failed to shutdown {len(shutdown_errors)} workers",
                "worker_count": len(self._workers),
                "failed_count": len(shutdown_errors),
                "shutdown_errors": shutdown_errors,
            },
        )

    self._logger.info("All workers shut down successfully")

archipy.adapters.temporal.worker.TemporalWorkerManager.get_worker_stats

get_worker_stats() -> list[dict[str, Any]]

Get statistics for all managed workers.

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of worker statistics.

Source code in archipy/adapters/temporal/worker.py
def get_worker_stats(self) -> list[dict[str, Any]]:
    """Collect statistics from every worker this manager tracks.

    Returns:
        list[dict[str, Any]]: One statistics dict per managed worker, in
            the manager's internal registration order.
    """
    handles = self._workers.values()
    return [handle.get_stats() for handle in handles]

archipy.adapters.temporal.worker.TemporalWorkerManager.get_worker_by_task_queue

get_worker_by_task_queue(
    task_queue: str,
) -> WorkerHandle | None

Get a worker handle by task queue.

Parameters:

Name Type Description Default
task_queue str

The task queue to search for.

required

Returns:

Type Description
WorkerHandle | None

WorkerHandle | None: Worker handle if found, None otherwise.

Source code in archipy/adapters/temporal/worker.py
def get_worker_by_task_queue(self, task_queue: str) -> WorkerHandle | None:
    """Look up the managed worker serving a given task queue.

    Args:
        task_queue (str): The task queue to search for.

    Returns:
        WorkerHandle | None: The first matching worker handle, or None
            when no managed worker serves that queue.
    """
    matches = (
        handle
        for handle in self._workers.values()
        if handle.task_queue == task_queue
    )
    return next(matches, None)

archipy.adapters.temporal.worker.TemporalWorkerManager.list_workers

list_workers() -> list[WorkerHandle]

Get a list of all managed workers.

Returns:

Type Description
list[WorkerHandle]

list[WorkerHandle]: List of worker handles.

Source code in archipy/adapters/temporal/worker.py
def list_workers(self) -> list[WorkerHandle]:
    """Return the handles of every worker this manager currently tracks.

    Returns:
        list[WorkerHandle]: A new list of the managed worker handles;
            mutating it does not affect the manager's internal registry.
    """
    return [*self._workers.values()]

archipy.adapters.temporal.worker.TemporalWorkerManager.close async

close() -> None

Close the worker manager and all managed workers.

Performs cleanup of all resources, including stopping all workers and closing the Temporal client connection.

Source code in archipy/adapters/temporal/worker.py
async def close(self) -> None:
    """Close the worker manager and all managed workers.

    Performs cleanup of all resources, including stopping all workers
    and closing the Temporal client connection.

    Raises:
        WorkerShutdownError: Propagated from shutdown_all_workers when
            any worker fails to stop gracefully; in that case the
            adapter close below is never reached.
    """
    # Drain/stop all workers before closing the shared adapter.
    # NOTE(review): if shutdown_all_workers raises, the Temporal adapter
    # is left open — confirm whether close() should still attempt
    # adapter cleanup on failure.
    await self.shutdown_all_workers()
    await self._temporal_adapter.close()
    self._logger.info("Worker manager closed")

options: show_root_toc_entry: false heading_level: 3