Skip to content

Interceptors

The helpers/interceptors subpackage provides request/response interceptors for FastAPI and gRPC, covering rate limiting, metrics collection, exception handling, and distributed tracing.

FastAPI

metric

FastAPI middleware interceptor for collecting Prometheus metrics on HTTP requests.

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor

Bases: BaseHTTPMiddleware

A FastAPI interceptor for collecting and reporting metrics using Prometheus.

This interceptor measures the response time of HTTP requests and records it in a Prometheus histogram. It also tracks the number of active requests using a Prometheus gauge. The interceptor captures errors and logs them for monitoring purposes.

Source code in archipy/helpers/interceptors/fastapi/metric/interceptor.py
class FastAPIMetricInterceptor(BaseHTTPMiddleware):
    """A FastAPI interceptor for collecting and reporting metrics using Prometheus.

    Every request is timed and the duration is observed in the
    ``fastapi_response_time_seconds`` histogram, labelled with the HTTP method,
    the response status code and the route path template. While a request is in
    flight it is also counted in the ``fastapi_active_requests`` gauge.
    Exceptions raised further down the stack are passed to
    ``BaseUtils.capture_exception`` and then re-raised.
    """

    # Histogram bucket upper bounds in seconds: 5 ms steps below 1 s, 200 ms steps
    # from 1 s up to 5 s, 500 ms steps from 5 s up to 30 s, then +Inf.
    ZERO_TO_ONE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 1000 for i in range(0, 1000, 5)]
    ONE_TO_FIVE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(100, 500, 20)]
    FIVE_TO_THIRTY_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(500, 3000, 50)]
    TOTAL_BUCKETS: ClassVar[list[float]] = (
        ZERO_TO_ONE_SECONDS_BUCKETS + ONE_TO_FIVE_SECONDS_BUCKETS + FIVE_TO_THIRTY_SECONDS_BUCKETS + [float("inf")]
    )

    RESPONSE_TIME_SECONDS: ClassVar[Histogram] = Histogram(
        "fastapi_response_time_seconds",
        "Time spent processing HTTP request",
        labelnames=("method", "status_code", "path_template"),
        buckets=TOTAL_BUCKETS,
    )

    ACTIVE_REQUESTS: ClassVar[Gauge] = Gauge(
        "fastapi_active_requests",
        "Number of active HTTP requests",
        labelnames=("method", "path_template"),
    )

    # Maps "<METHOD>:<raw path>" to the resolved path template. Declared as a
    # ClassVar, so it is shared by every instance of the interceptor.
    _path_template_cache: ClassVar[dict[str, str]] = {}

    # Upper bound on cached entries. Keys are raw request paths, so without a
    # cap the cache would grow with every distinct URL ever requested.
    _PATH_TEMPLATE_CACHE_MAX_SIZE: ClassVar[int] = 1024

    def __init__(self, app: ASGIApp) -> None:
        """Initialize the FastAPI metric interceptor.

        Args:
            app (ASGIApp): The ASGI application to wrap.
        """
        super().__init__(app)

    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Intercept HTTP requests to measure response time and track active requests.

        When Prometheus is disabled in the global configuration the request is
        forwarded untouched and nothing is recorded.

        Args:
            request (Request): The incoming HTTP request.
            call_next (Callable[[Request], Awaitable[Response]]): The next interceptor or endpoint to call.

        Returns:
            Response: The HTTP response from the endpoint.

        Raises:
            Exception: If an exception occurs during request processing, it is captured and re-raised.
        """
        if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
            return await call_next(request)

        path_template = self._get_path_template(request)
        method = request.method

        self.ACTIVE_REQUESTS.labels(method=method, path_template=path_template).inc()

        # perf_counter is monotonic, so the measured duration cannot go negative
        # or jump when the system wall clock is adjusted mid-request.
        start_time = time.perf_counter()
        # Reported when call_next raises before a response (and its status) exists.
        status_code = 500

        try:
            response = await call_next(request)
            status_code = response.status_code
        except Exception as exception:
            BaseUtils.capture_exception(exception)
            raise
        else:
            return response
        finally:
            # Runs on success and on failure, so the gauge is always decremented
            # and every request contributes exactly one histogram observation.
            duration = time.perf_counter() - start_time
            self.RESPONSE_TIME_SECONDS.labels(
                method=method,
                status_code=status_code,
                path_template=path_template,
            ).observe(duration)
            self.ACTIVE_REQUESTS.labels(method=method, path_template=path_template).dec()

    def _get_path_template(self, request: Request) -> str:
        """Extract path template from request by matching against app routes with in-memory caching.

        The result is cached per ``"<method>:<path>"``. The cache is emptied once
        it holds ``_PATH_TEMPLATE_CACHE_MAX_SIZE`` entries so that it cannot grow
        without bound.

        Args:
            request (Request): The FastAPI request object.

        Returns:
            str: Path template (e.g., /users/{id}) or raw path if no route found.
        """
        path = request.url.path
        cache = self._path_template_cache
        cache_key = f"{request.method}:{path}"

        cached_template = cache.get(cache_key)
        if cached_template is not None:
            return cached_template

        # Fall back to the raw path when no route fully matches the request.
        # NOTE(review): raw paths used as label values can still create many
        # distinct metric series for unmatched URLs - confirm this is acceptable.
        path_template = path
        for route in request.app.routes:
            match, _ = route.matches(request.scope)
            if match == Match.FULL:
                path_template = route.path
                break

        if len(cache) >= self._PATH_TEMPLATE_CACHE_MAX_SIZE:
            # Cheap eviction: drop everything and let hot paths repopulate.
            cache.clear()
        cache[cache_key] = path_template
        return path_template

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.ZERO_TO_ONE_SECONDS_BUCKETS class-attribute

ZERO_TO_ONE_SECONDS_BUCKETS: list[float] = [
    (i / 1000) for i in (range(0, 1000, 5))
]

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.ONE_TO_FIVE_SECONDS_BUCKETS class-attribute

ONE_TO_FIVE_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(100, 500, 20))
]

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.FIVE_TO_THIRTY_SECONDS_BUCKETS class-attribute

FIVE_TO_THIRTY_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(500, 3000, 50))
]

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.TOTAL_BUCKETS class-attribute

TOTAL_BUCKETS: list[float] = (
    ZERO_TO_ONE_SECONDS_BUCKETS
    + ONE_TO_FIVE_SECONDS_BUCKETS
    + FIVE_TO_THIRTY_SECONDS_BUCKETS
    + [float("inf")]
)

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.RESPONSE_TIME_SECONDS class-attribute

RESPONSE_TIME_SECONDS: Histogram = Histogram(
    "fastapi_response_time_seconds",
    "Time spent processing HTTP request",
    labelnames=("method", "status_code", "path_template"),
    buckets=TOTAL_BUCKETS,
)

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.ACTIVE_REQUESTS class-attribute

ACTIVE_REQUESTS: Gauge = Gauge(
    "fastapi_active_requests",
    "Number of active HTTP requests",
    labelnames=("method", "path_template"),
)

archipy.helpers.interceptors.fastapi.metric.interceptor.FastAPIMetricInterceptor.dispatch async

dispatch(
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response

Intercept HTTP requests to measure response time and track active requests.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required
call_next Callable[[Request], Awaitable[Response]]

The next interceptor or endpoint to call.

required

Returns:

Name Type Description
Response Response

The HTTP response from the endpoint.

Raises:

Type Description
Exception

If an exception occurs during request processing, it is captured and re-raised.

Source code in archipy/helpers/interceptors/fastapi/metric/interceptor.py
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    """Intercept HTTP requests to measure response time and track active requests.

    When Prometheus is disabled in the global configuration the request is
    forwarded untouched and nothing is recorded.

    Args:
        request (Request): The incoming HTTP request.
        call_next (Callable[[Request], Awaitable[Response]]): The next interceptor or endpoint to call.

    Returns:
        Response: The HTTP response from the endpoint.

    Raises:
        Exception: If an exception occurs during request processing, it is captured and re-raised.
    """
    if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
        return await call_next(request)

    path_template = self._get_path_template(request)
    method = request.method

    self.ACTIVE_REQUESTS.labels(method=method, path_template=path_template).inc()

    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump when the system wall clock is adjusted mid-request.
    start_time = time.perf_counter()
    # Reported when call_next raises before a response (and its status) exists.
    status_code = 500

    try:
        response = await call_next(request)
        status_code = response.status_code
    except Exception as exception:
        BaseUtils.capture_exception(exception)
        raise
    else:
        return response
    finally:
        # Runs on success and on failure, so the gauge is always decremented
        # and every request contributes exactly one histogram observation.
        duration = time.perf_counter() - start_time
        self.RESPONSE_TIME_SECONDS.labels(
            method=method,
            status_code=status_code,
            path_template=path_template,
        ).observe(duration)
        self.ACTIVE_REQUESTS.labels(method=method, path_template=path_template).dec()

options: show_root_toc_entry: false heading_level: 3

rate_limit

FastAPI interceptor that enforces configurable rate limits on HTTP endpoints using Redis as a backend.

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler

A rate-limiting handler for FastAPI REST endpoints using Redis for tracking.

This class provides rate-limiting functionality by tracking the number of requests made to a specific endpoint within a defined time window. If the request limit is exceeded, it raises an HTTP 429 Too Many Requests error.

Parameters:

Name Type Description Default
calls_count StrictInt

The maximum number of allowed requests within the time window.

1
milliseconds StrictInt

The time window in milliseconds.

0
seconds StrictInt

The time window in seconds.

0
minutes StrictInt

The time window in minutes.

0
hours StrictInt

The time window in hours.

0
days StrictInt

The time window in days.

0
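query_params set[StrictStr] | None

Names of the request query parameters whose values are included in the rate-limit key, so that requests with different values for these parameters are limited separately.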

None
Source code in archipy/helpers/interceptors/fastapi/rate_limit/fastapi_rest_rate_limit_handler.py
class FastAPIRestRateLimitHandler:
    """A rate-limiting handler for FastAPI REST endpoints using Redis for tracking.

    This class provides rate-limiting functionality by tracking the number of requests
    made to a specific endpoint within a defined time window. If the request limit is
    exceeded, it raises an HTTP 429 Too Many Requests error.

    Args:
        calls_count (StrictInt): The maximum number of allowed requests within the time window.
        milliseconds (StrictInt): The time window in milliseconds.
        seconds (StrictInt): The time window in seconds.
        minutes (StrictInt): The time window in minutes.
        hours (StrictInt): The time window in hours.
        days (StrictInt): The time window in days.
        query_params (set[StrictStr] | None): Names of the request query parameters to include
            in the rate-limit key.
    """

    def __init__(
        self,
        calls_count: StrictInt = 1,
        milliseconds: StrictInt = 0,
        seconds: StrictInt = 0,
        minutes: StrictInt = 0,
        hours: StrictInt = 0,
        days: StrictInt = 0,
        query_params: set[StrictStr] | None = None,
    ) -> None:
        """Initialize the rate limit handler with specified time window and request limits.

        The time window is calculated by combining all time unit parameters into milliseconds.
        At least one time unit parameter should be greater than 0 to create a valid window.

        Args:
            calls_count (StrictInt, optional): Maximum number of allowed requests within the time window.
                Defaults to 1.
            milliseconds (StrictInt, optional): Number of milliseconds in the time window.
                Defaults to 0.
            seconds (StrictInt, optional): Number of seconds in the time window.
                Defaults to 0.
            minutes (StrictInt, optional): Number of minutes in the time window.
                Defaults to 0.
            hours (StrictInt, optional): Number of hours in the time window.
                Defaults to 0.
            days (StrictInt, optional): Number of days in the time window.
                Defaults to 0.
            query_params (set[StrictStr] | None, optional): Set of query parameter names to include
                in rate limit key generation. If None, no query parameters will be used.
                Defaults to None.

        Example:
            >>> # Allow 100 requests per minute
            >>> handler = FastAPIRestRateLimitHandler(calls_count=100, minutes=1)
            >>>
            >>> # Allow 1000 requests per day with specific query params
            >>> handler = FastAPIRestRateLimitHandler(calls_count=1000, days=1, query_params={"user_id", "action"})
        """
        self.query_params = query_params or set()
        self.calls_count = calls_count
        # Total window length, expressed in milliseconds.
        self.milliseconds = (
            milliseconds + 1000 * seconds + 60 * 1000 * minutes + 60 * 60 * 1000 * hours + 24 * 60 * 60 * 1000 * days
        )
        self.redis_client = AsyncRedisAdapter()

    async def _check(self, key: str) -> RedisResponseType:
        """Checks if the request count for the given key exceeds the allowed limit.

        Args:
            key (str): The Redis key used to track the request count.

        Returns:
            int: 0 when the request is allowed; otherwise the remaining time-to-live (TTL)
            of the counter in milliseconds.
        """
        current_request = await self.redis_client.get(key)
        if current_request is None:
            # First request of a new window: create the counter with the window as its expiry.
            await self.redis_client.set(key, 1, px=self.milliseconds)
            return 0

        if int(current_request) < self.calls_count:
            await self.redis_client.incrby(key)
            return 0

        ttl = await self.redis_client.pttl(key)
        if ttl < 0:
            # PTTL reports -1 for a key without an expiry and -2 for a key that no
            # longer exists. Neither is a real waiting time, so instead of rejecting
            # the caller with a meaningless Retry-After, start a fresh window and
            # count this request as its first one.
            await self.redis_client.set(key, 1, px=self.milliseconds)
            return 0
        return ttl

    async def __call__(self, request: Request) -> None:
        """Handles the rate-limiting logic for incoming requests.

        Args:
            request (Request): The incoming FastAPI request.

        Raises:
            HTTPException: If the rate limit is exceeded, an HTTP 429 Too Many Requests error is raised.
        """
        rate_key = await self._get_identifier(request)
        key = f"RateLimitHandler:{rate_key}:{request.scope['path']}:{request.method}"
        pexpire = await self._check(key)
        if pexpire != 0:
            await self._create_callback(pexpire)

    @staticmethod
    async def _create_callback(pexpire: int) -> None:
        """Raises an HTTP 429 Too Many Requests error with the appropriate headers.

        Args:
            pexpire (int): The remaining time-to-live (TTL) in milliseconds before the rate limit resets.

        Raises:
            HTTPException: An HTTP 429 Too Many Requests error with the `Retry-After` header.
        """
        # Retry-After is sent in whole seconds, so round the millisecond TTL up.
        expire = ceil(pexpire / 1000)
        raise HTTPException(
            status_code=HTTP_429_TOO_MANY_REQUESTS,
            detail="Too Many Requests",
            headers={"Retry-After": str(expire)},
        )

    async def _get_identifier(self, request: Request) -> str:
        """Builds the per-client identifier used in the rate-limit key.

        Args:
            request (Request): The FastAPI request object containing headers and client info.

        Returns:
            str: The client identifier combined with the request path and, when configured,
            the selected query parameters.

        Note:
            - ``X-Real-IP`` is used when its first entry is a public IP address
            - Otherwise ``X-Forwarded-For`` is used: its first entry when that is a public
              IP address, else the raw header value
            - ``request.client.host`` is used only when neither header yields a value
        """
        base_identifier = await self._extract_client_ip(request)
        return self._generate_redis_key(request=request, base_identifier=base_identifier)

    async def _extract_client_ip(self, request: Request) -> str:
        """Extracts the client identifier from request headers or the connection.

        Args:
            request (Request): The FastAPI request object.

        Returns:
            str: The value taken from ``X-Real-IP`` or ``X-Forwarded-For``, the client host,
            or ``"unknown"`` when none is available.
        """
        # X-Real-IP takes precedence over X-Forwarded-For.
        if real_ip := self._validate_ip_from_header(request.headers.get("X-Real-IP")):
            return real_ip

        if forwarded_for := self._validate_forwarded_for_header(request.headers.get("X-Forwarded-For")):
            return forwarded_for

        # Fall back to the peer address of the connection.
        if request.client is not None:
            return request.client.host
        return "unknown"

    @staticmethod
    def _first_public_ip(header_value: str) -> str | None:
        """Returns the first comma-separated entry of a header if it is a public IP address.

        Args:
            header_value (str): Non-empty header value, possibly a comma-separated list.

        Returns:
            str | None: The first entry, or None when it is not a valid IP address or is a
            private, loopback, link-local or multicast address.
        """
        candidate = header_value.split(",")[0].strip()
        try:
            parsed = ip_address(candidate)
        except ValueError:
            return None
        if parsed.is_private or parsed.is_loopback or parsed.is_link_local or parsed.is_multicast:
            return None
        return candidate

    def _validate_ip_from_header(self, header_value: str | None) -> str | None:
        """Validates IP address from header value.

        Args:
            header_value (Optional[str]): IP address from header.

        Returns:
            Optional[str]: The first entry when it is a public IP address, otherwise None.
        """
        if not header_value:
            return None
        return self._first_public_ip(header_value)

    def _validate_forwarded_for_header(self, forwarded_for: str | None) -> str | None:
        """Validates IP from X-Forwarded-For header.

        Args:
            forwarded_for (Optional[str]): X-Forwarded-For header value.

        Returns:
            Optional[str]: The first entry when it is a public IP address, the unmodified
            header value when it is not, or None when the header is missing or empty.
        """
        if not forwarded_for:
            return None
        # NOTE(review): when the first entry is not a public IP the raw header is used
        # as the identifier, so client.host is never consulted - confirm this is intended.
        return self._first_public_ip(forwarded_for) or forwarded_for

    def _generate_redis_key(self, request: Request, base_identifier: str) -> str:
        """Generates a Redis key for rate limiting based on the request and base identifier.

        Args:
            request (Request): The FastAPI request object containing path and query parameters.
            base_identifier (str): The base identifier (typically an IP address) for the request.

        Returns:
            str: A Redis key string with optional query parameters.
        """
        path_key = f"{base_identifier}:{request.scope['path']}"

        if not self.query_params:
            return path_key

        return self._append_query_params(path_key, request.query_params)

    def _append_query_params(self, base_key: str, query_params: QueryParams) -> str:
        """Appends sorted query parameters to the Redis key.

        Only parameters named in ``self.query_params`` are included; they are sorted so
        that the key does not depend on the order in which they appear in the URL.

        Args:
            base_key (str): Base Redis key without query parameters.
            query_params (QueryParams): Request query parameters.

        Returns:
            str: Redis key with appended query parameters.
        """
        filtered_params = {k: v for k, v in query_params.items() if k in self.query_params and v is not None}

        if not filtered_params:
            return base_key

        query_string = "&".join(f"{k}={v}" for k, v in sorted(filtered_params.items()))
        return f"{base_key}?{query_string}"

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.query_params instance-attribute

query_params = query_params or set()

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.calls_count instance-attribute

calls_count = calls_count

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.milliseconds instance-attribute

milliseconds = (
    milliseconds
    + 1000 * seconds
    + 60 * 1000 * minutes
    + 60 * 60 * 1000 * hours
    + 24 * 60 * 60 * 1000 * days
)

archipy.helpers.interceptors.fastapi.rate_limit.fastapi_rest_rate_limit_handler.FastAPIRestRateLimitHandler.redis_client instance-attribute

redis_client = AsyncRedisAdapter()

options: show_root_toc_entry: false heading_level: 3

gRPC

base

Abstract base classes for gRPC client and server interceptors.

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails

Bases: _ClientCallDetailsFields, ClientCallDetails

Describes an RPC to be invoked.

This class extends grpc.ClientCallDetails and provides additional fields for RPC details. See https://grpc.github.io/grpc/python/grpc.html#grpc.ClientCallDetails

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
class ClientCallDetails(_ClientCallDetailsFields, grpc.ClientCallDetails):
    """Concrete description of an RPC that is about to be invoked.

    Combines the fields declared by `_ClientCallDetailsFields` with the
    `grpc.ClientCallDetails` interface.
    See https://grpc.github.io/grpc/python/grpc.html#grpc.ClientCallDetails
    """

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.method instance-attribute

method: str

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.timeout instance-attribute

timeout: float | None

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.metadata instance-attribute

metadata: Sequence[tuple[str, str | bytes]] | None

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.credentials instance-attribute

credentials: CallCredentials | None

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.wait_for_ready instance-attribute

wait_for_ready: bool | None

archipy.helpers.interceptors.grpc.base.client_interceptor.ClientCallDetails.compression instance-attribute

compression: Compression | None

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor

Bases: UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor

Base class for gRPC client interceptors.

This class provides a base implementation for intercepting gRPC client calls. It supports unary-unary, unary-stream, stream-unary, and stream-stream RPCs.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
class BaseGrpcClientInterceptor(
    grpc.UnaryUnaryClientInterceptor,
    grpc.UnaryStreamClientInterceptor,
    grpc.StreamUnaryClientInterceptor,
    grpc.StreamStreamClientInterceptor,
    metaclass=abc.ABCMeta,
):
    """Common base for gRPC client interceptors.

    All four RPC shapes (unary-unary, unary-stream, stream-unary and
    stream-stream) are routed through the single abstract `intercept` hook,
    so a subclass only has to implement that one method.
    """

    @abc.abstractmethod
    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ) -> Any:
        """Hook that subclasses implement to wrap a client call.

        The default body simply forwards the call unchanged.

        Args:
            method (Callable): The continuation function to call.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.ClientCallDetails): Details of the RPC call.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        outcome = method(request_or_iterator, call_details)
        return outcome

    def intercept_unary_unary(
        self,
        continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
        client_call_details: grpc.ClientCallDetails,
        request: _TRequest,
    ) -> Any:
        """Route a unary-unary RPC through `intercept`.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.ClientCallDetails): Details of the RPC call.
            request (Any): The request object.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        # intercept() calls its method as (request, call_details), whereas gRPC
        # continuations are typed (call_details, request); _swap_args presumably
        # adapts between the two.
        swapped = _swap_args(continuation)
        return self.intercept(swapped, request, client_call_details)

    def intercept_unary_stream(
        self,
        continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
        client_call_details: grpc.ClientCallDetails,
        request: _TRequest,
    ) -> Any:
        """Route a unary-stream RPC through `intercept`.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.ClientCallDetails): Details of the RPC call.
            request (Any): The request object.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        swapped = _swap_args(continuation)
        return self.intercept(swapped, request, client_call_details)

    def intercept_stream_unary(
        self,
        continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
        client_call_details: grpc.ClientCallDetails,
        request_iterator: Iterator[_TRequest],
    ) -> Any:
        """Route a stream-unary RPC through `intercept`.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.ClientCallDetails): Details of the RPC call.
            request_iterator (Iterator[Any]): The request iterator.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        swapped = _swap_args(continuation)
        return self.intercept(swapped, request_iterator, client_call_details)

    def intercept_stream_stream(
        self,
        continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
        client_call_details: grpc.ClientCallDetails,
        request_iterator: Iterator[_TRequest],
    ) -> Any:
        """Route a stream-stream RPC through `intercept`.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.ClientCallDetails): Details of the RPC call.
            request_iterator (Iterator[Any]): The request iterator.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        swapped = _swap_args(continuation)
        return self.intercept(swapped, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor.intercept abstractmethod

intercept(
    method: Callable,
    request_or_iterator: Any,
    call_details: ClientCallDetails,
) -> Any

Intercepts a gRPC client call.

Parameters:

Name Type Description Default
method Callable

The continuation function to call.

required
request_or_iterator Any

The request or request iterator.

required
call_details ClientCallDetails

Details of the RPC call.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
@abc.abstractmethod
def intercept(
    self,
    method: Callable,
    request_or_iterator: Any,
    call_details: grpc.ClientCallDetails,
) -> Any:
    """Hook that subclasses implement to wrap a client call.

    The default body simply forwards the call unchanged.

    Args:
        method (Callable): The continuation function to call.
        request_or_iterator (Any): The request or request iterator.
        call_details (grpc.ClientCallDetails): Details of the RPC call.

    Returns:
        Any: The result of the intercepted RPC call.
    """
    outcome = method(request_or_iterator, call_details)
    return outcome

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor.intercept_unary_unary

intercept_unary_unary(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts a unary-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_unary_unary(
    self,
    continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route a unary-unary RPC through `intercept`.

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: The result of the intercepted RPC call.
    """
    # The continuation is wrapped with _swap_args before being handed to
    # intercept(), which receives the request ahead of the call details.
    swapped = _swap_args(continuation)
    return self.intercept(swapped, request, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor.intercept_unary_stream

intercept_unary_stream(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts a unary-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_unary_stream(
    self,
    continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route a unary-stream RPC through `intercept`.

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: The result of the intercepted RPC call.
    """
    # The continuation is wrapped with _swap_args before being handed to
    # intercept(), which receives the request ahead of the call details.
    swapped = _swap_args(continuation)
    return self.intercept(swapped, request, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor.intercept_stream_unary

intercept_stream_unary(
    continuation: Callable[
        [ClientCallDetails, Iterator[_TRequest]], Any
    ],
    client_call_details: ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any

Intercepts a stream-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator Iterator[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_stream_unary(
    self,
    continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
    client_call_details: grpc.ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any:
    """Route a stream-unary RPC through `intercept`.

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request_iterator (Iterator[Any]): The request iterator.

    Returns:
        Any: The result of the intercepted RPC call.
    """
    # The continuation is wrapped with _swap_args before being handed to
    # intercept(), which receives the request iterator ahead of the call details.
    swapped = _swap_args(continuation)
    return self.intercept(swapped, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseGrpcClientInterceptor.intercept_stream_stream

intercept_stream_stream(
    continuation: Callable[
        [ClientCallDetails, Iterator[_TRequest]], Any
    ],
    client_call_details: ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any

Intercepts a stream-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator Iterator[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_stream_stream(
    self,
    continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
    client_call_details: grpc.ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any:
    """Forwards a stream-stream RPC call to the shared ``intercept`` hook.

    Args:
        continuation (Callable): The next step of the call chain; it takes the call details
            first and the request iterator second.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request_iterator (Iterator[Any]): The iterator of request messages.

    Returns:
        Any: Whatever ``intercept`` returns for this call.
    """
    # ``intercept`` is handed the request before the call details, so the continuation is
    # wrapped with ``_swap_args`` first (presumably to flip its argument order to match).
    adapted_continuation = _swap_args(continuation)
    return self.intercept(adapted_continuation, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails

Bases: _AsyncClientCallDetailsFields, ClientCallDetails

Describes an RPC to be invoked in an asynchronous context.

This class extends grpc.aio.ClientCallDetails and provides additional fields for RPC details. See https://grpc.github.io/grpc/python/grpc.html#grpc.ClientCallDetails

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
class AsyncClientCallDetails(_AsyncClientCallDetailsFields, grpc.aio.ClientCallDetails):
    """Describes an RPC to be invoked in an asynchronous context.

    This class combines `_AsyncClientCallDetailsFields` with `grpc.aio.ClientCallDetails`; it adds
    no members of its own, so the RPC detail fields come from its bases.
    See https://grpc.github.io/grpc/python/grpc.html#grpc.ClientCallDetails
    """

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails.method instance-attribute

method: str

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails.timeout instance-attribute

timeout: float | None

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails.metadata instance-attribute

metadata: Sequence[tuple[str, str | bytes]] | None

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails.credentials instance-attribute

credentials: CallCredentials | None

archipy.helpers.interceptors.grpc.base.client_interceptor.AsyncClientCallDetails.wait_for_ready instance-attribute

wait_for_ready: bool | None

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor

Bases: UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor

Base class for asynchronous gRPC client interceptors.

This class provides a base implementation for intercepting asynchronous gRPC client calls. It supports unary-unary, unary-stream, stream-unary, and stream-stream RPCs.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
class BaseAsyncGrpcClientInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
    grpc.aio.StreamStreamClientInterceptor,
    metaclass=abc.ABCMeta,
):
    """Base class for asynchronous gRPC client interceptors.

    All four RPC shapes (unary-unary, unary-stream, stream-unary, stream-stream) are funnelled
    into the single abstract ``intercept`` coroutine, so subclasses only implement one hook.
    """

    @abc.abstractmethod
    async def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.aio.ClientCallDetails,
    ) -> Any:
        """Intercepts an asynchronous gRPC client call.

        Args:
            method (Callable): The continuation to await, called with the request first and
                the call details second.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.aio.ClientCallDetails): Details of the RPC call.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        return await method(request_or_iterator, call_details)

    async def intercept_unary_unary(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
        client_call_details: grpc.aio.ClientCallDetails,
        request: _TRequest,
    ) -> Any:
        """Forwards an asynchronous unary-unary RPC call to ``intercept``.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
            request (Any): The request object.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        adapted_continuation = _swap_args(continuation)
        return await self.intercept(adapted_continuation, request, client_call_details)

    async def intercept_unary_stream(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
        client_call_details: grpc.aio.ClientCallDetails,
        request: _TRequest,
    ) -> Any:
        """Forwards an asynchronous unary-stream RPC call to ``intercept``.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
            request (Any): The request object.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        adapted_continuation = _swap_args(continuation)
        return await self.intercept(adapted_continuation, request, client_call_details)

    async def intercept_stream_unary(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
        client_call_details: grpc.aio.ClientCallDetails,
        request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
    ) -> Any:
        """Forwards an asynchronous stream-unary RPC call to ``intercept``.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
            request_iterator (AsyncIterable[Any] | Iterable[Any]): The stream of request messages.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        adapted_continuation = _swap_args(continuation)
        return await self.intercept(adapted_continuation, request_iterator, client_call_details)

    async def intercept_stream_stream(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
        client_call_details: grpc.aio.ClientCallDetails,
        request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
    ) -> Any:
        """Forwards an asynchronous stream-stream RPC call to ``intercept``.

        Args:
            continuation (Callable): The continuation function to call.
            client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
            request_iterator (AsyncIterable[Any] | Iterable[Any]): The stream of request messages.

        Returns:
            Any: The result of the intercepted RPC call.
        """
        adapted_continuation = _swap_args(continuation)
        return await self.intercept(adapted_continuation, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor.intercept abstractmethod async

intercept(
    method: Callable,
    request_or_iterator: Any,
    call_details: ClientCallDetails,
) -> Any

Intercepts an asynchronous gRPC client call.

Parameters:

Name Type Description Default
method Callable

The continuation function to call.

required
request_or_iterator Any

The request or request iterator.

required
call_details ClientCallDetails

Details of the RPC call.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
@abc.abstractmethod
async def intercept(
    self,
    method: Callable,
    request_or_iterator: Any,
    call_details: grpc.aio.ClientCallDetails,
) -> Any:
    """Intercepts an asynchronous gRPC client call.

    The default body simply awaits the continuation, passing the request first and the
    call details second.

    Args:
        method (Callable): The continuation function to await.
        request_or_iterator (Any): The request or request iterator.
        call_details (grpc.aio.ClientCallDetails): Details of the RPC call.

    Returns:
        Any: The result of the intercepted RPC call.
    """
    outcome = await method(request_or_iterator, call_details)
    return outcome

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor.intercept_unary_unary async

intercept_unary_unary(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts an asynchronous unary-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_unary_unary(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Forwards an asynchronous unary-unary RPC call to ``intercept``.

    Args:
        continuation (Callable): The next step of the call chain; it takes the call details
            first and the request second.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: Whatever ``intercept`` returns for this call.
    """
    # ``intercept`` is handed the request before the call details, so the continuation is
    # wrapped with ``_swap_args`` first (presumably to flip its argument order to match).
    adapted_continuation = _swap_args(continuation)
    return await self.intercept(adapted_continuation, request, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor.intercept_unary_stream async

intercept_unary_stream(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts an asynchronous unary-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_unary_stream(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Forwards an asynchronous unary-stream RPC call to ``intercept``.

    Args:
        continuation (Callable): The next step of the call chain; it takes the call details
            first and the request second.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: Whatever ``intercept`` returns for this call.
    """
    # ``intercept`` is handed the request before the call details, so the continuation is
    # wrapped with ``_swap_args`` first (presumably to flip its argument order to match).
    adapted_continuation = _swap_args(continuation)
    return await self.intercept(adapted_continuation, request, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor.intercept_stream_unary async

intercept_stream_unary(
    continuation: Callable[
        [
            ClientCallDetails,
            AsyncIterable[_TRequest] | Iterable[_TRequest],
        ],
        Any,
    ],
    client_call_details: ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest]
    | Iterable[_TRequest],
) -> Any

Intercepts an asynchronous stream-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator AsyncIterable[Any] | Iterable[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_stream_unary(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> Any:
    """Forwards an asynchronous stream-unary RPC call to ``intercept``.

    Args:
        continuation (Callable): The next step of the call chain; it takes the call details
            first and the request stream second.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request_iterator (AsyncIterable[Any] | Iterable[Any]): The stream of request messages.

    Returns:
        Any: Whatever ``intercept`` returns for this call.
    """
    # ``intercept`` is handed the request stream before the call details, so the continuation
    # is wrapped with ``_swap_args`` first (presumably to flip its argument order to match).
    adapted_continuation = _swap_args(continuation)
    return await self.intercept(adapted_continuation, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.base.client_interceptor.BaseAsyncGrpcClientInterceptor.intercept_stream_stream async

intercept_stream_stream(
    continuation: Callable[
        [
            ClientCallDetails,
            AsyncIterable[_TRequest] | Iterable[_TRequest],
        ],
        Any,
    ],
    client_call_details: ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest]
    | Iterable[_TRequest],
) -> Any

Intercepts an asynchronous stream-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator AsyncIterable[Any] | Iterable[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_stream_stream(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> Any:
    """Forwards an asynchronous stream-stream RPC call to ``intercept``.

    Args:
        continuation (Callable): The next step of the call chain; it takes the call details
            first and the request stream second.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request_iterator (AsyncIterable[Any] | Iterable[Any]): The stream of request messages.

    Returns:
        Any: Whatever ``intercept`` returns for this call.
    """
    # ``intercept`` is handed the request stream before the call details, so the continuation
    # is wrapped with ``_swap_args`` first (presumably to flip its argument order to match).
    adapted_continuation = _swap_args(continuation)
    return await self.intercept(adapted_continuation, request_iterator, client_call_details)

options: show_root_toc_entry: false heading_level: 3

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName

Bases: BaseDTO

A data transfer object (DTO) representing the parsed method name of a gRPC call.

Attributes:

Name Type Description
full_name str

The full name of the method, including package, service, and method.

package str

The package name.

service str

The service name.

method str

The method name.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
class MethodName(BaseDTO):
    """A data transfer object (DTO) representing the parsed method name of a gRPC call.

    Instances are built by `parse_method_name` from a raw gRPC method string.

    Attributes:
        full_name (str): The full name of the method, including package, service, and method.
            As produced by `parse_method_name`, the leading slash has been removed
            (e.g., "package.Service/Method").
        package (str): The package name; an empty string when the service is not qualified
            by a package.
        service (str): The service name.
        method (str): The method name.
    """

    full_name: str
    package: str
    service: str
    method: str

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName.full_name instance-attribute

full_name: str

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName.package instance-attribute

package: str

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName.service instance-attribute

service: str

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName.method instance-attribute

method: str

archipy.helpers.interceptors.grpc.base.server_interceptor.MethodName.model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="ignore",
    validate_default=True,
    from_attributes=True,
    frozen=True,
    str_strip_whitespace=True,
    arbitrary_types_allowed=True,
)

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseGrpcServerInterceptor

Bases: ServerInterceptor

Base class for gRPC server interceptors.

This class provides a base implementation for intercepting gRPC server calls. It allows custom logic to be injected into the request/response flow.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
class BaseGrpcServerInterceptor(grpc.ServerInterceptor, metaclass=abc.ABCMeta):
    """Base class for gRPC server interceptors.

    Subclasses implement ``intercept``; ``intercept_service`` takes care of wrapping the
    resolved handler so that every request passes through that hook.
    """

    @abc.abstractmethod
    def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts a gRPC server call.

        Args:
            method (Callable): The method to be intercepted.
            request (object): The request object.
            context (grpc.ServicerContext): The context of the RPC call.
            method_name_model (MethodName): The parsed method name containing package, service,
                and method components.

        Returns:
            object: The result of the intercepted method.
        """
        return method(request, context)

    def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler | None],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler | None:
        """Intercepts the service call and wraps the handler with custom logic.

        Args:
            continuation: The continuation function to call.
            handler_call_details: Details of the handler call.

        Returns:
            grpc.RpcMethodHandler | None: The wrapped RPC method handler, or ``None`` when the
            continuation yields no handler.
        """
        resolved_handler = continuation(handler_call_details)
        # Nothing to wrap when no handler was resolved for this call.
        if resolved_handler is None:
            return None

        build_handler, original_method = _get_factory_and_method(resolved_handler)

        def invoke_intercept_method(request: object, context: grpc.ServicerContext) -> object:
            """Runs the original handler method through ``intercept``.

            Args:
                request (object): The request object.
                context (grpc.ServicerContext): The context of the RPC call.

            Returns:
                object: The result of the intercepted method.
            """
            # The method name is parsed on every invocation, not once at wrap time.
            parsed_name = parse_method_name(handler_call_details.method)
            return self.intercept(original_method, request, context, parsed_name)

        # Rebuild a handler of the same kind, reusing the original (de)serializers.
        return build_handler(
            invoke_intercept_method,
            request_deserializer=resolved_handler.request_deserializer,
            response_serializer=resolved_handler.response_serializer,
        )

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseGrpcServerInterceptor.intercept abstractmethod

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts a gRPC server call.

Parameters:

Name Type Description Default
method Callable

The method to be intercepted.

required
request object

The request object.

required
context ServicerContext

The context of the RPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted method.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
@abc.abstractmethod
def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Intercepts a gRPC server call.

    The default body calls the handler method unchanged and does not use
    ``method_name_model``.

    Args:
        method (Callable): The method to be intercepted.
        request (object): The request object.
        context (grpc.ServicerContext): The context of the RPC call.
        method_name_model (MethodName): The parsed method name containing package, service,
            and method components.

    Returns:
        object: The result of the intercepted method.
    """
    outcome = method(request, context)
    return outcome

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseGrpcServerInterceptor.intercept_service

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], RpcMethodHandler | None
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler | None

Intercepts the service call and wraps the handler with custom logic.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], RpcMethodHandler | None]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler | None

The wrapped RPC method handler, or None when the continuation yields no handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler | None],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Intercepts the service call and wraps the handler with custom logic.

    Args:
        continuation: The continuation function to call.
        handler_call_details: Details of the handler call.

    Returns:
        grpc.RpcMethodHandler | None: The wrapped RPC method handler, or ``None`` when the
        continuation yields no handler.
    """
    resolved_handler = continuation(handler_call_details)
    # Nothing to wrap when no handler was resolved for this call.
    if resolved_handler is None:
        return None

    build_handler, original_method = _get_factory_and_method(resolved_handler)

    def invoke_intercept_method(request: object, context: grpc.ServicerContext) -> object:
        """Runs the original handler method through ``intercept``.

        Args:
            request (object): The request object.
            context (grpc.ServicerContext): The context of the RPC call.

        Returns:
            object: The result of the intercepted method.
        """
        # The method name is parsed on every invocation, not once at wrap time.
        parsed_name = parse_method_name(handler_call_details.method)
        return self.intercept(original_method, request, context, parsed_name)

    # Rebuild a handler of the same kind, reusing the original (de)serializers.
    return build_handler(
        invoke_intercept_method,
        request_deserializer=resolved_handler.request_deserializer,
        response_serializer=resolved_handler.response_serializer,
    )

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseAsyncGrpcServerInterceptor

Bases: ServerInterceptor

Base class for asynchronous gRPC server interceptors.

This class provides a simplified base implementation for intercepting async gRPC server calls. Unlike the synchronous version, async interceptors work differently and don't need the complex handler wrapping logic.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
class BaseAsyncGrpcServerInterceptor(grpc.aio.ServerInterceptor, metaclass=abc.ABCMeta):
    """Base class for asynchronous gRPC server interceptors.

    This class provides a simplified base implementation for intercepting async gRPC server calls.
    Unlike the synchronous version, async interceptors work differently and don't need the complex
    handler wrapping logic.
    """

    @abc.abstractmethod
    async def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.aio.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts an async gRPC server call.

        Args:
            method (Callable): The method to be intercepted.
            request (object): The request object.
            context (grpc.aio.ServicerContext): The context of the RPC call.
            method_name_model (MethodName): The parsed method name containing package, service, and method components.

        Returns:
            object: The result of the intercepted method.
        """
        return await method(request, context)

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler | None]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler | None:
        """Intercepts the service call using the simplified async pattern.

        For async gRPC, we don't need the complex handler wrapping that sync interceptors require.
        Instead, we can use a much simpler pattern where we just await the continuation and
        then wrap the actual method call.

        Args:
            continuation: The continuation function to call.
            handler_call_details: Details of the handler call.

        Returns:
            grpc.RpcMethodHandler | None: The wrapped RPC method handler, or ``None`` when the
            continuation yields no handler.
        """
        next_handler = await continuation(handler_call_details)
        # Mirror the synchronous interceptor: when no handler was resolved there is nothing
        # to wrap, so pass ``None`` through instead of handing it to the factory lookup.
        if next_handler is None:
            return None

        handler_factory, next_handler_method = _get_factory_and_method(next_handler)

        async def invoke_intercept_method(request: object, context: grpc.aio.ServicerContext) -> object:
            """Invokes the intercepted async method.

            Args:
                request (object): The request object.
                context (grpc.aio.ServicerContext): The context of the async RPC call.

            Returns:
                object: The result of the intercepted method.
            """
            # The method name is parsed on every invocation, not once at wrap time.
            method_name_model = parse_method_name(handler_call_details.method)
            return await self.intercept(next_handler_method, request, context, method_name_model)

        # Serializers are read defensively: a handler lacking either attribute gets ``None``.
        return handler_factory(
            invoke_intercept_method,
            request_deserializer=getattr(next_handler, "request_deserializer", None),
            response_serializer=getattr(next_handler, "response_serializer", None),
        )

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseAsyncGrpcServerInterceptor.intercept abstractmethod async

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts an async gRPC server call.

Parameters:

Name Type Description Default
method Callable

The method to be intercepted.

required
request object

The request object.

required
context ServicerContext

The context of the RPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted method.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
@abc.abstractmethod
async def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.aio.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Intercepts an async gRPC server call.

    The default body awaits the handler method unchanged and does not use
    ``method_name_model``.

    Args:
        method (Callable): The method to be intercepted.
        request (object): The request object.
        context (grpc.aio.ServicerContext): The context of the RPC call.
        method_name_model (MethodName): The parsed method name containing package, service,
            and method components.

    Returns:
        object: The result of the intercepted method.
    """
    outcome = await method(request, context)
    return outcome

archipy.helpers.interceptors.grpc.base.server_interceptor.BaseAsyncGrpcServerInterceptor.intercept_service async

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], Awaitable[RpcMethodHandler]
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler

Intercepts the service call using the simplified async pattern.

For async gRPC, we don't need the complex handler wrapping that sync interceptors require. Instead, we can use a much simpler pattern where we just await the continuation and then wrap the actual method call.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
async def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler | None]],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Intercepts the service call using the simplified async pattern.

    For async gRPC, we don't need the complex handler wrapping that sync interceptors require.
    Instead, we can use a much simpler pattern where we just await the continuation and
    then wrap the actual method call.

    Args:
        continuation: The continuation function to call.
        handler_call_details: Details of the handler call.

    Returns:
        grpc.RpcMethodHandler | None: The wrapped RPC method handler, or ``None`` when the
        continuation yields no handler.
    """
    next_handler = await continuation(handler_call_details)
    # Mirror the synchronous interceptor: when no handler was resolved there is nothing
    # to wrap, so pass ``None`` through instead of handing it to the factory lookup.
    if next_handler is None:
        return None

    handler_factory, next_handler_method = _get_factory_and_method(next_handler)

    async def invoke_intercept_method(request: object, context: grpc.aio.ServicerContext) -> object:
        """Invokes the intercepted async method.

        Args:
            request (object): The request object.
            context (grpc.aio.ServicerContext): The context of the async RPC call.

        Returns:
            object: The result of the intercepted method.
        """
        # The method name is parsed on every invocation, not once at wrap time.
        method_name_model = parse_method_name(handler_call_details.method)
        return await self.intercept(next_handler_method, request, context, method_name_model)

    # Serializers are read defensively: a handler lacking either attribute gets ``None``.
    return handler_factory(
        invoke_intercept_method,
        request_deserializer=getattr(next_handler, "request_deserializer", None),
        response_serializer=getattr(next_handler, "response_serializer", None),
    )

archipy.helpers.interceptors.grpc.base.server_interceptor.parse_method_name

parse_method_name(method_name: str) -> MethodName

Parses a gRPC method name into its components.

Parameters:

Name Type Description Default
method_name str

The full method name (e.g., "/package.service/method").

required

Returns:

Name Type Description
MethodName MethodName

A MethodName object containing the parsed components.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
def parse_method_name(method_name: str) -> MethodName:
    """Parses a gRPC method name into its components.

    Args:
        method_name (str): The full method name (e.g., "/package.service/method").
            The leading slash is optional.

    Returns:
        MethodName: A `MethodName` object containing the parsed components. `package` is an
        empty string when the service part contains no dot.

    Raises:
        ValueError: If the name, once a leading slash is dropped, does not consist of exactly
            two "/"-separated parts.
    """
    # Drop only a *leading* slash. Removing the first slash wherever it occurs would eat the
    # service/method separator of a name that has no leading slash.
    method_full_name = method_name.removeprefix("/")
    package_and_service, method = method_full_name.split("/")
    # Everything before the last dot is the package; with no dot the package is empty.
    package, _, service = package_and_service.rpartition(".")
    return MethodName(full_name=method_full_name, package=package, service=service, method=method)

options: show_root_toc_entry: false heading_level: 3

exception

gRPC server interceptor that catches exceptions and converts them to gRPC status codes.

archipy.helpers.interceptors.grpc.exception.server_interceptor.GrpcServerExceptionInterceptor

Bases: BaseGrpcServerInterceptor

A sync gRPC server interceptor for centralized exception handling.

This interceptor catches all exceptions thrown by gRPC service methods and converts them to appropriate gRPC errors, eliminating the need for repetitive try-catch blocks in each service method.

Source code in archipy/helpers/interceptors/grpc/exception/server_interceptor.py
class GrpcServerExceptionInterceptor(BaseGrpcServerInterceptor):
    """Sync gRPC server interceptor that centralizes exception handling.

    Every exception escaping a service method is reported through
    ``BaseUtils.capture_exception`` and then turned into a gRPC abort, so
    individual service methods do not need their own try/except blocks.
    """

    def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Run the wrapped sync gRPC method and translate failures into gRPC aborts.

        Args:
            method: The sync gRPC method to invoke.
            request: The request object forwarded to ``method``.
            context: The sync gRPC call context forwarded to ``method`` and used for aborting.
            method_name_model: Parsed package, service, and method names of the call.

        Returns:
            object: Whatever ``method`` returns when it completes without raising.

        Note:
            On failure the abort helpers are expected to raise, so nothing is returned.
            The bare ``raise`` after each handler re-raises the original exception
            should an abort call return instead.
        """
        try:
            return method(request, context)
        except ValidationError as pydantic_error:
            BaseUtils.capture_exception(pydantic_error)
            self._handle_validation_error(pydantic_error, context)
            raise  # Fallback only; also keeps MyPy satisfied about the return type
        except BaseError as known_error:
            BaseUtils.capture_exception(known_error)
            known_error.abort_grpc_sync(context)
            raise  # Fallback only; also keeps MyPy satisfied about the return type
        except Exception as unknown_error:
            BaseUtils.capture_exception(unknown_error)
            self._handle_unexpected_error(unknown_error, context, method_name_model)
            raise  # Fallback only; also keeps MyPy satisfied about the return type

    @staticmethod
    def _handle_validation_error(validation_error: ValidationError, context: grpc.ServicerContext) -> None:
        """Abort the call with an ``InvalidArgumentError`` describing a validation failure.

        Args:
            validation_error: The validation error raised by the service method.
            context: The gRPC context to abort.
        """
        details = BaseUtils.format_validation_errors(validation_error, include_type=True)
        payload = {"validation_errors": details, "error_count": len(validation_error.errors())}
        invalid_argument = InvalidArgumentError(argument_name="request_validation", additional_data=payload)
        invalid_argument.abort_grpc_sync(context)

    @staticmethod
    def _handle_unexpected_error(
        error: Exception,
        context: grpc.ServicerContext,
        method_name_model: MethodName,
    ) -> None:
        """Abort the call with an ``InternalError`` built from an unexpected exception.

        Args:
            error: The unexpected exception raised by the service method.
            context: The gRPC context to abort.
            method_name_model: Parsed method name, attached to the error for tracking.
        """
        # Attach the failure and call-site details so the internal error stays traceable.
        diagnostics = {
            "original_error": str(error),
            "error_type": type(error).__name__,
            "service": method_name_model.service,
            "method": method_name_model.method,
            "package": method_name_model.package,
        }
        internal_error = InternalError(additional_data=diagnostics)
        internal_error.abort_grpc_sync(context)

    @staticmethod
    def _format_validation_errors(validation_error: ValidationError) -> list[dict[str, str]]:
        """Return the validation error formatted by ``BaseUtils.format_validation_errors``.

        Args:
            validation_error: The validation error to format.

        Returns:
            The formatted validation error details, including the error type.

        Note:
            This method is deprecated. Use BaseUtils.format_validation_errors instead.
        """
        formatted = BaseUtils.format_validation_errors(validation_error, include_type=True)
        return formatted

archipy.helpers.interceptors.grpc.exception.server_interceptor.GrpcServerExceptionInterceptor.intercept

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts a sync gRPC server call and handles exceptions.

Parameters:

Name Type Description Default
method Callable

The sync gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the sync gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Note

This method will not return anything if an exception is handled, as the exception handling will abort the gRPC context.

Source code in archipy/helpers/interceptors/grpc/exception/server_interceptor.py
def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Run the wrapped sync gRPC method and translate failures into gRPC aborts.

    Args:
        method: The sync gRPC method to invoke.
        request: The request object forwarded to ``method``.
        context: The sync gRPC call context forwarded to ``method`` and used for aborting.
        method_name_model: Parsed package, service, and method names of the call.

    Returns:
        object: Whatever ``method`` returns when it completes without raising.

    Note:
        On failure the abort helpers are expected to raise, so nothing is returned.
        The bare ``raise`` after each handler re-raises the original exception
        should an abort call return instead.
    """
    try:
        return method(request, context)
    except ValidationError as pydantic_error:
        BaseUtils.capture_exception(pydantic_error)
        self._handle_validation_error(pydantic_error, context)
        raise  # Fallback only; also keeps MyPy satisfied about the return type
    except BaseError as known_error:
        BaseUtils.capture_exception(known_error)
        known_error.abort_grpc_sync(context)
        raise  # Fallback only; also keeps MyPy satisfied about the return type
    except Exception as unknown_error:
        BaseUtils.capture_exception(unknown_error)
        self._handle_unexpected_error(unknown_error, context, method_name_model)
        raise  # Fallback only; also keeps MyPy satisfied about the return type

archipy.helpers.interceptors.grpc.exception.server_interceptor.GrpcServerExceptionInterceptor.intercept_service

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], RpcMethodHandler | None
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler | None

Intercepts the service call and wraps the handler with custom logic.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], RpcMethodHandler | None]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler | None

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler | None],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Wrap the handler obtained from ``continuation`` so calls pass through ``self.intercept``.

    Args:
        continuation: Callable invoked with ``handler_call_details`` to obtain the next handler.
        handler_call_details: Details of the handler call; its ``method`` attribute is parsed
            into a ``MethodName`` on every invocation of the wrapped handler.

    Returns:
        grpc.RpcMethodHandler | None: The wrapping handler, or ``None`` when ``continuation``
        returns no handler.
    """
    resolved_handler = continuation(handler_call_details)
    if resolved_handler is None:
        return None

    build_handler, wrapped_method = _get_factory_and_method(resolved_handler)

    def invoke_intercept_method(request: object, context: grpc.ServicerContext) -> object:
        """Route a single call through ``self.intercept``.

        Args:
            request (object): The request object.
            context (grpc.ServicerContext): The context of the RPC call.

        Returns:
            object: The value returned by ``self.intercept``.
        """
        parsed_name = parse_method_name(handler_call_details.method)
        return self.intercept(wrapped_method, request, context, parsed_name)

    # Reuse the original handler's (de)serializers so the wire format is unchanged.
    return build_handler(
        invoke_intercept_method,
        request_deserializer=resolved_handler.request_deserializer,
        response_serializer=resolved_handler.response_serializer,
    )

archipy.helpers.interceptors.grpc.exception.server_interceptor.AsyncGrpcServerExceptionInterceptor

Bases: BaseAsyncGrpcServerInterceptor

An async gRPC server interceptor for centralized exception handling.

This interceptor catches all exceptions thrown by gRPC service methods and converts them to appropriate gRPC errors, eliminating the need for repetitive try/except blocks in each service method.

Source code in archipy/helpers/interceptors/grpc/exception/server_interceptor.py
class AsyncGrpcServerExceptionInterceptor(BaseAsyncGrpcServerInterceptor):
    """Async gRPC server interceptor that centralizes exception handling.

    Every exception escaping an async service method is reported through
    ``BaseUtils.capture_exception`` and then turned into a gRPC abort, so
    individual service methods do not need their own try/except blocks.
    """

    async def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.aio.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Await the wrapped async gRPC method and translate failures into gRPC aborts.

        Args:
            method: The async gRPC method to invoke.
            request: The request object forwarded to ``method``.
            context: The async gRPC call context forwarded to ``method`` and used for aborting.
            method_name_model: Parsed package, service, and method names of the call.

        Returns:
            object: Whatever ``method`` resolves to when it completes without raising.

        Note:
            On failure the abort helpers are expected to raise, so nothing is returned.
            The bare ``raise`` after each handler re-raises the original exception
            should an abort call return instead.
        """
        try:
            return await method(request, context)
        except ValidationError as pydantic_error:
            BaseUtils.capture_exception(pydantic_error)
            await self._handle_validation_error(pydantic_error, context)
            raise  # Fallback only; also keeps MyPy satisfied about the return type
        except BaseError as known_error:
            BaseUtils.capture_exception(known_error)
            await known_error.abort_grpc_async(context)
            raise  # Fallback only; also keeps MyPy satisfied about the return type
        except Exception as unknown_error:
            BaseUtils.capture_exception(unknown_error)
            await self._handle_unexpected_error(unknown_error, context, method_name_model)
            raise  # Fallback only; also keeps MyPy satisfied about the return type

    @staticmethod
    async def _handle_validation_error(validation_error: ValidationError, context: grpc.aio.ServicerContext) -> None:
        """Abort the call with an ``InvalidArgumentError`` describing a validation failure.

        Args:
            validation_error: The validation error raised by the service method.
            context: The gRPC context to abort.
        """
        details = BaseUtils.format_validation_errors(validation_error, include_type=True)
        payload = {"validation_errors": details, "error_count": len(validation_error.errors())}
        invalid_argument = InvalidArgumentError(argument_name="request_validation", additional_data=payload)
        await invalid_argument.abort_grpc_async(context)

    @staticmethod
    async def _handle_unexpected_error(
        error: Exception,
        context: grpc.aio.ServicerContext,
        method_name_model: MethodName,
    ) -> None:
        """Abort the call with an ``InternalError`` built from an unexpected exception.

        Args:
            error: The unexpected exception raised by the service method.
            context: The gRPC context to abort.
            method_name_model: Parsed method name, attached to the error for tracking.
        """
        # Attach the failure and call-site details so the internal error stays traceable.
        diagnostics = {
            "original_error": str(error),
            "error_type": type(error).__name__,
            "service": method_name_model.service,
            "method": method_name_model.method,
            "package": method_name_model.package,
        }
        internal_error = InternalError(additional_data=diagnostics)
        await internal_error.abort_grpc_async(context)

    @staticmethod
    def _format_validation_errors(validation_error: ValidationError) -> list[dict[str, str]]:
        """Return the validation error formatted by ``BaseUtils.format_validation_errors``.

        Args:
            validation_error: The validation error to format.

        Returns:
            The formatted validation error details, including the error type.

        Note:
            This method is deprecated. Use BaseUtils.format_validation_errors instead.
        """
        formatted = BaseUtils.format_validation_errors(validation_error, include_type=True)
        return formatted

archipy.helpers.interceptors.grpc.exception.server_interceptor.AsyncGrpcServerExceptionInterceptor.intercept async

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts an async gRPC server call and handles exceptions.

Parameters:

Name Type Description Default
method Callable

The async gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the async gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Note

This method will not return anything if an exception is handled, as the exception handling will abort the gRPC context.

Source code in archipy/helpers/interceptors/grpc/exception/server_interceptor.py
async def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.aio.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Await the wrapped async gRPC method and translate failures into gRPC aborts.

    Args:
        method: The async gRPC method to invoke.
        request: The request object forwarded to ``method``.
        context: The async gRPC call context forwarded to ``method`` and used for aborting.
        method_name_model: Parsed package, service, and method names of the call.

    Returns:
        object: Whatever ``method`` resolves to when it completes without raising.

    Note:
        On failure the abort helpers are expected to raise, so nothing is returned.
        The bare ``raise`` after each handler re-raises the original exception
        should an abort call return instead.
    """
    try:
        return await method(request, context)
    except ValidationError as pydantic_error:
        BaseUtils.capture_exception(pydantic_error)
        await self._handle_validation_error(pydantic_error, context)
        raise  # Fallback only; also keeps MyPy satisfied about the return type
    except BaseError as known_error:
        BaseUtils.capture_exception(known_error)
        await known_error.abort_grpc_async(context)
        raise  # Fallback only; also keeps MyPy satisfied about the return type
    except Exception as unknown_error:
        BaseUtils.capture_exception(unknown_error)
        await self._handle_unexpected_error(unknown_error, context, method_name_model)
        raise  # Fallback only; also keeps MyPy satisfied about the return type

archipy.helpers.interceptors.grpc.exception.server_interceptor.AsyncGrpcServerExceptionInterceptor.intercept_service async

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], Awaitable[RpcMethodHandler]
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler

Intercepts the service call using the simplified async pattern.

For async gRPC, we don't need the complex handler wrapping that sync interceptors require. Instead, we can use a much simpler pattern where we just await the continuation and then wrap the actual method call.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
async def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
    """Wrap the handler awaited from ``continuation`` so calls pass through ``self.intercept``.

    The continuation is awaited once to obtain the next handler; the handler's method is then
    wrapped in a coroutine that delegates to ``self.intercept``.

    Args:
        continuation: Awaitable-returning callable invoked with ``handler_call_details`` to
            obtain the next handler.
        handler_call_details: Details of the handler call; its ``method`` attribute is parsed
            into a ``MethodName`` on every invocation of the wrapped handler.

    Returns:
        grpc.RpcMethodHandler: The wrapping RPC method handler.
    """
    resolved_handler = await continuation(handler_call_details)

    # NOTE(review): unlike the sync interceptor, a None handler is not short-circuited
    # here - confirm _get_factory_and_method copes with None.
    build_handler, wrapped_method = _get_factory_and_method(resolved_handler)

    async def invoke_intercept_method(request: object, context: grpc.aio.ServicerContext) -> object:
        """Route a single async call through ``self.intercept``.

        Args:
            request (object): The request object.
            context (grpc.aio.ServicerContext): The context of the async RPC call.

        Returns:
            object: The value returned by ``self.intercept``.
        """
        parsed_name = parse_method_name(handler_call_details.method)
        return await self.intercept(wrapped_method, request, context, parsed_name)

    # Serializers are looked up defensively: a missing attribute becomes None.
    deserializer = getattr(resolved_handler, "request_deserializer", None)
    serializer = getattr(resolved_handler, "response_serializer", None)
    return build_handler(
        invoke_intercept_method,
        request_deserializer=deserializer,
        response_serializer=serializer,
    )

options: show_root_toc_entry: false heading_level: 3

metric

gRPC server interceptor for collecting Prometheus metrics on RPC calls.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor

Bases: BaseGrpcServerInterceptor

A gRPC server interceptor for collecting and reporting metrics using Prometheus.

This interceptor measures the response time of gRPC methods and records it in a Prometheus histogram. It also tracks the number of active requests using a Prometheus gauge. It also captures errors and logs them for monitoring purposes.

Source code in archipy/helpers/interceptors/grpc/metric/server_interceptor.py
class GrpcServerMetricInterceptor(BaseGrpcServerInterceptor):
    """A gRPC server interceptor for collecting and reporting metrics using Prometheus.

    This interceptor measures the response time of gRPC methods and records it in a Prometheus histogram.
    It also tracks the number of active requests using a Prometheus gauge.
    Exceptions raised by the intercepted method are passed to ``BaseUtils.capture_exception`` and re-raised.
    """

    from prometheus_client import Gauge, Histogram

    # Attribute docstrings are placed AFTER each attribute so documentation
    # generators attach them to the right name.
    ZERO_TO_ONE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 1000 for i in range(0, 1000, 5)]
    """Buckets for measuring response times between 0 and 1 second."""

    ONE_TO_FIVE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(100, 500, 20)]
    """Buckets for measuring response times between 1 and 5 seconds."""

    FIVE_TO_THIRTY_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(500, 3000, 50)]
    """Buckets for measuring response times between 5 and 30 seconds."""

    TOTAL_BUCKETS: ClassVar[list[float]] = (
        ZERO_TO_ONE_SECONDS_BUCKETS + ONE_TO_FIVE_SECONDS_BUCKETS + FIVE_TO_THIRTY_SECONDS_BUCKETS + [float("inf")]
    )
    """Combined buckets for measuring response times from 0 to 30 seconds and beyond."""

    RESPONSE_TIME_SECONDS: ClassVar[Histogram] = Histogram(
        "grpc_response_time_seconds",
        "Time spent processing gRPC request",
        labelnames=("package", "service", "method", "status_code"),
        buckets=TOTAL_BUCKETS,
    )
    """Prometheus histogram for tracking response times of gRPC methods."""

    ACTIVE_REQUESTS: ClassVar[Gauge] = Gauge(
        "grpc_active_requests",
        "Number of active gRPC requests",
        labelnames=("package", "service", "method"),
    )
    """Prometheus gauge for tracking active gRPC requests."""

    @staticmethod
    def _resolve_status_code(context: grpc.ServicerContext, default: str) -> str:
        """Return the name of the status code set on the context, or ``default``.

        Args:
            context (grpc.ServicerContext): The context of the gRPC call.
            default (str): Label to use when the context exposes no callable ``code``
                or no status code with a ``name`` has been set.

        Returns:
            str: The status code label for the response-time histogram.
        """
        code_getter = getattr(context, "code", None)
        if not callable(code_getter):
            return default
        # getattr on a None code simply falls through to the default.
        code_name = getattr(code_getter(), "name", None)
        return default if code_name is None else code_name

    def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts a gRPC server call to measure response time and track active requests.

        When Prometheus is disabled in the global config, the method is invoked directly and
        nothing is recorded. Otherwise the active-requests gauge is incremented for the call's
        duration and the elapsed time is observed with a ``status_code`` label taken from the
        context; if no code is set the label is "OK" on success and "UNKNOWN" on failure.

        Args:
            method (Callable): The gRPC method being intercepted.
            request (object): The request object passed to the method.
            context (grpc.ServicerContext): The context of the gRPC call.
            method_name_model (MethodName): The parsed method name containing package, service, and method components.

        Returns:
            object: The result of the intercepted gRPC method.

        Raises:
            Exception: Any exception raised by the method is captured and then re-raised unchanged.
        """
        if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
            return method(request, context)

        labels = {
            "package": method_name_model.package,
            "service": method_name_model.service,
            "method": method_name_model.method,
        }

        self.ACTIVE_REQUESTS.labels(**labels).inc()

        # perf_counter is monotonic, so the duration cannot be skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        status_code = "OK"

        try:
            result = method(request, context)
            status_code = self._resolve_status_code(context, default="OK")
        except Exception as exception:
            BaseUtils.capture_exception(exception)
            # A failed call must not be recorded as "OK": prefer the code set on the
            # context (e.g. by an abort) and otherwise fall back to "UNKNOWN".
            # NOTE(review): "UNKNOWN" is assumed to match what gRPC reports for an unhandled exception - confirm.
            status_code = self._resolve_status_code(context, default="UNKNOWN")
            raise
        else:
            return result
        finally:
            duration = time.perf_counter() - start_time
            self.RESPONSE_TIME_SECONDS.labels(status_code=status_code, **labels).observe(duration)
            self.ACTIVE_REQUESTS.labels(**labels).dec()

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.ZERO_TO_ONE_SECONDS_BUCKETS class-attribute

ZERO_TO_ONE_SECONDS_BUCKETS: list[float] = [
    (i / 1000) for i in (range(0, 1000, 5))
]

Buckets for measuring response times between 0 and 1 second.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.ONE_TO_FIVE_SECONDS_BUCKETS class-attribute

ONE_TO_FIVE_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(100, 500, 20))
]

Buckets for measuring response times between 1 and 5 seconds.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.FIVE_TO_THIRTY_SECONDS_BUCKETS class-attribute

FIVE_TO_THIRTY_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(500, 3000, 50))
]

Buckets for measuring response times between 5 and 30 seconds.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.TOTAL_BUCKETS class-attribute instance-attribute

TOTAL_BUCKETS = (
    ZERO_TO_ONE_SECONDS_BUCKETS
    + ONE_TO_FIVE_SECONDS_BUCKETS
    + FIVE_TO_THIRTY_SECONDS_BUCKETS
    + [float("inf")]
)

Combined buckets for measuring response times from 0 to 30 seconds and beyond.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.RESPONSE_TIME_SECONDS class-attribute instance-attribute

RESPONSE_TIME_SECONDS = Histogram(
    "grpc_response_time_seconds",
    "Time spent processing gRPC request",
    labelnames=(
        "package",
        "service",
        "method",
        "status_code",
    ),
    buckets=TOTAL_BUCKETS,
)

Prometheus histogram for tracking response times of gRPC methods.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.ACTIVE_REQUESTS class-attribute instance-attribute

ACTIVE_REQUESTS = Gauge(
    "grpc_active_requests",
    "Number of active gRPC requests",
    labelnames=("package", "service", "method"),
)

Prometheus gauge for tracking active gRPC requests.

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.intercept

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts a gRPC server call to measure response time and track active requests.

Parameters:

Name Type Description Default
method Callable

The gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Raises:

Type Description
Exception

If an exception occurs during the method execution, it is captured and logged.

Source code in archipy/helpers/interceptors/grpc/metric/server_interceptor.py
def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Intercepts a gRPC server call to measure response time and track active requests.

    When Prometheus is disabled in the global config, the method is invoked directly and
    nothing is recorded. Otherwise the active-requests gauge is incremented for the call's
    duration and the elapsed time is observed with a ``status_code`` label taken from the
    context; if no code is set the label is "OK" on success and "UNKNOWN" on failure.

    Args:
        method (Callable): The gRPC method being intercepted.
        request (object): The request object passed to the method.
        context (grpc.ServicerContext): The context of the gRPC call.
        method_name_model (MethodName): The parsed method name containing package, service, and method components.

    Returns:
        object: The result of the intercepted gRPC method.

    Raises:
        Exception: Any exception raised by the method is captured and then re-raised unchanged.
    """
    if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
        return method(request, context)

    package = method_name_model.package
    service = method_name_model.service
    method_name = method_name_model.method

    def status_from_context(default: str) -> str:
        """Return the name of the status code set on the context, or ``default``."""
        code_getter = getattr(context, "code", None)
        if not callable(code_getter):
            return default
        # getattr on a None code simply falls through to the default.
        code_name = getattr(code_getter(), "name", None)
        return default if code_name is None else code_name

    self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).inc()

    # perf_counter is monotonic, so the duration cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()
    status_code = "OK"

    try:
        result = method(request, context)
        status_code = status_from_context("OK")
    except Exception as exception:
        BaseUtils.capture_exception(exception)
        # A failed call must not be recorded as "OK": prefer the code set on the
        # context (e.g. by an abort) and otherwise fall back to "UNKNOWN".
        # NOTE(review): "UNKNOWN" is assumed to match what gRPC reports for an unhandled exception - confirm.
        status_code = status_from_context("UNKNOWN")
        raise
    else:
        return result
    finally:
        duration = time.perf_counter() - start_time
        self.RESPONSE_TIME_SECONDS.labels(
            package=package,
            service=service,
            method=method_name,
            status_code=status_code,
        ).observe(duration)
        self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).dec()

archipy.helpers.interceptors.grpc.metric.server_interceptor.GrpcServerMetricInterceptor.intercept_service

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], RpcMethodHandler | None
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler | None

Intercepts the service call and wraps the handler with custom logic.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], RpcMethodHandler | None]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler | None

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler | None],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Wrap the handler obtained from ``continuation`` so calls pass through ``self.intercept``.

    Args:
        continuation: Callable invoked with ``handler_call_details`` to obtain the next handler.
        handler_call_details: Details of the handler call; its ``method`` attribute is parsed
            into a ``MethodName`` on every invocation of the wrapped handler.

    Returns:
        grpc.RpcMethodHandler | None: The wrapping handler, or ``None`` when ``continuation``
        returns no handler.
    """
    resolved_handler = continuation(handler_call_details)
    if resolved_handler is None:
        return None

    build_handler, wrapped_method = _get_factory_and_method(resolved_handler)

    def invoke_intercept_method(request: object, context: grpc.ServicerContext) -> object:
        """Route a single call through ``self.intercept``.

        Args:
            request (object): The request object.
            context (grpc.ServicerContext): The context of the RPC call.

        Returns:
            object: The value returned by ``self.intercept``.
        """
        parsed_name = parse_method_name(handler_call_details.method)
        return self.intercept(wrapped_method, request, context, parsed_name)

    # Reuse the original handler's (de)serializers so the wire format is unchanged.
    return build_handler(
        invoke_intercept_method,
        request_deserializer=resolved_handler.request_deserializer,
        response_serializer=resolved_handler.response_serializer,
    )

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor

Bases: BaseAsyncGrpcServerInterceptor

An async gRPC server interceptor for collecting and reporting metrics using Prometheus.

This interceptor measures the response time of async gRPC methods and records it in a Prometheus histogram. It also tracks the number of active requests using a Prometheus gauge. It also captures errors and logs them for monitoring purposes.

Source code in archipy/helpers/interceptors/grpc/metric/server_interceptor.py
class AsyncGrpcServerMetricInterceptor(BaseAsyncGrpcServerInterceptor):
    """An async gRPC server interceptor for collecting and reporting metrics using Prometheus.

    This interceptor measures the response time of async gRPC methods and records it in a Prometheus histogram.
    It also tracks the number of active requests using a Prometheus gauge.
    Exceptions raised by the intercepted method are passed to ``BaseUtils.capture_exception`` and re-raised.
    """

    from prometheus_client import Gauge, Histogram

    # Attribute docstrings go directly AFTER the assignment they describe. When they are placed before it,
    # documentation tooling attaches each one to the previous attribute, shifting every description by one.

    ZERO_TO_ONE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 1000 for i in range(0, 1000, 5)]
    """Buckets for measuring response times between 0 and 1 second."""

    ONE_TO_FIVE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(100, 500, 20)]
    """Buckets for measuring response times between 1 and 5 seconds."""

    FIVE_TO_THIRTY_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 100 for i in range(500, 3000, 50)]
    """Buckets for measuring response times between 5 and 30 seconds."""

    TOTAL_BUCKETS: ClassVar[list[float]] = (
        ZERO_TO_ONE_SECONDS_BUCKETS + ONE_TO_FIVE_SECONDS_BUCKETS + FIVE_TO_THIRTY_SECONDS_BUCKETS + [float("inf")]
    )
    """Combined buckets for measuring response times from 0 to 30 seconds and beyond."""

    RESPONSE_TIME_SECONDS: ClassVar[Histogram] = Histogram(
        "grpc_async_response_time_seconds",
        "Time spent processing async gRPC request",
        labelnames=("package", "service", "method", "status_code"),
        buckets=TOTAL_BUCKETS,
    )
    """Prometheus histogram for tracking response times of async gRPC methods."""

    ACTIVE_REQUESTS: ClassVar[Gauge] = Gauge(
        "grpc_async_active_requests",
        "Number of active async gRPC requests",
        labelnames=("package", "service", "method"),
    )
    """Prometheus gauge for tracking active async gRPC requests."""

    async def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.aio.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts an async gRPC server call to measure response time and track active requests.

        When Prometheus is disabled in the global config the call is passed straight through.
        Otherwise the active-requests gauge is incremented for the duration of the call and the
        elapsed time is observed in the histogram, labelled with the resulting status code.

        Args:
            method (Callable): The async gRPC method being intercepted.
            request (object): The request object passed to the method.
            context (grpc.aio.ServicerContext): The context of the async gRPC call.
            method_name_model (MethodName): The parsed method name containing package, service, and method components.

        Returns:
            object: The result of the intercepted gRPC method.

        Raises:
            Exception: Any exception raised by the method is passed to ``BaseUtils.capture_exception``
                and then re-raised unchanged.
        """
        if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
            return await method(request, context)

        package = method_name_model.package
        service = method_name_model.service
        method_name = method_name_model.method

        active_requests = self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name)
        active_requests.inc()

        # Inside a coroutine the running loop is the one to ask for the monotonic clock.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        status_code = "OK"

        try:
            try:
                result = await method(request, context)

                # A handler may set a non-OK status on the context without raising.
                if hasattr(context, "code") and context.code():
                    status_code = context.code().name
            except Exception as e:
                # A failed call must never be recorded as "OK": start from "INTERNAL" and refine it
                # only when the exception exposes a status code object with a ``name``.
                status_code = "INTERNAL"
                code_getter = getattr(e, "code", None)
                if callable(code_getter):
                    code_name = getattr(code_getter(), "name", None)
                    if code_name is not None:
                        status_code = code_name
                raise
            else:
                return result
            finally:
                # Runs on success and failure alike so the gauge always returns to its previous value.
                self.RESPONSE_TIME_SECONDS.labels(
                    package=package,
                    service=service,
                    method=method_name,
                    status_code=status_code,
                ).observe(loop.time() - start_time)
                active_requests.dec()

        except Exception as exception:
            BaseUtils.capture_exception(exception)
            raise

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.ZERO_TO_ONE_SECONDS_BUCKETS class-attribute

ZERO_TO_ONE_SECONDS_BUCKETS: list[float] = [
    (i / 1000) for i in (range(0, 1000, 5))
]

Buckets for measuring response times between 0 and 1 second.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.ONE_TO_FIVE_SECONDS_BUCKETS class-attribute

ONE_TO_FIVE_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(100, 500, 20))
]

Buckets for measuring response times between 1 and 5 seconds.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.FIVE_TO_THIRTY_SECONDS_BUCKETS class-attribute

FIVE_TO_THIRTY_SECONDS_BUCKETS: list[float] = [
    (i / 100) for i in (range(500, 3000, 50))
]

Buckets for measuring response times between 5 and 30 seconds.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.TOTAL_BUCKETS class-attribute instance-attribute

TOTAL_BUCKETS = (
    ZERO_TO_ONE_SECONDS_BUCKETS
    + ONE_TO_FIVE_SECONDS_BUCKETS
    + FIVE_TO_THIRTY_SECONDS_BUCKETS
    + [float("inf")]
)

Combined buckets for measuring response times from 0 to 30 seconds and beyond.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.RESPONSE_TIME_SECONDS class-attribute instance-attribute

RESPONSE_TIME_SECONDS = Histogram(
    "grpc_async_response_time_seconds",
    "Time spent processing async gRPC request",
    labelnames=(
        "package",
        "service",
        "method",
        "status_code",
    ),
    buckets=TOTAL_BUCKETS,
)

Prometheus histogram for tracking response times of async gRPC methods.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.ACTIVE_REQUESTS class-attribute instance-attribute

ACTIVE_REQUESTS = Gauge(
    "grpc_async_active_requests",
    "Number of active async gRPC requests",
    labelnames=("package", "service", "method"),
)

Prometheus gauge for tracking active async gRPC requests.

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.intercept async

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts an async gRPC server call to measure response time and track active requests.

Parameters:

Name Type Description Default
method Callable

The async gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the async gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Raises:

Type Description
Exception

If an exception occurs during the method execution, it is captured and logged.

Source code in archipy/helpers/interceptors/grpc/metric/server_interceptor.py
async def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.aio.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Intercepts an async gRPC server call to measure response time and track active requests.

    When Prometheus is disabled in the global config the call is passed straight through.
    Otherwise the active-requests gauge is incremented for the duration of the call and the
    elapsed time is observed in the histogram, labelled with the resulting status code.

    Args:
        method (Callable): The async gRPC method being intercepted.
        request (object): The request object passed to the method.
        context (grpc.aio.ServicerContext): The context of the async gRPC call.
        method_name_model (MethodName): The parsed method name containing package, service, and method components.

    Returns:
        object: The result of the intercepted gRPC method.

    Raises:
        Exception: Any exception raised by the method is passed to ``BaseUtils.capture_exception``
            and then re-raised unchanged.
    """
    if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
        return await method(request, context)

    package = method_name_model.package
    service = method_name_model.service
    method_name = method_name_model.method

    active_requests = self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name)
    active_requests.inc()

    # Inside a coroutine the running loop is the one to ask for the monotonic clock.
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    status_code = "OK"

    try:
        try:
            result = await method(request, context)

            # A handler may set a non-OK status on the context without raising.
            if hasattr(context, "code") and context.code():
                status_code = context.code().name
        except Exception as e:
            # A failed call must never be recorded as "OK": start from "INTERNAL" and refine it
            # only when the exception exposes a status code object with a ``name``.
            status_code = "INTERNAL"
            code_getter = getattr(e, "code", None)
            if callable(code_getter):
                code_name = getattr(code_getter(), "name", None)
                if code_name is not None:
                    status_code = code_name
            raise
        else:
            return result
        finally:
            # Runs on success and failure alike so the gauge always returns to its previous value.
            self.RESPONSE_TIME_SECONDS.labels(
                package=package,
                service=service,
                method=method_name,
                status_code=status_code,
            ).observe(loop.time() - start_time)
            active_requests.dec()

    except Exception as exception:
        BaseUtils.capture_exception(exception)
        raise

archipy.helpers.interceptors.grpc.metric.server_interceptor.AsyncGrpcServerMetricInterceptor.intercept_service async

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], Awaitable[RpcMethodHandler]
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler

Intercepts the service call using the simplified async pattern.

For async gRPC, we don't need the complex handler wrapping that sync interceptors require. Instead, we can use a much simpler pattern where we just await the continuation and then wrap the actual method call.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
async def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
    """Intercepts the service call using the simplified async pattern.

    For async gRPC, we don't need the complex handler wrapping that sync interceptors require.
    Instead, we can use a much simpler pattern where we just await the continuation and
    then wrap the actual method call.

    Args:
        continuation: The continuation function to call.
        handler_call_details: Details of the handler call.

    Returns:
        grpc.RpcMethodHandler: The wrapped RPC method handler. If the continuation yields no
            handler (``None``), that value is returned unchanged, as the sync interceptor does.
    """
    next_handler = await continuation(handler_call_details)
    if next_handler is None:
        # No handler was resolved for this call; there is nothing to wrap, so hand the
        # result straight back instead of passing None to the factory lookup.
        return next_handler

    handler_factory, next_handler_method = _get_factory_and_method(next_handler)

    async def invoke_intercept_method(request: object, context: grpc.aio.ServicerContext) -> object:
        """Invokes the intercepted async method.

        Args:
            request (object): The request object.
            context (grpc.aio.ServicerContext): The context of the async RPC call.

        Returns:
            object: The result of the intercepted method.
        """
        method_name_model = parse_method_name(handler_call_details.method)
        return await self.intercept(next_handler_method, request, context, method_name_model)

    return handler_factory(
        invoke_intercept_method,
        request_deserializer=getattr(next_handler, "request_deserializer", None),
        response_serializer=getattr(next_handler, "response_serializer", None),
    )

options: show_root_toc_entry: false heading_level: 3

trace

gRPC interceptors for propagating distributed tracing context across client and server.

archipy.helpers.interceptors.grpc.trace.client_interceptor.logger module-attribute

logger = getLogger(__name__)

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor

Bases: BaseGrpcClientInterceptor

A gRPC client interceptor for tracing requests using Elastic APM and Sentry APM.

This interceptor injects the Elastic APM trace parent header into gRPC client requests to enable distributed tracing across services. It also creates Sentry transactions to monitor the performance of gRPC calls.

Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
class GrpcClientTraceInterceptor(BaseGrpcClientInterceptor):
    """Client-side gRPC interceptor that adds Elastic APM and Sentry tracing to outgoing calls.

    The Elastic APM trace parent header is appended to the call metadata so the trace can be
    continued by the receiving service, and the call is wrapped in a Sentry span.
    """

    def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails) -> Any:
        """Run a gRPC client call with trace propagation and a Sentry span around it.

        Args:
            method (Callable): The gRPC method being intercepted.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.ClientCallDetails): Details of the gRPC call.

        Returns:
            Any: The result of the intercepted gRPC method.

        Notes:
            - With both Elastic APM and Sentry disabled the call is forwarded untouched.
            - With Sentry enabled a ``grpc.client`` span is opened for the call.
            - With Elastic APM enabled the trace parent header is added when one is available.
        """
        settings = BaseConfig.global_config()

        # Neither tracer is active: forward the call exactly as received.
        if not (settings.ELASTIC_APM.IS_ENABLED or settings.SENTRY.IS_ENABLED):
            return method(request_or_iterator, call_details)

        # Open a Sentry span; failing to do so must not prevent the call itself.
        sentry_span = None
        if settings.SENTRY.IS_ENABLED:
            try:
                import sentry_sdk

                sentry_span = sentry_sdk.start_span(
                    op="grpc.client",
                    description=f"gRPC client call to {call_details.method}",
                )
                sentry_span.__enter__()
            except ImportError:
                logger.debug("sentry_sdk is not installed, skipping Sentry span creation.")
            except Exception:
                logger.exception("Failed to create Sentry span for gRPC client call")

        # Copy the caller's metadata and add the Elastic APM trace parent, if any.
        outgoing_metadata = list(call_details.metadata or [])
        trace_parent = elasticapm.get_trace_parent_header() if settings.ELASTIC_APM.IS_ENABLED else None
        if trace_parent:
            outgoing_metadata.append((TRACEPARENT_HEADER_NAME, f"{trace_parent}"))

        # Call details are rebuilt because only the metadata differs from the original.
        traced_details = ClientCallDetails(
            method=call_details.method,
            timeout=call_details.timeout,
            metadata=outgoing_metadata,
            credentials=call_details.credentials,
            wait_for_ready=call_details.wait_for_ready,
            compression=call_details.compression,
        )

        try:
            response = method(request_or_iterator, traced_details)
        except Exception as error:
            # Record the failure on the span, then let the caller see the exception.
            if sentry_span:
                sentry_span.set_status("internal_error")
                sentry_span.set_tag("error", True)
                sentry_span.set_data("exception", str(error))
            raise
        else:
            if sentry_span:
                sentry_span.set_status("ok")
            return response
        finally:
            # The span is closed on every path; a failure to close is only logged.
            if sentry_span:
                try:
                    sentry_span.__exit__(None, None, None)
                except Exception:
                    logger.exception("Error closing Sentry span")

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept

intercept(
    method: Callable,
    request_or_iterator: Any,
    call_details: ClientCallDetails,
) -> Any

Intercepts a gRPC client call to inject the Elastic APM trace parent header and monitor performance with Sentry.

Parameters:

Name Type Description Default
method Callable

The gRPC method being intercepted.

required
request_or_iterator Any

The request or request iterator.

required
call_details ClientCallDetails

Details of the gRPC call.

required

Returns:

Name Type Description
Any Any

The result of the intercepted gRPC method.

Notes
  • If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
  • Creates Sentry spans for tracing gRPC client calls.
  • Injects Elastic APM trace parent header when available.
Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails) -> Any:
    """Run a gRPC client call with trace propagation and a Sentry span around it.

    Args:
        method (Callable): The gRPC method being intercepted.
        request_or_iterator (Any): The request or request iterator.
        call_details (grpc.ClientCallDetails): Details of the gRPC call.

    Returns:
        Any: The result of the intercepted gRPC method.

    Notes:
        - With both Elastic APM and Sentry disabled the call is forwarded untouched.
        - With Sentry enabled a ``grpc.client`` span is opened for the call.
        - With Elastic APM enabled the trace parent header is added when one is available.
    """
    settings = BaseConfig.global_config()

    # Neither tracer is active: forward the call exactly as received.
    if not (settings.ELASTIC_APM.IS_ENABLED or settings.SENTRY.IS_ENABLED):
        return method(request_or_iterator, call_details)

    # Open a Sentry span; failing to do so must not prevent the call itself.
    sentry_span = None
    if settings.SENTRY.IS_ENABLED:
        try:
            import sentry_sdk

            sentry_span = sentry_sdk.start_span(
                op="grpc.client",
                description=f"gRPC client call to {call_details.method}",
            )
            sentry_span.__enter__()
        except ImportError:
            logger.debug("sentry_sdk is not installed, skipping Sentry span creation.")
        except Exception:
            logger.exception("Failed to create Sentry span for gRPC client call")

    # Copy the caller's metadata and add the Elastic APM trace parent, if any.
    outgoing_metadata = list(call_details.metadata or [])
    trace_parent = elasticapm.get_trace_parent_header() if settings.ELASTIC_APM.IS_ENABLED else None
    if trace_parent:
        outgoing_metadata.append((TRACEPARENT_HEADER_NAME, f"{trace_parent}"))

    # Call details are rebuilt because only the metadata differs from the original.
    traced_details = ClientCallDetails(
        method=call_details.method,
        timeout=call_details.timeout,
        metadata=outgoing_metadata,
        credentials=call_details.credentials,
        wait_for_ready=call_details.wait_for_ready,
        compression=call_details.compression,
    )

    try:
        response = method(request_or_iterator, traced_details)
    except Exception as error:
        # Record the failure on the span, then let the caller see the exception.
        if sentry_span:
            sentry_span.set_status("internal_error")
            sentry_span.set_tag("error", True)
            sentry_span.set_data("exception", str(error))
        raise
    else:
        if sentry_span:
            sentry_span.set_status("ok")
        return response
    finally:
        # The span is closed on every path; a failure to close is only logged.
        if sentry_span:
            try:
                sentry_span.__exit__(None, None, None)
            except Exception:
                logger.exception("Error closing Sentry span")

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept_unary_unary

intercept_unary_unary(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts a unary-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_unary_unary(
    self,
    continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route a unary-unary RPC through ``self.intercept``.

    The continuation is passed through ``_swap_args`` before being handed to ``intercept``
    (presumably so it can be called request-first; see ``_swap_args``).

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: Whatever ``self.intercept`` returns for this call.
    """
    return self.intercept(_swap_args(continuation), request, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept_unary_stream

intercept_unary_stream(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts a unary-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_unary_stream(
    self,
    continuation: Callable[[grpc.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route a unary-stream RPC through ``self.intercept``.

    The continuation is passed through ``_swap_args`` before being handed to ``intercept``
    (presumably so it can be called request-first; see ``_swap_args``).

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request (Any): The request object.

    Returns:
        Any: Whatever ``self.intercept`` returns for this call.
    """
    return self.intercept(_swap_args(continuation), request, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept_stream_unary

intercept_stream_unary(
    continuation: Callable[
        [ClientCallDetails, Iterator[_TRequest]], Any
    ],
    client_call_details: ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any

Intercepts a stream-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator Iterator[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_stream_unary(
    self,
    continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
    client_call_details: grpc.ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any:
    """Route a stream-unary RPC through ``self.intercept``.

    The continuation is passed through ``_swap_args`` before being handed to ``intercept``
    (presumably so it can be called request-first; see ``_swap_args``).

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request_iterator (Iterator[Any]): The request iterator.

    Returns:
        Any: Whatever ``self.intercept`` returns for this call.
    """
    return self.intercept(_swap_args(continuation), request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.GrpcClientTraceInterceptor.intercept_stream_stream

intercept_stream_stream(
    continuation: Callable[
        [ClientCallDetails, Iterator[_TRequest]], Any
    ],
    client_call_details: ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any

Intercepts a stream-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator Iterator[Any]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
def intercept_stream_stream(
    self,
    continuation: Callable[[grpc.ClientCallDetails, Iterator[_TRequest]], Any],
    client_call_details: grpc.ClientCallDetails,
    request_iterator: Iterator[_TRequest],
) -> Any:
    """Route a stream-stream RPC through ``self.intercept``.

    The continuation is passed through ``_swap_args`` before being handed to ``intercept``
    (presumably so it can be called request-first; see ``_swap_args``).

    Args:
        continuation (Callable): The continuation function to call.
        client_call_details (grpc.ClientCallDetails): Details of the RPC call.
        request_iterator (Iterator[Any]): The request iterator.

    Returns:
        Any: Whatever ``self.intercept`` returns for this call.
    """
    return self.intercept(_swap_args(continuation), request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor

Bases: BaseAsyncGrpcClientInterceptor

An asynchronous gRPC client interceptor for tracing requests using Elastic APM and Sentry APM.

This interceptor injects the Elastic APM trace parent header into asynchronous gRPC client requests to enable distributed tracing across services. It also creates Sentry spans for monitoring performance.

Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
class AsyncGrpcClientTraceInterceptor(BaseAsyncGrpcClientInterceptor):
    """Async client-side gRPC interceptor that adds Elastic APM and Sentry tracing to outgoing calls.

    The Elastic APM trace parent header is appended to the call metadata so the trace can be
    continued by the receiving service, and the awaited call is wrapped in a Sentry span.
    """

    async def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.aio.ClientCallDetails,
    ) -> Any:
        """Await a gRPC client call with trace propagation and a Sentry span around it.

        Args:
            method (Callable): The asynchronous gRPC method being intercepted.
            request_or_iterator (Any): The request or request iterator.
            call_details (grpc.aio.ClientCallDetails): Details of the gRPC call.

        Returns:
            Any: The result of the intercepted gRPC method.

        Notes:
            - With both Elastic APM and Sentry disabled the call is forwarded untouched.
            - With Sentry enabled a ``grpc.client`` span is opened for the call.
            - With Elastic APM enabled the trace parent header is added when one is available.
        """
        settings = BaseConfig.global_config()

        # Neither tracer is active: forward the call exactly as received.
        if not (settings.ELASTIC_APM.IS_ENABLED or settings.SENTRY.IS_ENABLED):
            return await method(request_or_iterator, call_details)

        # Open a Sentry span; failing to do so must not prevent the call itself.
        sentry_span = None
        if settings.SENTRY.IS_ENABLED:
            try:
                import sentry_sdk

                sentry_span = sentry_sdk.start_span(
                    op="grpc.client",
                    description=f"Async gRPC client call to {call_details.method}",
                )
                sentry_span.__enter__()
            except ImportError:
                logger.debug("sentry_sdk is not installed, skipping Sentry span creation.")
            except Exception:
                logger.exception("Failed to create Sentry span for async gRPC client call")

        # Copy the caller's metadata and add the Elastic APM trace parent, if any.
        outgoing_metadata = list(call_details.metadata or [])
        trace_parent = elasticapm.get_trace_parent_header() if settings.ELASTIC_APM.IS_ENABLED else None
        if trace_parent:
            outgoing_metadata.append((TRACEPARENT_HEADER_NAME, f"{trace_parent}"))

        # Call details are rebuilt because only the metadata differs from the original.
        traced_details = AsyncClientCallDetails(
            method=call_details.method,
            timeout=call_details.timeout,
            metadata=outgoing_metadata,
            credentials=call_details.credentials,
            wait_for_ready=call_details.wait_for_ready,
        )

        try:
            response = await method(request_or_iterator, traced_details)
        except Exception as error:
            # Record the failure on the span, then let the caller see the exception.
            if sentry_span:
                sentry_span.set_status("internal_error")
                sentry_span.set_tag("error", True)
                sentry_span.set_data("exception", str(error))
            raise
        else:
            if sentry_span:
                sentry_span.set_status("ok")
            return response
        finally:
            # The span is closed on every path; a failure to close is only logged.
            if sentry_span:
                try:
                    sentry_span.__exit__(None, None, None)
                except Exception:
                    logger.exception("Error closing Sentry span")

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept async

intercept(
    method: Callable,
    request_or_iterator: Any,
    call_details: ClientCallDetails,
) -> Any

Intercepts an asynchronous gRPC client call to inject the Elastic APM trace parent header and monitor with Sentry.

Parameters:

Name Type Description Default
method Callable

The asynchronous gRPC method being intercepted.

required
request_or_iterator Any

The request or request iterator.

required
call_details ClientCallDetails

Details of the gRPC call.

required

Returns:

Name Type Description
Any Any

The result of the intercepted gRPC method.

Notes
  • If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
  • Creates Sentry spans for tracing async gRPC client calls.
  • Injects Elastic APM trace parent header when available.
Source code in archipy/helpers/interceptors/grpc/trace/client_interceptor.py
async def intercept(
    self,
    method: Callable,
    request_or_iterator: Any,
    call_details: grpc.aio.ClientCallDetails,
) -> Any:
    """Trace an outgoing async gRPC call with Sentry and propagate the Elastic APM trace parent.

    Args:
        method (Callable): Awaitable continuation that performs the actual RPC.
        request_or_iterator (Any): The request message, or the request iterator for streaming calls.
        call_details (grpc.aio.ClientCallDetails): Call details of the outgoing RPC.

    Returns:
        Any: Whatever the awaited ``method`` returns.

    Raises:
        Exception: Anything raised by ``method`` is re-raised after the Sentry span
            (when one was opened) has been flagged as failed.

    Notes:
        - When neither Elastic APM nor Sentry is enabled the call is forwarded untouched.
        - With Sentry enabled, a ``grpc.client`` span wraps the call.
        - With Elastic APM enabled, the current trace parent header (if any) is appended
          to the outgoing metadata.
    """
    settings = BaseConfig.global_config()

    # Fast path: no tracing backend is switched on, so forward the call as-is.
    if not (settings.ELASTIC_APM.IS_ENABLED or settings.SENTRY.IS_ENABLED):
        return await method(request_or_iterator, call_details)

    # Open a Sentry span around the call; any failure here only disables the span.
    span = None
    if settings.SENTRY.IS_ENABLED:
        try:
            import sentry_sdk

            span = sentry_sdk.start_span(
                op="grpc.client",
                description=f"Async gRPC client call to {call_details.method}",
            )
            span.__enter__()
        except ImportError:
            logger.debug("sentry_sdk is not installed, skipping Sentry span creation.")
        except Exception:
            logger.exception("Failed to create Sentry span for async gRPC client call")

    # Copy the caller's metadata and, for Elastic APM, add the trace parent header.
    outgoing_metadata = list(call_details.metadata or [])
    if settings.ELASTIC_APM.IS_ENABLED:
        trace_parent = elasticapm.get_trace_parent_header()
        if trace_parent:
            outgoing_metadata.append((TRACEPARENT_HEADER_NAME, f"{trace_parent}"))

    # Rebuild the call details so the RPC is issued with the extended metadata.
    traced_details = AsyncClientCallDetails(
        method=call_details.method,
        timeout=call_details.timeout,
        metadata=outgoing_metadata,
        credentials=call_details.credentials,
        wait_for_ready=call_details.wait_for_ready,
    )

    try:
        response = await method(request_or_iterator, traced_details)
    except Exception as error:
        # Record the failure on the span, then let the caller see the original error.
        if span:
            span.set_status("internal_error")
            span.set_tag("error", True)
            span.set_data("exception", str(error))
        raise
    else:
        if span:
            span.set_status("ok")
        return response
    finally:
        # Always close the span; a failure while closing is logged, never raised.
        if span:
            try:
                span.__exit__(None, None, None)
            except Exception:
                logger.exception("Error closing Sentry span")

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept_unary_unary async

intercept_unary_unary(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts an asynchronous unary-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_unary_unary(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route an asynchronous unary-unary RPC through ``self.intercept``.

    Args:
        continuation (Callable): Next callable in the chain; it is wrapped with
            ``_swap_args`` before being handed to ``self.intercept``.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request (Any): The single request message.

    Returns:
        Any: The value produced by awaiting ``self.intercept``.
    """
    swapped_continuation = _swap_args(continuation)
    return await self.intercept(swapped_continuation, request, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept_unary_stream async

intercept_unary_stream(
    continuation: Callable[
        [ClientCallDetails, _TRequest], Any
    ],
    client_call_details: ClientCallDetails,
    request: _TRequest,
) -> Any

Intercepts an asynchronous unary-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request Any

The request object.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_unary_stream(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, _TRequest], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request: _TRequest,
) -> Any:
    """Route an asynchronous unary-stream RPC through ``self.intercept``.

    Args:
        continuation (Callable): Next callable in the chain; it is wrapped with
            ``_swap_args`` before being handed to ``self.intercept``.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request (Any): The single request message.

    Returns:
        Any: The value produced by awaiting ``self.intercept``.
    """
    swapped_continuation = _swap_args(continuation)
    return await self.intercept(swapped_continuation, request, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept_stream_unary async

intercept_stream_unary(
    continuation: Callable[
        [
            ClientCallDetails,
            AsyncIterable[_TRequest] | Iterable[_TRequest],
        ],
        Any,
    ],
    client_call_details: ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest]
    | Iterable[_TRequest],
) -> Any

Intercepts an asynchronous stream-unary RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator AsyncIterable[_TRequest] | Iterable[_TRequest]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_stream_unary(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> Any:
    """Route an asynchronous stream-unary RPC through ``self.intercept``.

    Args:
        continuation (Callable): Next callable in the chain; it is wrapped with
            ``_swap_args`` before being handed to ``self.intercept``.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request_iterator (AsyncIterable | Iterable): Sync or async iterable of request messages.

    Returns:
        Any: The value produced by awaiting ``self.intercept``.
    """
    swapped_continuation = _swap_args(continuation)
    return await self.intercept(swapped_continuation, request_iterator, client_call_details)

archipy.helpers.interceptors.grpc.trace.client_interceptor.AsyncGrpcClientTraceInterceptor.intercept_stream_stream async

intercept_stream_stream(
    continuation: Callable[
        [
            ClientCallDetails,
            AsyncIterable[_TRequest] | Iterable[_TRequest],
        ],
        Any,
    ],
    client_call_details: ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest]
    | Iterable[_TRequest],
) -> Any

Intercepts an asynchronous stream-stream RPC call.

Parameters:

Name Type Description Default
continuation Callable

The continuation function to call.

required
client_call_details ClientCallDetails

Details of the RPC call.

required
request_iterator AsyncIterable[_TRequest] | Iterable[_TRequest]

The request iterator.

required

Returns:

Name Type Description
Any Any

The result of the intercepted RPC call.

Source code in archipy/helpers/interceptors/grpc/base/client_interceptor.py
async def intercept_stream_stream(
    self,
    continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Any],
    client_call_details: grpc.aio.ClientCallDetails,
    request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> Any:
    """Route an asynchronous stream-stream RPC through ``self.intercept``.

    Args:
        continuation (Callable): Next callable in the chain; it is wrapped with
            ``_swap_args`` before being handed to ``self.intercept``.
        client_call_details (grpc.aio.ClientCallDetails): Details of the RPC call.
        request_iterator (AsyncIterable | Iterable): Sync or async iterable of request messages.

    Returns:
        Any: The value produced by awaiting ``self.intercept``.
    """
    swapped_continuation = _swap_args(continuation)
    return await self.intercept(swapped_continuation, request_iterator, client_call_details)

options: show_root_toc_entry: false heading_level: 3

archipy.helpers.interceptors.grpc.trace.server_interceptor.logger module-attribute

logger = getLogger(__name__)

archipy.helpers.interceptors.grpc.trace.server_interceptor.GrpcServerTraceInterceptor

Bases: BaseGrpcServerInterceptor

A gRPC server interceptor for tracing requests using Elastic APM and Sentry APM.

This interceptor captures and traces gRPC server requests, enabling distributed tracing across services. It integrates with both Elastic APM and Sentry to monitor and log transactions.

Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
class GrpcServerTraceInterceptor(BaseGrpcServerInterceptor):
    """A gRPC server interceptor for tracing requests using Elastic APM and Sentry APM.

    This interceptor captures and traces gRPC server requests, enabling distributed tracing
    across services. It integrates with both Elastic APM and Sentry to monitor and log transactions.
    """

    def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts a gRPC server call to trace the request using Elastic APM and Sentry APM.

        Args:
            method (Callable): The gRPC method being intercepted.
            request (object): The request object passed to the method.
            context (grpc.ServicerContext): The context of the gRPC call.
            method_name_model (MethodName): The parsed method name containing package, service, and method components.

        Returns:
            object: The result of the intercepted gRPC method.

        Raises:
            Exception: Any exception escaping this method is first passed to
                ``BaseUtils.capture_exception`` and then re-raised unchanged.

        Notes:
            - If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
            - Creates Sentry transactions for tracing gRPC server calls.
            - Handles Elastic APM distributed tracing with trace parent headers.
            - Failures while starting or finishing a tracing transaction are logged and
              never replace the handler's own result or exception.
        """
        try:
            config = BaseConfig.global_config()

            # Skip tracing if both APM systems are disabled
            if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
                return method(request, context)

            # Flatten the invocation metadata into a str -> str mapping.
            # Missing (None) metadata is treated as empty instead of raising TypeError.
            metadata_dict: dict[str, str] = {
                key: value.decode("utf-8", errors="ignore") if isinstance(value, bytes) else str(value)
                for key, value in (context.invocation_metadata() or ())
            }

            # Initialize Sentry transaction if enabled
            sentry_transaction = None
            if config.SENTRY.IS_ENABLED:
                try:
                    import sentry_sdk

                    # Initialize Sentry if not already done
                    # NOTE(review): sentry_sdk.Hub is deprecated in sentry-sdk 2.x — confirm
                    # the pinned SDK version before relying on this check.
                    current_hub = sentry_sdk.Hub.current
                    if not getattr(current_hub, "client", None):
                        sentry_sdk.init(
                            dsn=config.SENTRY.DSN,
                            debug=config.SENTRY.DEBUG,
                            release=config.SENTRY.RELEASE,
                            sample_rate=config.SENTRY.SAMPLE_RATE,
                            traces_sample_rate=config.SENTRY.TRACES_SAMPLE_RATE,
                            environment=getattr(config, "ENVIRONMENT", None),
                        )

                    sentry_transaction = sentry_sdk.start_transaction(
                        name=method_name_model.full_name,
                        op="grpc.server",
                        description=f"gRPC server call {method_name_model.full_name}",
                    )
                    sentry_transaction.__enter__()
                except ImportError:
                    logger.debug("sentry_sdk is not installed, skipping Sentry transaction creation.")
                except Exception:
                    logger.exception("Failed to create Sentry transaction for gRPC server call")

            # Handle Elastic APM if enabled
            elastic_client: Any = None
            if config.ELASTIC_APM.IS_ENABLED:
                try:
                    # Reuse the existing Elastic APM client, creating one only if none exists
                    elastic_client = elasticapm.get_client()
                    if not elastic_client:
                        elastic_client = elasticapm.Client(config.ELASTIC_APM.model_dump())
                    # Check if a trace parent header is present in the metadata
                    if parent := elasticapm.trace_parent_from_headers(metadata_dict):
                        # Start a transaction linked to the distributed trace
                        elastic_client.begin_transaction(transaction_type="request", trace_parent=parent)
                    else:
                        # Start a new transaction if no trace parent header is present
                        elastic_client.begin_transaction(transaction_type="request")
                except Exception:
                    logger.exception("Failed to initialize Elastic APM transaction")
                    # Without a started transaction there is nothing to end later
                    elastic_client = None

            try:
                # Execute the gRPC method
                result = method(request, context)
            except Exception:
                # Mark transactions as failed
                if sentry_transaction:
                    sentry_transaction.set_status("internal_error")
                if elastic_client is not None:
                    try:
                        elastic_client.end_transaction(name=method_name_model.full_name, result="failure")
                    except Exception:
                        # Keep the handler's exception as the one that propagates
                        logger.exception("Error closing Elastic APM transaction")
                raise
            else:
                # Mark transactions as successful
                if sentry_transaction:
                    sentry_transaction.set_status("ok")
                if elastic_client is not None:
                    try:
                        elastic_client.end_transaction(name=method_name_model.full_name, result="success")
                    except Exception:
                        # A tracing failure must not discard a successful response
                        logger.exception("Error closing Elastic APM transaction")
                return result
            finally:
                # Clean up Sentry transaction
                if sentry_transaction:
                    try:
                        sentry_transaction.__exit__(None, None, None)
                    except Exception:
                        logger.exception("Error closing Sentry transaction")

        except Exception as exception:
            BaseUtils.capture_exception(exception)
            raise

archipy.helpers.interceptors.grpc.trace.server_interceptor.GrpcServerTraceInterceptor.intercept

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts a gRPC server call to trace the request using Elastic APM and Sentry APM.

Parameters:

Name Type Description Default
method Callable

The gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Raises:

Type Description
Exception

If an exception occurs during the method execution, it is reported via BaseUtils.capture_exception and then re-raised.

Notes
  • If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
  • Creates Sentry transactions for tracing gRPC server calls.
  • Handles Elastic APM distributed tracing with trace parent headers.
Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Intercepts a gRPC server call to trace the request using Elastic APM and Sentry APM.

    Args:
        method (Callable): The gRPC method being intercepted.
        request (object): The request object passed to the method.
        context (grpc.ServicerContext): The context of the gRPC call.
        method_name_model (MethodName): The parsed method name containing package, service, and method components.

    Returns:
        object: The result of the intercepted gRPC method.

    Raises:
        Exception: Any exception escaping this method is first passed to
            ``BaseUtils.capture_exception`` and then re-raised unchanged.

    Notes:
        - If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
        - Creates Sentry transactions for tracing gRPC server calls.
        - Handles Elastic APM distributed tracing with trace parent headers.
        - Failures while starting or finishing a tracing transaction are logged and
          never replace the handler's own result or exception.
    """
    try:
        config = BaseConfig.global_config()

        # Skip tracing if both APM systems are disabled
        if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
            return method(request, context)

        # Flatten the invocation metadata into a str -> str mapping.
        # Missing (None) metadata is treated as empty instead of raising TypeError.
        metadata_dict: dict[str, str] = {
            key: value.decode("utf-8", errors="ignore") if isinstance(value, bytes) else str(value)
            for key, value in (context.invocation_metadata() or ())
        }

        # Initialize Sentry transaction if enabled
        sentry_transaction = None
        if config.SENTRY.IS_ENABLED:
            try:
                import sentry_sdk

                # Initialize Sentry if not already done
                # NOTE(review): sentry_sdk.Hub is deprecated in sentry-sdk 2.x — confirm
                # the pinned SDK version before relying on this check.
                current_hub = sentry_sdk.Hub.current
                if not getattr(current_hub, "client", None):
                    sentry_sdk.init(
                        dsn=config.SENTRY.DSN,
                        debug=config.SENTRY.DEBUG,
                        release=config.SENTRY.RELEASE,
                        sample_rate=config.SENTRY.SAMPLE_RATE,
                        traces_sample_rate=config.SENTRY.TRACES_SAMPLE_RATE,
                        environment=getattr(config, "ENVIRONMENT", None),
                    )

                sentry_transaction = sentry_sdk.start_transaction(
                    name=method_name_model.full_name,
                    op="grpc.server",
                    description=f"gRPC server call {method_name_model.full_name}",
                )
                sentry_transaction.__enter__()
            except ImportError:
                logger.debug("sentry_sdk is not installed, skipping Sentry transaction creation.")
            except Exception:
                logger.exception("Failed to create Sentry transaction for gRPC server call")

        # Handle Elastic APM if enabled
        elastic_client: Any = None
        if config.ELASTIC_APM.IS_ENABLED:
            try:
                # Reuse the existing Elastic APM client, creating one only if none exists
                elastic_client = elasticapm.get_client()
                if not elastic_client:
                    elastic_client = elasticapm.Client(config.ELASTIC_APM.model_dump())
                # Check if a trace parent header is present in the metadata
                if parent := elasticapm.trace_parent_from_headers(metadata_dict):
                    # Start a transaction linked to the distributed trace
                    elastic_client.begin_transaction(transaction_type="request", trace_parent=parent)
                else:
                    # Start a new transaction if no trace parent header is present
                    elastic_client.begin_transaction(transaction_type="request")
            except Exception:
                logger.exception("Failed to initialize Elastic APM transaction")
                # Without a started transaction there is nothing to end later
                elastic_client = None

        try:
            # Execute the gRPC method
            result = method(request, context)
        except Exception:
            # Mark transactions as failed
            if sentry_transaction:
                sentry_transaction.set_status("internal_error")
            if elastic_client is not None:
                try:
                    elastic_client.end_transaction(name=method_name_model.full_name, result="failure")
                except Exception:
                    # Keep the handler's exception as the one that propagates
                    logger.exception("Error closing Elastic APM transaction")
            raise
        else:
            # Mark transactions as successful
            if sentry_transaction:
                sentry_transaction.set_status("ok")
            if elastic_client is not None:
                try:
                    elastic_client.end_transaction(name=method_name_model.full_name, result="success")
                except Exception:
                    # A tracing failure must not discard a successful response
                    logger.exception("Error closing Elastic APM transaction")
            return result
        finally:
            # Clean up Sentry transaction
            if sentry_transaction:
                try:
                    sentry_transaction.__exit__(None, None, None)
                except Exception:
                    logger.exception("Error closing Sentry transaction")

    except Exception as exception:
        BaseUtils.capture_exception(exception)
        raise

archipy.helpers.interceptors.grpc.trace.server_interceptor.GrpcServerTraceInterceptor.intercept_service

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], RpcMethodHandler | None
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler | None

Intercepts the service call and wraps the handler with custom logic.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], RpcMethodHandler | None]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler | None

The wrapped RPC method handler, or None when the continuation returns no handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler | None],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Wrap the handler returned by ``continuation`` so each call goes through ``self.intercept``.

    Args:
        continuation: Callable that yields the next handler for ``handler_call_details``.
        handler_call_details: Details of the handler call; its ``method`` attribute
            is parsed into a method-name model on every invocation.

    Returns:
        grpc.RpcMethodHandler | None: A handler built by the matching factory around the
        intercepting behaviour, or ``None`` when ``continuation`` returns no handler.
    """
    wrapped_handler = continuation(handler_call_details)
    if wrapped_handler is None:
        # No handler for this call, so there is nothing to wrap.
        return None

    build_handler, original_behavior = _get_factory_and_method(wrapped_handler)

    def invoke_intercept_method(request: object, context: grpc.ServicerContext) -> object:
        """Run the original handler behaviour through ``self.intercept``.

        Args:
            request (object): The request object.
            context (grpc.ServicerContext): The context of the RPC call.

        Returns:
            object: Whatever ``self.intercept`` returns for this call.
        """
        parsed_name = parse_method_name(handler_call_details.method)
        return self.intercept(original_behavior, request, context, parsed_name)

    # Keep the (de)serializers of the original handler on the replacement.
    return build_handler(
        invoke_intercept_method,
        request_deserializer=wrapped_handler.request_deserializer,
        response_serializer=wrapped_handler.response_serializer,
    )

archipy.helpers.interceptors.grpc.trace.server_interceptor.AsyncGrpcServerTraceInterceptor

Bases: BaseAsyncGrpcServerInterceptor

An async gRPC server interceptor for tracing requests using Elastic APM and Sentry APM.

This interceptor captures and traces async gRPC server requests, enabling distributed tracing across services. It integrates with both Elastic APM and Sentry to monitor and log transactions.

Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
class AsyncGrpcServerTraceInterceptor(BaseAsyncGrpcServerInterceptor):
    """An async gRPC server interceptor for tracing requests using Elastic APM and Sentry APM.

    This interceptor captures and traces async gRPC server requests, enabling distributed tracing
    across services. It integrates with both Elastic APM and Sentry to monitor and log transactions.
    """

    async def intercept(
        self,
        method: Callable,
        request: object,
        context: grpc.aio.ServicerContext,
        method_name_model: MethodName,
    ) -> object:
        """Intercepts an async gRPC server call to trace the request using Elastic APM and Sentry APM.

        Args:
            method (Callable): The async gRPC method being intercepted.
            request (object): The request object passed to the method.
            context (grpc.aio.ServicerContext): The context of the async gRPC call.
            method_name_model (MethodName): The parsed method name containing package, service, and method components.

        Returns:
            object: The result of the intercepted gRPC method.

        Raises:
            Exception: Any exception escaping this method is first passed to
                ``BaseUtils.capture_exception`` and then re-raised unchanged.

        Notes:
            - If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
            - Creates Sentry transactions for tracing async gRPC server calls.
            - Handles Elastic APM distributed tracing with trace parent headers.
            - Failures while starting or finishing a tracing transaction are logged and
              never replace the handler's own result or exception.
        """
        try:
            config = BaseConfig.global_config()

            # Skip tracing if both APM systems are disabled
            if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
                return await method(request, context)

            # Flatten the invocation metadata into a str -> str mapping;
            # missing (None) metadata is treated as empty.
            metadata_dict: dict[str, str] = {
                key: value.decode("utf-8", errors="ignore") if isinstance(value, bytes) else str(value)
                for key, value in (context.invocation_metadata() or ())
            }

            # Initialize Sentry transaction if enabled
            sentry_transaction = None
            if config.SENTRY.IS_ENABLED:
                try:
                    import sentry_sdk

                    # Initialize Sentry if not already done
                    # NOTE(review): sentry_sdk.Hub is deprecated in sentry-sdk 2.x — confirm
                    # the pinned SDK version before relying on this check.
                    current_hub = sentry_sdk.Hub.current
                    if not getattr(current_hub, "client", None):
                        sentry_sdk.init(
                            dsn=config.SENTRY.DSN,
                            debug=config.SENTRY.DEBUG,
                            release=config.SENTRY.RELEASE,
                            sample_rate=config.SENTRY.SAMPLE_RATE,
                            traces_sample_rate=config.SENTRY.TRACES_SAMPLE_RATE,
                            environment=getattr(config, "ENVIRONMENT", None),
                        )

                    sentry_transaction = sentry_sdk.start_transaction(
                        name=method_name_model.full_name,
                        op="grpc.server",
                        description=f"Async gRPC server call {method_name_model.full_name}",
                    )
                    sentry_transaction.__enter__()
                except ImportError:
                    logger.debug("sentry_sdk is not installed, skipping Sentry transaction creation.")
                except Exception:
                    logger.exception("Failed to create Sentry transaction for async gRPC server call")

            # Handle Elastic APM if enabled
            elastic_client: Any = None
            if config.ELASTIC_APM.IS_ENABLED:
                try:
                    # Reuse the existing Elastic APM client, creating one only if none exists
                    elastic_client = elasticapm.get_client()
                    if not elastic_client:
                        elastic_client = elasticapm.Client(config.ELASTIC_APM.model_dump())

                    # Check if a trace parent header is present in the metadata
                    if parent := elasticapm.trace_parent_from_headers(metadata_dict):
                        # Start a transaction linked to the distributed trace
                        elastic_client.begin_transaction(transaction_type="request", trace_parent=parent)
                    else:
                        # Start a new transaction if no trace parent header is present
                        elastic_client.begin_transaction(transaction_type="request")
                except Exception:
                    logger.exception("Failed to initialize Elastic APM transaction")
                    # Without a started transaction there is nothing to end later
                    elastic_client = None

            try:
                # Execute the async gRPC method
                result = await method(request, context)
            except Exception:
                # Mark transactions as failed
                if sentry_transaction:
                    sentry_transaction.set_status("internal_error")
                if elastic_client is not None:
                    try:
                        elastic_client.end_transaction(name=method_name_model.full_name, result="failure")
                    except Exception:
                        # Keep the handler's exception as the one that propagates
                        logger.exception("Error closing Elastic APM transaction")
                raise
            else:
                # Mark transactions as successful
                if sentry_transaction:
                    sentry_transaction.set_status("ok")
                if elastic_client is not None:
                    try:
                        elastic_client.end_transaction(name=method_name_model.full_name, result="success")
                    except Exception:
                        # A tracing failure must not discard a successful response
                        logger.exception("Error closing Elastic APM transaction")
                return result
            finally:
                # Clean up Sentry transaction
                if sentry_transaction:
                    try:
                        sentry_transaction.__exit__(None, None, None)
                    except Exception:
                        logger.exception("Error closing Sentry transaction")

        except Exception as exception:
            BaseUtils.capture_exception(exception)
            raise

archipy.helpers.interceptors.grpc.trace.server_interceptor.AsyncGrpcServerTraceInterceptor.intercept async

intercept(
    method: Callable,
    request: object,
    context: ServicerContext,
    method_name_model: MethodName,
) -> object

Intercepts an async gRPC server call to trace the request using Elastic APM and Sentry APM.

Parameters:

Name Type Description Default
method Callable

The async gRPC method being intercepted.

required
request object

The request object passed to the method.

required
context ServicerContext

The context of the async gRPC call.

required
method_name_model MethodName

The parsed method name containing package, service, and method components.

required

Returns:

Name Type Description
object object

The result of the intercepted gRPC method.

Raises:

Type Description
Exception

If an exception occurs during the method execution, it is reported via BaseUtils.capture_exception and then re-raised.

Notes
  • If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
  • Creates Sentry transactions for tracing async gRPC server calls.
  • Handles Elastic APM distributed tracing with trace parent headers.
Source code in archipy/helpers/interceptors/grpc/trace/server_interceptor.py
async def intercept(
    self,
    method: Callable,
    request: object,
    context: grpc.aio.ServicerContext,
    method_name_model: MethodName,
) -> object:
    """Trace an async gRPC server call with Elastic APM and/or Sentry.

    Tracing is best-effort: failures while creating, updating or closing a
    transaction are logged and never replace the outcome of the intercepted
    method.

    Args:
        method (Callable): The async gRPC method being intercepted.
        request (object): The request object passed to the method.
        context (grpc.aio.ServicerContext): The context of the async gRPC call.
        method_name_model (MethodName): The parsed method name containing package, service, and method components.

    Returns:
        object: The result of the intercepted gRPC method.

    Raises:
        Exception: Any exception raised by the intercepted method (or by the
            unguarded configuration/metadata steps) is reported through
            ``BaseUtils.capture_exception`` and then re-raised.

    Notes:
        - If both Elastic APM and Sentry are disabled, the interceptor passes the call through.
        - Creates Sentry transactions for tracing async gRPC server calls.
        - Handles Elastic APM distributed tracing with trace parent headers.
    """
    try:
        config = BaseConfig.global_config()

        # Skip tracing if both APM systems are disabled
        if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
            return await method(request, context)

        # Flatten the invocation metadata into a str -> str mapping so it can be
        # handed to elasticapm.trace_parent_from_headers below.
        invocation_metadata = context.invocation_metadata()
        metadata_items = list(invocation_metadata) if invocation_metadata is not None else []
        metadata_dict: dict[str, str] = {}
        for key, value in metadata_items:
            if isinstance(value, bytes):
                # Undecodable bytes are dropped rather than failing the call.
                metadata_dict[key] = value.decode("utf-8", errors="ignore")
            else:
                metadata_dict[key] = str(value)

        # Initialize Sentry transaction if enabled
        sentry_transaction = None
        if config.SENTRY.IS_ENABLED:
            try:
                import sentry_sdk

                # Initialize Sentry if not already done.
                # NOTE(review): sentry_sdk.Hub is deprecated in sentry-sdk 2.x - confirm the
                # supported SDK range before replacing this check.
                current_hub = sentry_sdk.Hub.current
                if not getattr(current_hub, "client", None):
                    sentry_sdk.init(
                        dsn=config.SENTRY.DSN,
                        debug=config.SENTRY.DEBUG,
                        release=config.SENTRY.RELEASE,
                        sample_rate=config.SENTRY.SAMPLE_RATE,
                        traces_sample_rate=config.SENTRY.TRACES_SAMPLE_RATE,
                        environment=getattr(config, "ENVIRONMENT", None),
                    )

                pending_transaction = sentry_sdk.start_transaction(
                    name=method_name_model.full_name,
                    op="grpc.server",
                    description=f"Async gRPC server call {method_name_model.full_name}",
                )
                pending_transaction.__enter__()
                # Only publish the transaction once it was entered successfully, so the
                # cleanup below never calls __exit__ on a transaction that was not entered.
                sentry_transaction = pending_transaction
            except ImportError:
                logger.debug("sentry_sdk is not installed, skipping Sentry transaction creation.")
            except Exception:
                logger.exception("Failed to create Sentry transaction for async gRPC server call")

        # Handle Elastic APM if enabled
        elastic_client: Any = None
        if config.ELASTIC_APM.IS_ENABLED:
            try:
                # Get the Elastic APM client
                elastic_client = elasticapm.get_client()
                if not elastic_client:
                    elastic_client = elasticapm.Client(config.ELASTIC_APM.model_dump())

                # Check if a trace parent header is present in the metadata
                if parent := elasticapm.trace_parent_from_headers(metadata_dict):
                    # Start a transaction linked to the distributed trace
                    elastic_client.begin_transaction(transaction_type="request", trace_parent=parent)
                else:
                    # Start a new transaction if no trace parent header is present
                    elastic_client.begin_transaction(transaction_type="request")
            except Exception:
                logger.exception("Failed to initialize Elastic APM transaction")
                elastic_client = None

        def _record_outcome(sentry_status: str, elastic_result: str) -> None:
            """Record the call outcome on the active APM transactions without raising.

            Guarding each backend keeps a tracing failure from masking the handler's
            own exception or from failing an RPC that actually succeeded.
            """
            if sentry_transaction:
                try:
                    sentry_transaction.set_status(sentry_status)
                except Exception:
                    logger.exception("Failed to set Sentry transaction status")
            if elastic_client is not None:
                try:
                    elastic_client.end_transaction(name=method_name_model.full_name, result=elastic_result)
                except Exception:
                    logger.exception("Failed to end Elastic APM transaction")

        try:
            # Execute the async gRPC method
            result = await method(request, context)
        except Exception:
            # Mark transactions as failed; the outer handler captures the exception.
            _record_outcome("internal_error", "failure")
            raise
        else:
            # Mark transactions as successful
            _record_outcome("ok", "success")
            return result
        finally:
            # Clean up Sentry transaction
            if sentry_transaction:
                try:
                    sentry_transaction.__exit__(None, None, None)
                except Exception:
                    logger.exception("Error closing Sentry transaction")

    except Exception as exception:
        BaseUtils.capture_exception(exception)
        raise

archipy.helpers.interceptors.grpc.trace.server_interceptor.AsyncGrpcServerTraceInterceptor.intercept_service async

intercept_service(
    continuation: Callable[
        [HandlerCallDetails], Awaitable[RpcMethodHandler]
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler

Intercepts the service call using the simplified async pattern.

For async gRPC, we don't need the complex handler wrapping that sync interceptors require. Instead, we can use a much simpler pattern where we just await the continuation and then wrap the actual method call.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]

The continuation function to call.

required
handler_call_details HandlerCallDetails

Details of the handler call.

required

Returns:

Type Description
RpcMethodHandler

grpc.RpcMethodHandler: The wrapped RPC method handler.

Source code in archipy/helpers/interceptors/grpc/base/server_interceptor.py
async def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
    """Wrap the next handler so every call is routed through ``self.intercept``.

    The continuation is awaited to obtain the next handler, its method is
    extracted via ``_get_factory_and_method``, and a new handler of the same
    kind is built around a coroutine that delegates to ``self.intercept``.

    Args:
        continuation: The continuation function to call.
        handler_call_details: Details of the handler call.

    Returns:
        grpc.RpcMethodHandler: The wrapped RPC method handler.
    """
    downstream_handler = await continuation(handler_call_details)
    build_handler, downstream_method = _get_factory_and_method(downstream_handler)

    async def invoke_intercept_method(request: object, context: grpc.aio.ServicerContext) -> object:
        """Parse the method name and delegate the call to ``self.intercept``.

        Args:
            request (object): The request object.
            context (grpc.aio.ServicerContext): The context of the async RPC call.

        Returns:
            object: The result of the intercepted method.
        """
        # The method name is parsed on every invocation, as in the wrapped call path.
        parsed_name = parse_method_name(handler_call_details.method)
        return await self.intercept(downstream_method, request, context, parsed_name)

    # Reuse the downstream handler's (de)serializers; None when it has none.
    serializer_options = {
        "request_deserializer": getattr(downstream_handler, "request_deserializer", None),
        "response_serializer": getattr(downstream_handler, "response_serializer", None),
    }
    return build_handler(invoke_intercept_method, **serializer_options)

options: show_root_toc_entry: false heading_level: 3