Kafka

The kafka adapter provides integration with Apache Kafka for producing and consuming messages in event-driven architectures. It ships five adapter classes: one for admin operations, plus sync/async pairs for producing and for consuming.

Class                      Mode   Port
KafkaAdminAdapter          sync   KafkaAdminPort
KafkaProducerAdapter       sync   KafkaProducerPort
KafkaConsumerAdapter       sync   KafkaConsumerPort
AsyncKafkaProducerAdapter  async  AsyncKafkaProducerPort
AsyncKafkaConsumerAdapter  async  AsyncKafkaConsumerPort

Ports

Abstract port interfaces defining the Kafka adapter contracts for sync and async message production and consumption.

archipy.adapters.kafka.ports.KafkaAdminPort

Interface for Kafka admin operations.

This interface defines the contract for performing administrative operations on Kafka topics.

Source code in archipy/adapters/kafka/ports.py
class KafkaAdminPort:
    """Interface for Kafka admin operations.

    This interface defines the contract for performing administrative operations on Kafka topics.
    """

    @abstractmethod
    def create_topic(self, topic: str, num_partitions: int = 1, replication_factor: int = 1) -> None:
        """Creates a new Kafka topic.

        Args:
            topic (str): Name of the topic to create.
            num_partitions (int, optional): Number of partitions for the topic. Defaults to 1.
            replication_factor (int, optional): Replication factor for the topic. Defaults to 1.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def delete_topic(self, topics: list[str]) -> None:
        """Deletes one or more Kafka topics.

        Args:
            topics (list[str]): List of topic names to delete.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
        """Lists Kafka topics.

        Args:
            topic (str | None, optional): Specific topic to list. If None, lists all topics.
                Defaults to None.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

archipy.adapters.kafka.ports.KafkaAdminPort.create_topic abstractmethod

create_topic(
    topic: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
) -> None

Creates a new Kafka topic.

Parameters:

topic (str, required): Name of the topic to create.
num_partitions (int, default 1): Number of partitions for the topic.
replication_factor (int, default 1): Replication factor for the topic.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def create_topic(self, topic: str, num_partitions: int = 1, replication_factor: int = 1) -> None:
    """Creates a new Kafka topic.

    Args:
        topic (str): Name of the topic to create.
        num_partitions (int, optional): Number of partitions for the topic. Defaults to 1.
        replication_factor (int, optional): Replication factor for the topic. Defaults to 1.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaAdminPort.delete_topic abstractmethod

delete_topic(topics: list[str]) -> None

Deletes one or more Kafka topics.

Parameters:

topics (list[str], required): List of topic names to delete.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def delete_topic(self, topics: list[str]) -> None:
    """Deletes one or more Kafka topics.

    Args:
        topics (list[str]): List of topic names to delete.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaAdminPort.list_topics abstractmethod

list_topics(
    topic: str | None = None, timeout: int = 1
) -> ClusterMetadata

Lists Kafka topics.

Parameters:

topic (str | None, default None): Specific topic to list. If None, lists all topics.
timeout (int, default 1): Timeout in seconds for the operation.

Returns:

ClusterMetadata: Metadata about the Kafka cluster and topics.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
    """Lists Kafka topics.

    Args:
        topic (str | None, optional): Specific topic to list. If None, lists all topics.
            Defaults to None.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort

Interface for Kafka consumer operations.

This interface defines the contract for consuming messages from Kafka topics.

Source code in archipy/adapters/kafka/ports.py
class KafkaConsumerPort:
    """Interface for Kafka consumer operations.

    This interface defines the contract for consuming messages from Kafka topics.
    """

    @abstractmethod
    def batch_consume(self, messages_number: int, timeout: int) -> list[Message]:
        """Consumes a batch of messages from subscribed topics.

        Args:
            messages_number (int): Maximum number of messages to consume.
            timeout (int): Timeout in seconds for the operation.

        Returns:
            list[Message]: List of consumed messages.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def poll(self, timeout: int) -> Message | None:
        """Polls for a single message from subscribed topics.

        Args:
            timeout (int): Timeout in seconds for the operation.

        Returns:
            Message | None: The consumed message or None if no message was received.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def commit(self, message: Message, asynchronous: bool) -> None | list[TopicPartition]:
        """Commits the offset of a consumed message.

        Args:
            message (Message): The message whose offset should be committed.
            asynchronous (bool): Whether to commit asynchronously.

        Returns:
            None | list[TopicPartition]: None for synchronous commits, or list of committed
                partitions for asynchronous commits.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def subscribe(self, topic_list: list[str]) -> None:
        """Subscribes to a list of topics.

        Args:
            topic_list (list[str]): List of topic names to subscribe to.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def assign(self, partition_list: list[TopicPartition]) -> None:
        """Assigns specific partitions to the consumer.

        Args:
            partition_list (list[TopicPartition]): List of partitions to assign.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Closes the consumer, leaving the consumer group and committing offsets.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.batch_consume abstractmethod

batch_consume(
    messages_number: int, timeout: int
) -> list[Message]

Consumes a batch of messages from subscribed topics.

Parameters:

messages_number (int, required): Maximum number of messages to consume.
timeout (int, required): Timeout in seconds for the operation.

Returns:

list[Message]: List of consumed messages.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def batch_consume(self, messages_number: int, timeout: int) -> list[Message]:
    """Consumes a batch of messages from subscribed topics.

    Args:
        messages_number (int): Maximum number of messages to consume.
        timeout (int): Timeout in seconds for the operation.

    Returns:
        list[Message]: List of consumed messages.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.poll abstractmethod

poll(timeout: int) -> Message | None

Polls for a single message from subscribed topics.

Parameters:

timeout (int, required): Timeout in seconds for the operation.

Returns:

Message | None: The consumed message, or None if no message was received.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def poll(self, timeout: int) -> Message | None:
    """Polls for a single message from subscribed topics.

    Args:
        timeout (int): Timeout in seconds for the operation.

    Returns:
        Message | None: The consumed message or None if no message was received.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.commit abstractmethod

commit(
    message: Message, asynchronous: bool
) -> None | list[TopicPartition]

Commits the offset of a consumed message.

Parameters:

message (Message, required): The message whose offset should be committed.
asynchronous (bool, required): Whether to commit asynchronously.

Returns:

None | list[TopicPartition]: None for synchronous commits, or a list of committed partitions for asynchronous commits.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def commit(self, message: Message, asynchronous: bool) -> None | list[TopicPartition]:
    """Commits the offset of a consumed message.

    Args:
        message (Message): The message whose offset should be committed.
        asynchronous (bool): Whether to commit asynchronously.

    Returns:
        None | list[TopicPartition]: None for synchronous commits, or list of committed
            partitions for asynchronous commits.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.subscribe abstractmethod

subscribe(topic_list: list[str]) -> None

Subscribes to a list of topics.

Parameters:

topic_list (list[str], required): List of topic names to subscribe to.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def subscribe(self, topic_list: list[str]) -> None:
    """Subscribes to a list of topics.

    Args:
        topic_list (list[str]): List of topic names to subscribe to.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.assign abstractmethod

assign(partition_list: list[TopicPartition]) -> None

Assigns specific partitions to the consumer.

Parameters:

partition_list (list[TopicPartition], required): List of partitions to assign.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def assign(self, partition_list: list[TopicPartition]) -> None:
    """Assigns specific partitions to the consumer.

    Args:
        partition_list (list[TopicPartition]): List of partitions to assign.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaConsumerPort.close abstractmethod

close() -> None

Closes the consumer, leaving the consumer group and committing offsets.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def close(self) -> None:
    """Closes the consumer, leaving the consumer group and committing offsets.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort

Interface for Kafka producer operations.

This interface defines the contract for producing messages to Kafka topics.

Source code in archipy/adapters/kafka/ports.py
class KafkaProducerPort:
    """Interface for Kafka producer operations.

    This interface defines the contract for producing messages to Kafka topics.
    """

    @abstractmethod
    def produce(self, message: str | bytes, key: str | None = None) -> None:
        """Produces a message to the configured topic.

        Args:
            message (str | bytes): The message to produce.
            key (str | None, optional): The key for the message. Defaults to None.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def flush(self, timeout: int | None) -> None:
        """Flushes any pending messages to the broker.

        Args:
            timeout (int | None): Maximum time to wait for messages to be delivered.
                If None, wait indefinitely.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def validate_healthiness(self) -> None:
        """Validates the health of the producer connection.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def list_topics(self, topic: str | None, timeout: int) -> ClusterMetadata:
        """Lists Kafka topics.

        Args:
            topic (str | None): Specific topic to list. If None, lists all topics.
            timeout (int): Timeout in seconds for the operation.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Closes the producer, flushing any remaining messages.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort.produce abstractmethod

produce(
    message: str | bytes, key: str | None = None
) -> None

Produces a message to the configured topic.

Parameters:

message (str | bytes, required): The message to produce.
key (str | None, default None): The key for the message.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def produce(self, message: str | bytes, key: str | None = None) -> None:
    """Produces a message to the configured topic.

    Args:
        message (str | bytes): The message to produce.
        key (str | None, optional): The key for the message. Defaults to None.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort.flush abstractmethod

flush(timeout: int | None) -> None

Flushes any pending messages to the broker.

Parameters:

timeout (int | None, required): Maximum time to wait for messages to be delivered. If None, wait indefinitely.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def flush(self, timeout: int | None) -> None:
    """Flushes any pending messages to the broker.

    Args:
        timeout (int | None): Maximum time to wait for messages to be delivered.
            If None, wait indefinitely.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort.validate_healthiness abstractmethod

validate_healthiness() -> None

Validates the health of the producer connection.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def validate_healthiness(self) -> None:
    """Validates the health of the producer connection.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort.list_topics abstractmethod

list_topics(
    topic: str | None, timeout: int
) -> ClusterMetadata

Lists Kafka topics.

Parameters:

topic (str | None, required): Specific topic to list. If None, lists all topics.
timeout (int, required): Timeout in seconds for the operation.

Returns:

ClusterMetadata: Metadata about the Kafka cluster and topics.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def list_topics(self, topic: str | None, timeout: int) -> ClusterMetadata:
    """Lists Kafka topics.

    Args:
        topic (str | None): Specific topic to list. If None, lists all topics.
        timeout (int): Timeout in seconds for the operation.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.KafkaProducerPort.close abstractmethod

close() -> None

Closes the producer, flushing any remaining messages.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
def close(self) -> None:
    """Closes the producer, flushing any remaining messages.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort

Async interface for Kafka consumer operations.

This interface defines the async contract for consuming messages from Kafka topics.

Source code in archipy/adapters/kafka/ports.py
class AsyncKafkaConsumerPort:
    """Async interface for Kafka consumer operations.

    This interface defines the async contract for consuming messages from Kafka topics.
    """

    @abstractmethod
    async def batch_consume(self, messages_number: int, timeout: int) -> list[Message]:  # noqa: ASYNC109
        """Consumes a batch of messages from subscribed topics.

        Args:
            messages_number (int): Maximum number of messages to consume.
            timeout (int): Timeout in seconds for the operation.

        Returns:
            list[Message]: List of consumed messages.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def poll(self, timeout: int) -> Message | None:  # noqa: ASYNC109
        """Polls for a single message from subscribed topics.

        Args:
            timeout (int): Timeout in seconds for the operation.

        Returns:
            Message | None: The consumed message or None if no message was received.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def commit(self, message: Message, asynchronous: bool) -> None | list[TopicPartition]:
        """Commits the offset of a consumed message.

        Args:
            message (Message): The message whose offset should be committed.
            asynchronous (bool): Whether to commit asynchronously.

        Returns:
            None | list[TopicPartition]: None for synchronous commits, or list of committed
                partitions for asynchronous commits.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def subscribe(self, topic_list: list[str]) -> None:
        """Subscribes to a list of topics.

        Args:
            topic_list (list[str]): List of topic names to subscribe to.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def assign(self, partition_list: list[TopicPartition]) -> None:
        """Assigns specific partitions to the consumer.

        Args:
            partition_list (list[TopicPartition]): List of partitions to assign.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self) -> None:
        """Closes the consumer, leaving the consumer group and committing offsets.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.batch_consume abstractmethod async

batch_consume(
    messages_number: int, timeout: int
) -> list[Message]

Consumes a batch of messages from subscribed topics.

Parameters:

messages_number (int, required): Maximum number of messages to consume.
timeout (int, required): Timeout in seconds for the operation.

Returns:

list[Message]: List of consumed messages.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def batch_consume(self, messages_number: int, timeout: int) -> list[Message]:  # noqa: ASYNC109
    """Consumes a batch of messages from subscribed topics.

    Args:
        messages_number (int): Maximum number of messages to consume.
        timeout (int): Timeout in seconds for the operation.

    Returns:
        list[Message]: List of consumed messages.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.poll abstractmethod async

poll(timeout: int) -> Message | None

Polls for a single message from subscribed topics.

Parameters:

timeout (int, required): Timeout in seconds for the operation.

Returns:

Message | None: The consumed message, or None if no message was received.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def poll(self, timeout: int) -> Message | None:  # noqa: ASYNC109
    """Polls for a single message from subscribed topics.

    Args:
        timeout (int): Timeout in seconds for the operation.

    Returns:
        Message | None: The consumed message or None if no message was received.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.commit abstractmethod async

commit(
    message: Message, asynchronous: bool
) -> None | list[TopicPartition]

Commits the offset of a consumed message.

Parameters:

message (Message, required): The message whose offset should be committed.
asynchronous (bool, required): Whether to commit asynchronously.

Returns:

None | list[TopicPartition]: None for synchronous commits, or a list of committed partitions for asynchronous commits.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def commit(self, message: Message, asynchronous: bool) -> None | list[TopicPartition]:
    """Commits the offset of a consumed message.

    Args:
        message (Message): The message whose offset should be committed.
        asynchronous (bool): Whether to commit asynchronously.

    Returns:
        None | list[TopicPartition]: None for synchronous commits, or list of committed
            partitions for asynchronous commits.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.subscribe abstractmethod async

subscribe(topic_list: list[str]) -> None

Subscribes to a list of topics.

Parameters:

topic_list (list[str], required): List of topic names to subscribe to.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def subscribe(self, topic_list: list[str]) -> None:
    """Subscribes to a list of topics.

    Args:
        topic_list (list[str]): List of topic names to subscribe to.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.assign abstractmethod async

assign(partition_list: list[TopicPartition]) -> None

Assigns specific partitions to the consumer.

Parameters:

partition_list (list[TopicPartition], required): List of partitions to assign.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def assign(self, partition_list: list[TopicPartition]) -> None:
    """Assigns specific partitions to the consumer.

    Args:
        partition_list (list[TopicPartition]): List of partitions to assign.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaConsumerPort.close abstractmethod async

close() -> None

Closes the consumer, leaving the consumer group and committing offsets.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def close(self) -> None:
    """Closes the consumer, leaving the consumer group and committing offsets.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort

Async interface for Kafka producer operations.

This interface defines the async contract for producing messages to Kafka topics.

Source code in archipy/adapters/kafka/ports.py
class AsyncKafkaProducerPort:
    """Async interface for Kafka producer operations.

    This interface defines the async contract for producing messages to Kafka topics.
    """

    @abstractmethod
    async def produce(self, message: str | bytes, key: str | None = None) -> None:
        """Produces a message to the configured topic.

        Args:
            message (str | bytes): The message to produce.
            key (str | None, optional): The key for the message. Defaults to None.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def flush(self, timeout: int | None) -> None:  # noqa: ASYNC109
        """Flushes any pending messages to the broker.

        Args:
            timeout (int | None): Maximum time to wait for messages to be delivered.
                If None, wait indefinitely.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def validate_healthiness(self) -> None:
        """Validates the health of the producer connection.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def list_topics(self, topic: str | None, timeout: int) -> ClusterMetadata:  # noqa: ASYNC109
        """Lists Kafka topics.

        Args:
            topic (str | None): Specific topic to list. If None, lists all topics.
            timeout (int): Timeout in seconds for the operation.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self) -> None:
        """Closes the producer, flushing any remaining messages.

        Raises:
            NotImplementedError: If the method is not implemented by the concrete class.
        """
        raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort.produce abstractmethod async

produce(
    message: str | bytes, key: str | None = None
) -> None

Produces a message to the configured topic.

Parameters:

message (str | bytes, required): The message to produce.
key (str | None, default None): The key for the message.

Raises:

NotImplementedError: If the method is not implemented by the concrete class.

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def produce(self, message: str | bytes, key: str | None = None) -> None:
    """Produces a message to the configured topic.

    Args:
        message (str | bytes): The message to produce.
        key (str | None, optional): The key for the message. Defaults to None.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort.flush abstractmethod async

flush(timeout: int | None) -> None

Flushes any pending messages to the broker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | int \| None | Maximum time to wait for messages to be delivered. If None, wait indefinitely. | required |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by the concrete class. |

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def flush(self, timeout: int | None) -> None:  # noqa: ASYNC109
    """Flushes any pending messages to the broker.

    Args:
        timeout (int | None): Maximum time to wait for messages to be delivered.
            If None, wait indefinitely.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort.validate_healthiness abstractmethod async

validate_healthiness() -> None

Validates the health of the producer connection.

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by the concrete class. |

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def validate_healthiness(self) -> None:
    """Validates the health of the producer connection.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort.list_topics abstractmethod async

list_topics(
    topic: str | None, timeout: int
) -> ClusterMetadata

Lists Kafka topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str \| None | Specific topic to list. If None, lists all topics. | required |
| timeout | int | Timeout in seconds for the operation. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ClusterMetadata | ClusterMetadata | Metadata about the Kafka cluster and topics. |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by the concrete class. |

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def list_topics(self, topic: str | None, timeout: int) -> ClusterMetadata:  # noqa: ASYNC109
    """Lists Kafka topics.

    Args:
        topic (str | None): Specific topic to list. If None, lists all topics.
        timeout (int): Timeout in seconds for the operation.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError

archipy.adapters.kafka.ports.AsyncKafkaProducerPort.close abstractmethod async

close() -> None

Closes the producer, flushing any remaining messages.

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by the concrete class. |

Source code in archipy/adapters/kafka/ports.py
@abstractmethod
async def close(self) -> None:
    """Closes the producer, flushing any remaining messages.

    Raises:
        NotImplementedError: If the method is not implemented by the concrete class.
    """
    raise NotImplementedError


Adapters

Concrete Kafka adapter implementations for both sync and async producer/consumer patterns, built on confluent_kafka and confluent_kafka.aio.

archipy.adapters.kafka.adapters.logger module-attribute

logger = getLogger(__name__)

archipy.adapters.kafka.adapters.KafkaExceptionHandlerMixin

Mixin class to handle Kafka exceptions in a consistent way.

Source code in archipy/adapters/kafka/adapters.py
class KafkaExceptionHandlerMixin:
    """Mixin class to handle Kafka exceptions in a consistent way."""

    @classmethod
    def _handle_kafka_exception(cls, exception: Exception, operation: str) -> NoReturn:
        """Handle Kafka exceptions and map them to appropriate application errors.

        Args:
            exception: The original exception
            operation: The name of the operation that failed

        Raises:
            Various application-specific errors based on the exception type/content
        """
        error_msg = str(exception).lower()

        # Configuration errors
        if "configuration" in error_msg:
            raise ConfigurationError(operation="kafka") from exception

        # Invalid argument errors
        if "invalid" in error_msg:
            raise InvalidArgumentError(argument_name=operation) from exception

        # Timeout errors
        if "timeout" in error_msg:
            # Extract timeout value if available
            timeout = None
            if hasattr(exception, "args") and len(exception.args) > 1:
                with contextlib.suppress(IndexError, ValueError):
                    timeout = int(exception.args[1])
            raise ConnectionTimeoutError(service="Kafka", timeout=timeout) from exception

        # Network/connectivity errors
        if "network" in error_msg:
            raise NetworkError(service="Kafka") from exception

        # Service availability errors
        if "unavailable" in error_msg or "connection" in error_msg:
            raise ServiceUnavailableError(service="Kafka") from exception
        raise InternalError(additional_data={"operation": operation}) from exception

    @classmethod
    def _handle_producer_exception(cls, exception: Exception, operation: str) -> NoReturn:
        """Handle producer-specific exceptions.

        Args:
            exception: The original exception
            operation: The name of the operation that failed

        Raises:
            ResourceExhaustedError: If the producer queue is full
            Various other errors from _handle_kafka_exception
        """
        # Producer-specific error handling
        if isinstance(exception, BufferError):
            raise ResourceExhaustedError(resource_type="producer_queue") from exception

        # Fall back to general Kafka error handling
        cls._handle_kafka_exception(exception, operation)

archipy.adapters.kafka.adapters.KafkaAdminAdapter

Bases: KafkaAdminPort, KafkaExceptionHandlerMixin

Synchronous Kafka admin adapter.

This adapter provides synchronous administrative operations for Kafka topics. It implements the KafkaAdminPort interface and handles topic creation, deletion, and listing operations.

Source code in archipy/adapters/kafka/adapters.py
class KafkaAdminAdapter(KafkaAdminPort, KafkaExceptionHandlerMixin):
    """Synchronous Kafka admin adapter.

    This adapter provides synchronous administrative operations for Kafka topics.
    It implements the KafkaAdminPort interface and handles topic creation, deletion,
    and listing operations.
    """

    def __init__(self, kafka_configs: KafkaConfig | None = None) -> None:
        """Initializes the admin adapter with Kafka configuration.

        Args:
            kafka_configs (KafkaConfig | None, optional): Kafka configuration. If None,
                uses global config. Defaults to None.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error initializing the admin client.
        """
        configs: KafkaConfig = kafka_configs or BaseConfig.global_config().KAFKA
        try:
            broker_list_csv = ",".join(configs.BROKERS_LIST)
            config: dict[str, str | int | float] = {"bootstrap.servers": broker_list_csv}
            if configs.USERNAME and configs.PASSWORD and configs.SSL_CA_FILE:
                config["sasl.username"] = configs.USERNAME
                config["sasl.password"] = configs.PASSWORD.get_secret_value()
                config["security.protocol"] = configs.SECURITY_PROTOCOL
                config["sasl.mechanism"] = configs.SASL_MECHANISM or ""
                config["ssl.ca.location"] = configs.SSL_CA_FILE
                config["ssl.certificate.location"] = configs.SSL_CERT_FILE or ""
                config["ssl.key.location"] = configs.SSL_KEY_FILE or ""
                config["ssl.endpoint.identification.algorithm"] = "none"
            self.adapter: AdminClient = AdminClient(config)
        except Exception as e:
            self._handle_kafka_exception(e, "KafkaAdmin_init")

    @override
    def create_topic(self, topic: str, num_partitions: int = 1, replication_factor: int = 1) -> None:
        """Creates a new Kafka topic.

        Args:
            topic (str): Name of the topic to create.
            num_partitions (int, optional): Number of partitions for the topic. Defaults to 1.
            replication_factor (int, optional): Replication factor for the topic. Defaults to 1.

        Raises:
            InvalidArgumentError: If the topic name or partition configuration is invalid.
            ServiceUnavailableError: If the Kafka service is unavailable during topic creation.
            InternalError: If there is an internal error creating the topic.
        """
        try:
            new_topic = NewTopic(topic, num_partitions, replication_factor)
            self.adapter.create_topics([new_topic])
        except Exception as e:
            self._handle_kafka_exception(e, "create_topic")

    @override
    def delete_topic(self, topics: list[str]) -> None:
        """Deletes one or more Kafka topics.

        Args:
            topics (list[str]): List of topic names to delete.

        Raises:
            InvalidArgumentError: If the topics list is invalid.
            ServiceUnavailableError: If the Kafka service is unavailable during topic deletion.
            InternalError: If there is an internal error deleting the topics.
        """
        try:
            self.adapter.delete_topics(topics)
            logger.debug("Deleted topics: %s", topics)
        except Exception as e:
            self._handle_kafka_exception(e, "delete_topic")

    @override
    def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
        """Lists Kafka topics.

        Args:
            topic (str | None, optional): Specific topic to list. If None, lists all topics.
                Defaults to None.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If the Kafka service is unavailable.
            UnavailableError: If there is an unknown issue accessing Kafka.
        """
        try:
            result = self.adapter.list_topics(topic=topic, timeout=timeout)
        except Exception as e:
            self._handle_kafka_exception(e, "list_topics")
            raise  # Exception handler always raises, but type checker needs this to be explicit
        else:
            # result is ClusterMetadata from confluent_kafka, compatible with port return type
            typed_result: ClusterMetadata = result
            return typed_result

archipy.adapters.kafka.adapters.KafkaAdminAdapter.adapter instance-attribute

adapter: AdminClient = AdminClient(config)

archipy.adapters.kafka.adapters.KafkaAdminAdapter.create_topic

create_topic(
    topic: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
) -> None

Creates a new Kafka topic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str | Name of the topic to create. | required |
| num_partitions | int | Number of partitions for the topic. Defaults to 1. | 1 |
| replication_factor | int | Replication factor for the topic. Defaults to 1. | 1 |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the topic name or partition configuration is invalid. |
| ServiceUnavailableError | If the Kafka service is unavailable during topic creation. |
| InternalError | If there is an internal error creating the topic. |

Source code in archipy/adapters/kafka/adapters.py
@override
def create_topic(self, topic: str, num_partitions: int = 1, replication_factor: int = 1) -> None:
    """Creates a new Kafka topic.

    Args:
        topic (str): Name of the topic to create.
        num_partitions (int, optional): Number of partitions for the topic. Defaults to 1.
        replication_factor (int, optional): Replication factor for the topic. Defaults to 1.

    Raises:
        InvalidArgumentError: If the topic name or partition configuration is invalid.
        ServiceUnavailableError: If the Kafka service is unavailable during topic creation.
        InternalError: If there is an internal error creating the topic.
    """
    try:
        new_topic = NewTopic(topic, num_partitions, replication_factor)
        self.adapter.create_topics([new_topic])
    except Exception as e:
        self._handle_kafka_exception(e, "create_topic")

archipy.adapters.kafka.adapters.KafkaAdminAdapter.delete_topic

delete_topic(topics: list[str]) -> None

Deletes one or more Kafka topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topics | list[str] | List of topic names to delete. | required |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the topics list is invalid. |
| ServiceUnavailableError | If the Kafka service is unavailable during topic deletion. |
| InternalError | If there is an internal error deleting the topics. |

Source code in archipy/adapters/kafka/adapters.py
@override
def delete_topic(self, topics: list[str]) -> None:
    """Deletes one or more Kafka topics.

    Args:
        topics (list[str]): List of topic names to delete.

    Raises:
        InvalidArgumentError: If the topics list is invalid.
        ServiceUnavailableError: If the Kafka service is unavailable during topic deletion.
        InternalError: If there is an internal error deleting the topics.
    """
    try:
        self.adapter.delete_topics(topics)
        logger.debug("Deleted topics: %s", topics)
    except Exception as e:
        self._handle_kafka_exception(e, "delete_topic")

archipy.adapters.kafka.adapters.KafkaAdminAdapter.list_topics

list_topics(
    topic: str | None = None, timeout: int = 1
) -> ClusterMetadata

Lists Kafka topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str \| None | Specific topic to list. If None, lists all topics. Defaults to None. | None |
| timeout | int | Timeout in seconds for the operation. Defaults to 1. | 1 |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ClusterMetadata | ClusterMetadata | Metadata about the Kafka cluster and topics. |

Raises:

| Type | Description |
| --- | --- |
| ConnectionTimeoutError | If the operation times out. |
| ServiceUnavailableError | If the Kafka service is unavailable. |
| UnavailableError | If there is an unknown issue accessing Kafka. |

Source code in archipy/adapters/kafka/adapters.py
@override
def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
    """Lists Kafka topics.

    Args:
        topic (str | None, optional): Specific topic to list. If None, lists all topics.
            Defaults to None.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If the Kafka service is unavailable.
        UnavailableError: If there is an unknown issue accessing Kafka.
    """
    try:
        result = self.adapter.list_topics(topic=topic, timeout=timeout)
    except Exception as e:
        self._handle_kafka_exception(e, "list_topics")
        raise  # Exception handler always raises, but type checker needs this to be explicit
    else:
        # result is ClusterMetadata from confluent_kafka, compatible with port return type
        typed_result: ClusterMetadata = result
        return typed_result

archipy.adapters.kafka.adapters.KafkaConsumerAdapter

Bases: KafkaConsumerPort, KafkaExceptionHandlerMixin

Synchronous Kafka consumer adapter.

This adapter provides synchronous message consumption from Kafka topics. It implements the KafkaConsumerPort interface and handles message polling, batch consumption, and offset management.

Source code in archipy/adapters/kafka/adapters.py
class KafkaConsumerAdapter(KafkaConsumerPort, KafkaExceptionHandlerMixin):
    """Synchronous Kafka consumer adapter.

    This adapter provides synchronous message consumption from Kafka topics.
    It implements the KafkaConsumerPort interface and handles message polling,
    batch consumption, and offset management.
    """

    def __init__(
        self,
        group_id: str,
        topic_list: list[str] | None = None,
        partition_list: list[TopicPartition] | None = None,
        kafka_configs: KafkaConfig | None = None,
    ) -> None:
        """Initializes the consumer adapter with Kafka configuration and subscription.

        Args:
            group_id (str): Consumer group ID.
            topic_list (list[str] | None, optional): List of topics to subscribe to.
                Defaults to None.
            partition_list (list[TopicPartition] | None, optional): List of partitions
                to assign. Defaults to None.
            kafka_configs (KafkaConfig | None, optional): Kafka configuration. If None,
                uses global config. Defaults to None.

        Raises:
            InvalidArgumentError: If both topic_list and partition_list are provided or
                neither is provided.
            InternalError: If there is an error initializing the consumer.
        """
        configs: KafkaConfig = kafka_configs or BaseConfig.global_config().KAFKA
        self._adapter: Consumer = self._get_adapter(group_id, configs)
        if topic_list and not partition_list:
            self.subscribe(topic_list)
        elif not topic_list and partition_list:
            self.assign(partition_list)
        else:
            logger.error("Invalid topic or partition list")
            raise InvalidArgumentError(
                argument_name="topic_list or partition_list",
                additional_data={"reason": "Exactly one of topic_list or partition_list must be provided"},
            )

    @classmethod
    def _get_adapter(cls, group_id: str, configs: KafkaConfig) -> Consumer:
        """Creates and configures a Kafka Consumer instance.

        Args:
            group_id (str): Consumer group ID.
            configs (KafkaConfig): Kafka configuration.

        Returns:
            Consumer: Configured Kafka Consumer instance.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error creating the consumer.
        """
        try:
            broker_list_csv = ",".join(configs.BROKERS_LIST)
            config: dict[str, str | int | float] = {
                "bootstrap.servers": broker_list_csv,
                "group.id": group_id,
                "session.timeout.ms": configs.SESSION_TIMEOUT_MS,
                "auto.offset.reset": configs.AUTO_OFFSET_RESET,
                "enable.auto.commit": configs.ENABLE_AUTO_COMMIT,
                "fetch.min.bytes": configs.FETCH_MIN_BYTES,
                "heartbeat.interval.ms": configs.HEARTBEAT_INTERVAL_MS,
                "isolation.level": configs.ISOLATION_LEVEL,
                "max.poll.interval.ms": configs.MAX_POLL_INTERVAL_MS,
                "partition.assignment.strategy": configs.PARTITION_ASSIGNMENT_STRATEGY,
                "fetch.max.bytes": configs.FETCH_MAX_BYTES,
                "max.partition.fetch.bytes": configs.MAX_PARTITION_FETCH_BYTES,
            }
            if configs.USERNAME and configs.PASSWORD and configs.SSL_CA_FILE:
                config["sasl.username"] = configs.USERNAME
                config["sasl.password"] = configs.PASSWORD.get_secret_value()
                config["security.protocol"] = configs.SECURITY_PROTOCOL
                config["sasl.mechanism"] = configs.SASL_MECHANISM or ""
                config["ssl.ca.location"] = configs.SSL_CA_FILE
                config["ssl.certificate.location"] = configs.SSL_CERT_FILE or ""
                config["ssl.key.location"] = configs.SSL_KEY_FILE or ""
                config["ssl.endpoint.identification.algorithm"] = "none"
            consumer = Consumer(config)
        except Exception as e:
            cls._handle_kafka_exception(e, "KafkaConsumer_init")
        else:
            return consumer

    @override
    def batch_consume(self, messages_number: int = 500, timeout: int = 1) -> list[Message]:
        """Consumes a batch of messages from subscribed topics.

        Args:
            messages_number (int, optional): Maximum number of messages to consume.
                Defaults to 500.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            list[Message]: List of consumed messages.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error consuming messages.
        """
        try:
            result_list: list[Message] = []
            messages: list[Message] = self._adapter.consume(num_messages=messages_number, timeout=timeout)
            for message in messages:
                if message.error():
                    logger.error("Consumer error: %s", message.error())
                    continue
                logger.debug("Message consumed: %s", message)
                message.set_value(message.value())
                result_list.append(message)
        except Exception as e:
            self._handle_kafka_exception(e, "batch_consume")
            raise  # Exception handler always raises, but type checker needs this to be explicit
        else:
            # result_list is list[Message] from confluent_kafka, compatible with port return type
            return result_list

    @override
    def poll(self, timeout: int = 1) -> Message | None:
        """Polls for a single message from subscribed topics.

        Args:
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            Message | None: The consumed message or None if no message was received.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error polling for messages.
        """
        try:
            message: Message | None = self._adapter.poll(timeout)
            if message is None:
                logger.debug("No message received")
                return None
            if message.error():
                logger.error("Consumer error: %s", message.error())
                return None
            logger.debug("Message consumed: %s", message)
            message.set_value(message.value())
        except Exception as e:
            self._handle_kafka_exception(e, "poll")
        else:
            return message

    @override
    def commit(self, message: Message, asynchronous: bool = True) -> None | list[TopicPartition]:
        """Commits the offset for a message.

        Args:
            message (Message): The message to commit.
            asynchronous (bool, optional): Whether to commit asynchronously. Defaults to True.

        Returns:
            None | list[TopicPartition]: None for async commits, list of TopicPartition for sync commits.

        Raises:
            InvalidArgumentError: If the message is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error committing the offset.
        """
        try:
            if asynchronous:
                self._adapter.commit(message=message, asynchronous=True)
                result = None
            else:
                result = self._adapter.commit(message=message, asynchronous=False)
        except Exception as e:
            self._handle_kafka_exception(e, "commit")
        else:
            return result

    @override
    def subscribe(self, topic_list: list[str]) -> None:
        """Subscribes to a list of topics.

        Args:
            topic_list (list[str]): List of topics to subscribe to.

        Raises:
            InvalidArgumentError: If the topic list is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error subscribing to topics.
        """
        try:
            self._adapter.subscribe(topic_list)
        except Exception as e:
            self._handle_kafka_exception(e, "subscribe")

    @override
    def assign(self, partition_list: list[TopicPartition]) -> None:
        """Assigns the consumer to a list of topic partitions.

        Args:
            partition_list (list[TopicPartition]): List of partitions to assign.

        Raises:
            InvalidArgumentError: If the partition list is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error assigning partitions.
        """
        try:
            self._adapter.assign(partition_list)
        except Exception as e:
            self._handle_kafka_exception(e, "assign")

    @override
    def close(self) -> None:
        """Closes the consumer, leaving the consumer group and committing offsets.

        Raises:
            ServiceUnavailableError: If Kafka is unavailable during close.
            InternalError: If there is an error closing the consumer.
        """
        try:
            self._adapter.close()
            logger.debug("Consumer closed")
        except Exception as e:
            self._handle_kafka_exception(e, "close")

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.batch_consume

batch_consume(
    messages_number: int = 500, timeout: int = 1
) -> list[Message]

Consumes a batch of messages from subscribed topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| messages_number | int | Maximum number of messages to consume. Defaults to 500. | 500 |
| timeout | int | Timeout in seconds for the operation. Defaults to 1. | 1 |

Returns:

| Type | Description |
| --- | --- |
| list[Message] | List of consumed messages. |

Raises:

| Type | Description |
| --- | --- |
| ConnectionTimeoutError | If the operation times out. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error consuming messages. |

Source code in archipy/adapters/kafka/adapters.py
@override
def batch_consume(self, messages_number: int = 500, timeout: int = 1) -> list[Message]:
    """Consumes a batch of messages from subscribed topics.

    Args:
        messages_number (int, optional): Maximum number of messages to consume.
            Defaults to 500.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        list[Message]: List of consumed messages.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error consuming messages.
    """
    try:
        result_list: list[Message] = []
        messages: list[Message] = self._adapter.consume(num_messages=messages_number, timeout=timeout)
        for message in messages:
            if message.error():
                logger.error("Consumer error: %s", message.error())
                continue
            logger.debug("Message consumed: %s", message)
            message.set_value(message.value())
            result_list.append(message)
    except Exception as e:
        self._handle_kafka_exception(e, "batch_consume")
        raise  # Exception handler always raises, but type checker needs this to be explicit
    else:
        # result_list is list[Message] from confluent_kafka, compatible with port return type
        return result_list

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.poll

poll(timeout: int = 1) -> Message | None

Polls for a single message from subscribed topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | int | Timeout in seconds for the operation. Defaults to 1. | 1 |

Returns:

| Type | Description |
| --- | --- |
| Message \| None | The consumed message or None if no message was received. |

Raises:

| Type | Description |
| --- | --- |
| ConnectionTimeoutError | If the operation times out. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error polling for messages. |

Source code in archipy/adapters/kafka/adapters.py
@override
def poll(self, timeout: int = 1) -> Message | None:
    """Polls for a single message from subscribed topics.

    Args:
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        Message | None: The consumed message or None if no message was received.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error polling for messages.
    """
    try:
        message: Message | None = self._adapter.poll(timeout)
        if message is None:
            logger.debug("No message received")
            return None
        if message.error():
            logger.error("Consumer error: %s", message.error())
            return None
        logger.debug("Message consumed: %s", message)
        message.set_value(message.value())
    except Exception as e:
        self._handle_kafka_exception(e, "poll")
    else:
        return message

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.commit

commit(
    message: Message, asynchronous: bool = True
) -> None | list[TopicPartition]

Commits the offset for a message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | Message | The message to commit. | required |
| asynchronous | bool | Whether to commit asynchronously. Defaults to True. | True |

Returns:

| Type | Description |
| --- | --- |
| None \| list[TopicPartition] | None for async commits, list of TopicPartition for sync commits. |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the message is invalid. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error committing the offset. |

Source code in archipy/adapters/kafka/adapters.py
@override
def commit(self, message: Message, asynchronous: bool = True) -> None | list[TopicPartition]:
    """Commits the offset for a message.

    Args:
        message (Message): The message to commit.
        asynchronous (bool, optional): Whether to commit asynchronously. Defaults to True.

    Returns:
        None | list[TopicPartition]: None for async commits, list of TopicPartition for sync commits.

    Raises:
        InvalidArgumentError: If the message is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error committing the offset.
    """
    try:
        if asynchronous:
            self._adapter.commit(message=message, asynchronous=True)
            result = None
        else:
            result = self._adapter.commit(message=message, asynchronous=False)
    except Exception as e:
        self._handle_kafka_exception(e, "commit")
    else:
        return result

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.subscribe

subscribe(topic_list: list[str]) -> None

Subscribes to a list of topics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic_list | list[str] | List of topics to subscribe to. | required |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the topic list is invalid. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error subscribing to topics. |

Source code in archipy/adapters/kafka/adapters.py
@override
def subscribe(self, topic_list: list[str]) -> None:
    """Subscribes to a list of topics.

    Args:
        topic_list (list[str]): List of topics to subscribe to.

    Raises:
        InvalidArgumentError: If the topic list is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error subscribing to topics.
    """
    try:
        self._adapter.subscribe(topic_list)
    except Exception as e:
        self._handle_kafka_exception(e, "subscribe")

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.assign

assign(partition_list: list[TopicPartition]) -> None

Assigns the consumer to a list of topic partitions.

Parameters:

Name Type Description Default
partition_list list[TopicPartition]

List of partitions to assign.

required

Raises:

Type Description
InvalidArgumentError

If the partition list is invalid.

ServiceUnavailableError

If Kafka is unavailable.

InternalError

If there is an error assigning partitions.

Source code in archipy/adapters/kafka/adapters.py
@override
def assign(self, partition_list: list[TopicPartition]) -> None:
    """Assigns the consumer to a list of topic partitions.

    Args:
        partition_list (list[TopicPartition]): List of partitions to assign.

    Raises:
        InvalidArgumentError: If the partition list is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error assigning partitions.
    """
    try:
        self._adapter.assign(partition_list)
    except Exception as e:
        self._handle_kafka_exception(e, "assign")

archipy.adapters.kafka.adapters.KafkaConsumerAdapter.close

close() -> None

Closes the consumer, leaving the consumer group and committing offsets.

Raises:

Type Description
ServiceUnavailableError

If Kafka is unavailable during close.

InternalError

If there is an error closing the consumer.

Source code in archipy/adapters/kafka/adapters.py
@override
def close(self) -> None:
    """Closes the consumer, leaving the consumer group and committing offsets.

    Raises:
        ServiceUnavailableError: If Kafka is unavailable during close.
        InternalError: If there is an error closing the consumer.
    """
    try:
        self._adapter.close()
        logger.debug("Consumer closed")
    except Exception as e:
        self._handle_kafka_exception(e, "close")

archipy.adapters.kafka.adapters.KafkaProducerAdapter

Bases: KafkaProducerPort, KafkaExceptionHandlerMixin

Synchronous Kafka producer adapter.

This adapter provides synchronous message production to Kafka topics and implements the KafkaProducerPort interface.

Source code in archipy/adapters/kafka/adapters.py
class KafkaProducerAdapter(KafkaProducerPort, KafkaExceptionHandlerMixin):
    """Synchronous Kafka producer adapter.

    This adapter provides synchronous message production to Kafka topics.
    It implements the KafkaProducerPort interface and handles message production.
    """

    def __init__(self, topic_name: str, kafka_configs: KafkaConfig | None = None) -> None:
        """Initializes the producer adapter with Kafka configuration.

        Args:
            topic_name (str): Default topic name to produce messages to.
            kafka_configs (KafkaConfig | None, optional): Kafka configuration. If None,
                uses global config. Defaults to None.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error initializing the producer.
        """
        self._topic_name = topic_name
        configs: KafkaConfig = kafka_configs or BaseConfig.global_config().KAFKA
        self._adapter: Producer = self._get_adapter(configs)

    @classmethod
    def _get_adapter(cls, configs: KafkaConfig) -> Producer:
        """Creates and configures a Kafka Producer instance.

        Args:
            configs (KafkaConfig): Kafka configuration.

        Returns:
            Producer: Configured Kafka Producer instance.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error creating the producer.
        """
        try:
            broker_list_csv = ",".join(configs.BROKERS_LIST)
            config: dict[str, str | int | float] = {
                "bootstrap.servers": broker_list_csv,
                "linger.ms": configs.LINGER_MS,
                "batch.size": configs.BATCH_SIZE,
                "acks": configs.ACKS,
                "request.timeout.ms": configs.REQUEST_TIMEOUT_MS,
                "delivery.timeout.ms": configs.DELIVERY_TIMEOUT_MS,
                "compression.type": configs.COMPRESSION_TYPE or "none",
                "max.in.flight.requests.per.connection": configs.MAX_IN_FLIGHT_REQUESTS,
                "retries": configs.RETRIES,
                "enable.idempotence": configs.ENABLE_IDEMPOTENCE,
                "queue.buffering.max.messages": configs.QUEUE_BUFFERING_MAX_MESSAGES,
                "statistics.interval.ms": configs.STATISTICS_INTERVAL_MS,
            }
            if configs.TRANSACTIONAL_ID:
                config["transactional.id"] = configs.TRANSACTIONAL_ID
            if configs.USERNAME and configs.PASSWORD and configs.SSL_CA_FILE:
                config["sasl.username"] = configs.USERNAME
                config["sasl.password"] = configs.PASSWORD.get_secret_value()
                config["security.protocol"] = configs.SECURITY_PROTOCOL
                config["sasl.mechanism"] = configs.SASL_MECHANISM or ""
                config["ssl.ca.location"] = configs.SSL_CA_FILE
                config["ssl.certificate.location"] = configs.SSL_CERT_FILE or ""
                config["ssl.key.location"] = configs.SSL_KEY_FILE or ""
                config["ssl.endpoint.identification.algorithm"] = "none"
            producer = Producer(config)
        except Exception as e:
            cls._handle_kafka_exception(e, "KafkaProducer_init")
        else:
            return producer

    @staticmethod
    def _pre_process_message(message: str | bytes) -> bytes:
        """Pre-processes a message to ensure it's in the correct format.

        Args:
            message (str | bytes): The message to pre-process.

        Returns:
            bytes: The pre-processed message as bytes.
        """
        if isinstance(message, str):
            return message.encode("utf-8")
        return message

    @staticmethod
    def _delivery_callback(error: KafkaError | None, message: Message) -> None:
        """Callback for message delivery confirmation.

        Args:
            error (KafkaError | None): Error that occurred during delivery, or None if successful.
            message (Message): The delivered message.
        """
        if error:
            logger.error("Message delivery failed: %s: %s", error, message.value())
        else:
            logger.debug(
                "Message delivered to %s [%d] at offset %d",
                message.topic(),
                message.partition(),
                message.offset(),
            )

    @override
    def produce(self, message: str | bytes, key: str | None = None) -> None:
        """Produces a message to the configured topic.

        Args:
            message (str | bytes): The message to produce.
            key (str | None, optional): The key for the message. Defaults to None.

        Raises:
            NetworkError: If there is a network error producing the message.
            ResourceExhaustedError: If the producer queue is full.
            InternalError: If there is an error producing the message.
        """
        try:
            processed_message = self._pre_process_message(message)
            # Handle None key - convert to empty bytes if None
            processed_key = self._pre_process_message(key) if key is not None else b""
            self._adapter.produce(
                topic=self._topic_name,
                value=processed_message,
                callback=self._delivery_callback,
                key=processed_key,
            )
        except Exception as e:
            self._handle_producer_exception(e, "produce")

    @override
    def flush(self, timeout: int | None = None) -> None:
        """Flushes the producer queue.

        Args:
            timeout (int | None, optional): Timeout in seconds for the operation. Defaults to None.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error flushing the queue.
        """
        try:
            remaining_messages = self._adapter.flush(timeout=timeout if timeout is not None else -1)
            if remaining_messages > 0:
                logger.warning("%d messages left in the queue after flush", remaining_messages)
        except Exception as e:
            self._handle_kafka_exception(e, "flush")

    @override
    def validate_healthiness(self) -> None:
        """Validates the health of the Kafka connection.

        Raises:
            UnavailableError: If the Kafka service is unavailable.
        """
        try:
            self.list_topics(timeout=1)
        except Exception as e:
            raise UnavailableError(resource_type="Kafka") from e

    @override
    def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
        """Lists Kafka topics.

        Args:
            topic (str | None, optional): Specific topic to list. If None, lists all topics.
                Defaults to None.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If the Kafka service is unavailable.
            UnavailableError: If there is an unknown issue accessing Kafka.
        """
        try:
            result = self._adapter.list_topics(topic=topic, timeout=timeout)
        except Exception as e:
            self._handle_kafka_exception(e, "list_topics")
            raise  # Exception handler always raises, but type checker needs this to be explicit
        else:
            # result is ClusterMetadata from confluent_kafka, compatible with port return type
            typed_result: ClusterMetadata = result
            return typed_result

    @override
    def close(self) -> None:
        """Closes the producer, flushing any remaining messages.

        Raises:
            ConnectionTimeoutError: If the flush times out.
            ServiceUnavailableError: If Kafka is unavailable during close.
            InternalError: If there is an error closing the producer.
        """
        try:
            self._adapter.flush()
            logger.debug("Producer flushed and closed")
        except Exception as e:
            self._handle_kafka_exception(e, "close")

archipy.adapters.kafka.adapters.KafkaProducerAdapter.produce

produce(
    message: str | bytes, key: str | None = None
) -> None

Produces a message to the configured topic.

Parameters:

Name Type Description Default
message str | bytes

The message to produce.

required
key str | None

The key for the message. Defaults to None.

None

Raises:

Type Description
NetworkError

If there is a network error producing the message.

ResourceExhaustedError

If the producer queue is full.

InternalError

If there is an error producing the message.

Source code in archipy/adapters/kafka/adapters.py
@override
def produce(self, message: str | bytes, key: str | None = None) -> None:
    """Produces a message to the configured topic.

    Args:
        message (str | bytes): The message to produce.
        key (str | None, optional): The key for the message. Defaults to None.

    Raises:
        NetworkError: If there is a network error producing the message.
        ResourceExhaustedError: If the producer queue is full.
        InternalError: If there is an error producing the message.
    """
    try:
        processed_message = self._pre_process_message(message)
        # Handle None key - convert to empty bytes if None
        processed_key = self._pre_process_message(key) if key is not None else b""
        self._adapter.produce(
            topic=self._topic_name,
            value=processed_message,
            callback=self._delivery_callback,
            key=processed_key,
        )
    except Exception as e:
        self._handle_producer_exception(e, "produce")
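`produce` normalizes both value and key before handing them to librdkafka: strings are encoded as UTF-8, bytes pass through, and a missing key is sent as empty bytes rather than omitted. That normalization in isolation:

```python
def normalize_value(value):
    """Mirror of _pre_process_message: encode str, pass bytes through."""
    return value.encode("utf-8") if isinstance(value, str) else value


def normalize_key(key):
    """A None key becomes empty bytes, matching the adapter's produce()."""
    return normalize_value(key) if key is not None else b""


assert normalize_value("hello") == b"hello"
assert normalize_value(b"\x00\x01") == b"\x00\x01"
assert normalize_key(None) == b""
assert normalize_key("order-42") == b"order-42"
```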

archipy.adapters.kafka.adapters.KafkaProducerAdapter.flush

flush(timeout: int | None = None) -> None

Flushes the producer queue.

Parameters:

Name Type Description Default
timeout int | None

Timeout in seconds for the operation. Defaults to None.

None

Raises:

Type Description
ConnectionTimeoutError

If the operation times out.

ServiceUnavailableError

If Kafka is unavailable.

InternalError

If there is an error flushing the queue.

Source code in archipy/adapters/kafka/adapters.py
@override
def flush(self, timeout: int | None = None) -> None:
    """Flushes the producer queue.

    Args:
        timeout (int | None, optional): Timeout in seconds for the operation. Defaults to None.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error flushing the queue.
    """
    try:
        remaining_messages = self._adapter.flush(timeout=timeout if timeout is not None else -1)
        if remaining_messages > 0:
            logger.warning("%d messages left in the queue after flush", remaining_messages)
    except Exception as e:
        self._handle_kafka_exception(e, "flush")

archipy.adapters.kafka.adapters.KafkaProducerAdapter.validate_healthiness

validate_healthiness() -> None

Validates the health of the Kafka connection.

Raises:

Type Description
UnavailableError

If the Kafka service is unavailable.

Source code in archipy/adapters/kafka/adapters.py
@override
def validate_healthiness(self) -> None:
    """Validates the health of the Kafka connection.

    Raises:
        UnavailableError: If the Kafka service is unavailable.
    """
    try:
        self.list_topics(timeout=1)
    except Exception as e:
        raise UnavailableError(resource_type="Kafka") from e

archipy.adapters.kafka.adapters.KafkaProducerAdapter.list_topics

list_topics(
    topic: str | None = None, timeout: int = 1
) -> ClusterMetadata

Lists Kafka topics.

Parameters:

Name Type Description Default
topic str | None

Specific topic to list. If None, lists all topics. Defaults to None.

None
timeout int

Timeout in seconds for the operation. Defaults to 1.

1

Returns:

Name Type Description
ClusterMetadata ClusterMetadata

Metadata about the Kafka cluster and topics.

Raises:

Type Description
ConnectionTimeoutError

If the operation times out.

ServiceUnavailableError

If the Kafka service is unavailable.

UnavailableError

If there is an unknown issue accessing Kafka.

Source code in archipy/adapters/kafka/adapters.py
@override
def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:
    """Lists Kafka topics.

    Args:
        topic (str | None, optional): Specific topic to list. If None, lists all topics.
            Defaults to None.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If the Kafka service is unavailable.
        UnavailableError: If there is an unknown issue accessing Kafka.
    """
    try:
        result = self._adapter.list_topics(topic=topic, timeout=timeout)
    except Exception as e:
        self._handle_kafka_exception(e, "list_topics")
        raise  # Exception handler always raises, but type checker needs this to be explicit
    else:
        # result is ClusterMetadata from confluent_kafka, compatible with port return type
        typed_result: ClusterMetadata = result
        return typed_result

archipy.adapters.kafka.adapters.KafkaProducerAdapter.close

close() -> None

Closes the producer, flushing any remaining messages.

Raises:

Type Description
ConnectionTimeoutError

If the flush times out.

ServiceUnavailableError

If Kafka is unavailable during close.

InternalError

If there is an error closing the producer.

Source code in archipy/adapters/kafka/adapters.py
@override
def close(self) -> None:
    """Closes the producer, flushing any remaining messages.

    Raises:
        ConnectionTimeoutError: If the flush times out.
        ServiceUnavailableError: If Kafka is unavailable during close.
        InternalError: If there is an error closing the producer.
    """
    try:
        self._adapter.flush()
        logger.debug("Producer flushed and closed")
    except Exception as e:
        self._handle_kafka_exception(e, "close")

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter

Bases: AsyncKafkaProducerPort, KafkaExceptionHandlerMixin

Async Kafka producer adapter.

This adapter provides async message production to Kafka topics using AIOProducer. It implements the AsyncKafkaProducerPort interface.

Source code in archipy/adapters/kafka/adapters.py
class AsyncKafkaProducerAdapter(AsyncKafkaProducerPort, KafkaExceptionHandlerMixin):
    """Async Kafka producer adapter.

    This adapter provides async message production to Kafka topics using AIOProducer.
    It implements the AsyncKafkaProducerPort interface.
    """

    def __init__(self, topic_name: str, kafka_configs: KafkaConfig | None = None) -> None:
        """Initializes the async producer adapter with Kafka configuration.

        Args:
            topic_name (str): Default topic name to produce messages to.
            kafka_configs (KafkaConfig | None, optional): Kafka configuration. If None,
                uses global config. Defaults to None.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error initializing the producer.
        """
        self._topic_name = topic_name
        configs: KafkaConfig = kafka_configs or BaseConfig.global_config().KAFKA
        self._configs = configs
        self._adapter: AIOProducer | None = None

    def _build_producer_conf(self, configs: KafkaConfig) -> dict[str, str | int | float]:
        """Builds the producer configuration dictionary.

        Args:
            configs (KafkaConfig): Kafka configuration.

        Returns:
            dict[str, str | int | float]: Producer configuration dictionary.
        """
        broker_list_csv = ",".join(configs.BROKERS_LIST)
        config: dict[str, str | int | float] = {
            "bootstrap.servers": broker_list_csv,
            "linger.ms": configs.LINGER_MS,
            "batch.size": configs.BATCH_SIZE,
            "acks": configs.ACKS,
            "request.timeout.ms": configs.REQUEST_TIMEOUT_MS,
            "delivery.timeout.ms": configs.DELIVERY_TIMEOUT_MS,
            "compression.type": configs.COMPRESSION_TYPE or "none",
            "max.in.flight.requests.per.connection": configs.MAX_IN_FLIGHT_REQUESTS,
            "retries": configs.RETRIES,
            "enable.idempotence": configs.ENABLE_IDEMPOTENCE,
            "queue.buffering.max.messages": configs.QUEUE_BUFFERING_MAX_MESSAGES,
            "statistics.interval.ms": configs.STATISTICS_INTERVAL_MS,
        }
        if configs.TRANSACTIONAL_ID:
            config["transactional.id"] = configs.TRANSACTIONAL_ID
        if configs.USERNAME and configs.PASSWORD and configs.SSL_CA_FILE:
            config["sasl.username"] = configs.USERNAME
            config["sasl.password"] = configs.PASSWORD.get_secret_value()
            config["security.protocol"] = configs.SECURITY_PROTOCOL
            config["sasl.mechanism"] = configs.SASL_MECHANISM or ""
            config["ssl.ca.location"] = configs.SSL_CA_FILE
            config["ssl.certificate.location"] = configs.SSL_CERT_FILE or ""
            config["ssl.key.location"] = configs.SSL_KEY_FILE or ""
            config["ssl.endpoint.identification.algorithm"] = "none"
        return config

    async def _get_adapter(self) -> AIOProducer:
        """Returns the AIOProducer, creating it lazily on first async call.

        Returns:
            AIOProducer: The configured async producer instance.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error creating the producer.
        """
        if self._adapter is None:
            try:
                config = self._build_producer_conf(self._configs)
                self._adapter = AIOProducer(
                    config,
                    max_workers=self._configs.PRODUCER_MAX_WORKERS,
                    batch_size=self._configs.PRODUCER_BATCH_SIZE,
                    buffer_timeout=self._configs.PRODUCER_BUFFER_TIMEOUT,
                )
            except Exception as e:
                self._handle_kafka_exception(e, "AsyncKafkaProducer_init")
        return self._adapter

    @staticmethod
    def _pre_process_message(message: str | bytes) -> bytes:
        """Pre-processes a message to ensure it's in the correct format.

        Args:
            message (str | bytes): The message to pre-process.

        Returns:
            bytes: The pre-processed message as bytes.
        """
        if isinstance(message, str):
            return message.encode("utf-8")
        return message

    @override
    async def produce(self, message: str | bytes, key: str | None = None) -> None:
        """Produces a message to the configured topic asynchronously.

        Args:
            message (str | bytes): The message to produce.
            key (str | None, optional): The key for the message. Defaults to None.

        Raises:
            NetworkError: If there is a network error producing the message.
            ResourceExhaustedError: If the producer queue is full.
            InternalError: If there is an error producing the message.
        """
        try:
            adapter = await self._get_adapter()
            processed_message = self._pre_process_message(message)
            processed_key = self._pre_process_message(key) if key is not None else b""
            await adapter.produce(
                topic=self._topic_name,
                value=processed_message,
                key=processed_key,
            )
        except Exception as e:
            self._handle_producer_exception(e, "produce")

    @override
    async def flush(self, timeout: int | None = None) -> None:  # noqa: ASYNC109
        """Flushes the producer queue asynchronously.

        Args:
            timeout (int | None, optional): Timeout in seconds for the operation. Defaults to None.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error flushing the queue.
        """
        try:
            adapter = await self._get_adapter()
            await adapter.flush()
        except Exception as e:
            self._handle_kafka_exception(e, "flush")

    @override
    async def validate_healthiness(self) -> None:
        """Validates the health of the async Kafka producer connection.

        Raises:
            UnavailableError: If the Kafka service is unavailable.
        """
        try:
            await self.list_topics(timeout=1)
        except Exception as e:
            raise UnavailableError(resource_type="Kafka") from e

    @override
    async def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:  # noqa: ASYNC109
        """Lists Kafka topics asynchronously.

        Args:
            topic (str | None, optional): Specific topic to list. If None, lists all topics.
                Defaults to None.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            ClusterMetadata: Metadata about the Kafka cluster and topics.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If the Kafka service is unavailable.
            UnavailableError: If there is an unknown issue accessing Kafka.
        """
        try:
            adapter = await self._get_adapter()
            result = await adapter.list_topics(topic=topic, timeout=timeout)
        except Exception as e:
            self._handle_kafka_exception(e, "list_topics")
            raise
        else:
            typed_result: ClusterMetadata = result
            return typed_result

    @override
    async def close(self) -> None:
        """Closes the async producer, flushing any remaining messages.

        Raises:
            ConnectionTimeoutError: If the flush times out.
            ServiceUnavailableError: If Kafka is unavailable during close.
            InternalError: If there is an error closing the producer.
        """
        try:
            if self._adapter is not None:
                await self._adapter.close()
                self._adapter = None
                logger.debug("Async producer closed")
        except Exception as e:
            self._handle_kafka_exception(e, "close")
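Unlike the sync producer, the async adapter defers constructing `AIOProducer` until the first awaited call, so `__init__` never touches the event loop. The pattern reduced to its essentials (the `object()` placeholder stands in for the real producer):

```python
import asyncio


class LazyProducer:
    """Build the underlying client on first use, then reuse it."""

    def __init__(self):
        self._client = None  # nothing created at construction time

    async def _get_client(self):
        if self._client is None:
            # placeholder for AIOProducer(config, ...) in the adapter
            self._client = object()
        return self._client


async def demo():
    producer = LazyProducer()
    first = await producer._get_client()
    second = await producer._get_client()
    return first is second  # same instance is cached and reused


assert asyncio.run(demo())
```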

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter.produce async

produce(
    message: str | bytes, key: str | None = None
) -> None

Produces a message to the configured topic asynchronously.

Parameters:

Name Type Description Default
message str | bytes

The message to produce.

required
key str | None

The key for the message. Defaults to None.

None

Raises:

Type Description
NetworkError

If there is a network error producing the message.

ResourceExhaustedError

If the producer queue is full.

InternalError

If there is an error producing the message.

Source code in archipy/adapters/kafka/adapters.py
@override
async def produce(self, message: str | bytes, key: str | None = None) -> None:
    """Produces a message to the configured topic asynchronously.

    Args:
        message (str | bytes): The message to produce.
        key (str | None, optional): The key for the message. Defaults to None.

    Raises:
        NetworkError: If there is a network error producing the message.
        ResourceExhaustedError: If the producer queue is full.
        InternalError: If there is an error producing the message.
    """
    try:
        adapter = await self._get_adapter()
        processed_message = self._pre_process_message(message)
        processed_key = self._pre_process_message(key) if key is not None else b""
        await adapter.produce(
            topic=self._topic_name,
            value=processed_message,
            key=processed_key,
        )
    except Exception as e:
        self._handle_producer_exception(e, "produce")

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter.flush async

flush(timeout: int | None = None) -> None

Flushes the producer queue asynchronously.

Parameters:

Name Type Description Default
timeout int | None

Timeout in seconds for the operation. Defaults to None.

None

Raises:

Type Description
ConnectionTimeoutError

If the operation times out.

ServiceUnavailableError

If Kafka is unavailable.

InternalError

If there is an error flushing the queue.

Source code in archipy/adapters/kafka/adapters.py
@override
async def flush(self, timeout: int | None = None) -> None:  # noqa: ASYNC109
    """Flushes the producer queue asynchronously.

    Args:
        timeout (int | None, optional): Timeout in seconds for the operation. Defaults to None.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error flushing the queue.
    """
    try:
        adapter = await self._get_adapter()
        await adapter.flush()
    except Exception as e:
        self._handle_kafka_exception(e, "flush")

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter.validate_healthiness async

validate_healthiness() -> None

Validates the health of the async Kafka producer connection.

Raises:

Type Description
UnavailableError

If the Kafka service is unavailable.

Source code in archipy/adapters/kafka/adapters.py
@override
async def validate_healthiness(self) -> None:
    """Validates the health of the async Kafka producer connection.

    Raises:
        UnavailableError: If the Kafka service is unavailable.
    """
    try:
        await self.list_topics(timeout=1)
    except Exception as e:
        raise UnavailableError(resource_type="Kafka") from e

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter.list_topics async

list_topics(
    topic: str | None = None, timeout: int = 1
) -> ClusterMetadata

Lists Kafka topics asynchronously.

Parameters:

Name Type Description Default
topic str | None

Specific topic to list. If None, lists all topics. Defaults to None.

None
timeout int

Timeout in seconds for the operation. Defaults to 1.

1

Returns:

Name Type Description
ClusterMetadata ClusterMetadata

Metadata about the Kafka cluster and topics.

Raises:

Type Description
ConnectionTimeoutError

If the operation times out.

ServiceUnavailableError

If the Kafka service is unavailable.

UnavailableError

If there is an unknown issue accessing Kafka.

Source code in archipy/adapters/kafka/adapters.py
@override
async def list_topics(self, topic: str | None = None, timeout: int = 1) -> ClusterMetadata:  # noqa: ASYNC109
    """Lists Kafka topics asynchronously.

    Args:
        topic (str | None, optional): Specific topic to list. If None, lists all topics.
            Defaults to None.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        ClusterMetadata: Metadata about the Kafka cluster and topics.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If the Kafka service is unavailable.
        UnavailableError: If there is an unknown issue accessing Kafka.
    """
    try:
        adapter = await self._get_adapter()
        result = await adapter.list_topics(topic=topic, timeout=timeout)
    except Exception as e:
        self._handle_kafka_exception(e, "list_topics")
        raise
    else:
        typed_result: ClusterMetadata = result
        return typed_result

archipy.adapters.kafka.adapters.AsyncKafkaProducerAdapter.close async

close() -> None

Closes the async producer, flushing any remaining messages.

Raises:

Type Description
ConnectionTimeoutError

If the flush times out.

ServiceUnavailableError

If Kafka is unavailable during close.

InternalError

If there is an error closing the producer.

Source code in archipy/adapters/kafka/adapters.py
@override
async def close(self) -> None:
    """Closes the async producer, flushing any remaining messages.

    Raises:
        ConnectionTimeoutError: If the flush times out.
        ServiceUnavailableError: If Kafka is unavailable during close.
        InternalError: If there is an error closing the producer.
    """
    try:
        if self._adapter is not None:
            await self._adapter.close()
            self._adapter = None
            logger.debug("Async producer closed")
    except Exception as e:
        self._handle_kafka_exception(e, "close")
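Because `close` resets the cached client to None, closing an already-closed adapter is a no-op rather than an error, and a later `produce` would lazily rebuild the client. In miniature (stub client; the real `AIOProducer.close` also flushes, which is not modeled here):

```python
import asyncio


class Closable:
    """Sketch of the adapter's close(): only act when a client exists."""

    def __init__(self):
        self._client = "client"
        self.close_calls = 0

    async def close(self):
        if self._client is not None:
            self.close_calls += 1  # the real adapter awaits client.close() here
            self._client = None


async def demo():
    c = Closable()
    await c.close()
    await c.close()  # second close is a no-op, not an error
    return c.close_calls


assert asyncio.run(demo()) == 1
```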

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter

Bases: AsyncKafkaConsumerPort, KafkaExceptionHandlerMixin

Async Kafka consumer adapter.

This adapter provides async message consumption from Kafka topics using AIOConsumer. It implements the AsyncKafkaConsumerPort interface.

Source code in archipy/adapters/kafka/adapters.py
class AsyncKafkaConsumerAdapter(AsyncKafkaConsumerPort, KafkaExceptionHandlerMixin):
    """Async Kafka consumer adapter.

    This adapter provides async message consumption from Kafka topics using AIOConsumer.
    It implements the AsyncKafkaConsumerPort interface.
    """

    def __init__(
        self,
        group_id: str,
        topic_list: list[str] | None = None,
        partition_list: list[TopicPartition] | None = None,
        kafka_configs: KafkaConfig | None = None,
    ) -> None:
        """Initializes the async consumer adapter with Kafka configuration and subscription.

        Args:
            group_id (str): Consumer group ID.
            topic_list (list[str] | None, optional): List of topics to subscribe to.
                Defaults to None.
            partition_list (list[TopicPartition] | None, optional): List of partitions
                to assign. Defaults to None.
            kafka_configs (KafkaConfig | None, optional): Kafka configuration. If None,
                uses global config. Defaults to None.

        Raises:
            InvalidArgumentError: If both topic_list and partition_list are provided or
                neither is provided.
            InternalError: If there is an error initializing the consumer.
        """
        configs: KafkaConfig = kafka_configs or BaseConfig.global_config().KAFKA
        self._configs = configs
        self._group_id = group_id
        self._topic_list = topic_list
        self._partition_list = partition_list
        self._adapter: AIOConsumer | None = None

        if not (topic_list or partition_list) or (topic_list and partition_list):
            logger.error("Invalid topic or partition list")
            raise InvalidArgumentError(
                argument_name="topic_list or partition_list",
                additional_data={"reason": "Exactly one of topic_list or partition_list must be provided"},
            )

    def _build_consumer_conf(self, configs: KafkaConfig) -> dict[str, str | int | float]:
        """Builds the consumer configuration dictionary.

        Args:
            configs (KafkaConfig): Kafka configuration.

        Returns:
            dict[str, str | int | float]: Consumer configuration dictionary.
        """
        broker_list_csv = ",".join(configs.BROKERS_LIST)
        config: dict[str, str | int | float] = {
            "bootstrap.servers": broker_list_csv,
            "group.id": self._group_id,
            "session.timeout.ms": configs.SESSION_TIMEOUT_MS,
            "auto.offset.reset": configs.AUTO_OFFSET_RESET,
            "enable.auto.commit": configs.ENABLE_AUTO_COMMIT,
            "fetch.min.bytes": configs.FETCH_MIN_BYTES,
            "heartbeat.interval.ms": configs.HEARTBEAT_INTERVAL_MS,
            "isolation.level": configs.ISOLATION_LEVEL,
            "max.poll.interval.ms": configs.MAX_POLL_INTERVAL_MS,
            "partition.assignment.strategy": configs.PARTITION_ASSIGNMENT_STRATEGY,
            "fetch.max.bytes": configs.FETCH_MAX_BYTES,
            "max.partition.fetch.bytes": configs.MAX_PARTITION_FETCH_BYTES,
        }
        if configs.USERNAME and configs.PASSWORD and configs.SSL_CA_FILE:
            config["sasl.username"] = configs.USERNAME
            config["sasl.password"] = configs.PASSWORD.get_secret_value()
            config["security.protocol"] = configs.SECURITY_PROTOCOL
            config["sasl.mechanism"] = configs.SASL_MECHANISM or ""
            config["ssl.ca.location"] = configs.SSL_CA_FILE
            config["ssl.certificate.location"] = configs.SSL_CERT_FILE or ""
            config["ssl.key.location"] = configs.SSL_KEY_FILE or ""
            config["ssl.endpoint.identification.algorithm"] = "none"
        return config

    async def _get_adapter(self) -> AIOConsumer:
        """Returns the AIOConsumer, creating and subscribing it lazily on first async call.

        Returns:
            AIOConsumer: The configured async consumer instance.

        Raises:
            ConfigurationError: If there is an error in the Kafka configuration.
            InternalError: If there is an error creating the consumer.
        """
        if self._adapter is None:
            try:
                config = self._build_consumer_conf(self._configs)
                self._adapter = AIOConsumer(config, max_workers=self._configs.CONSUMER_MAX_WORKERS)
                if self._topic_list:
                    await self._adapter.subscribe(self._topic_list)
                elif self._partition_list:
                    await self._adapter.assign(self._partition_list)
            except Exception as e:
                self._handle_kafka_exception(e, "AsyncKafkaConsumer_init")
        return self._adapter

    @override
    async def batch_consume(self, messages_number: int = 500, timeout: int = 1) -> list[Message]:  # noqa: ASYNC109
        """Consumes a batch of messages from subscribed topics asynchronously.

        Args:
            messages_number (int, optional): Maximum number of messages to consume.
                Defaults to 500.
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            list[Message]: List of consumed messages.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error consuming messages.
        """
        try:
            adapter = await self._get_adapter()
            result_list: list[Message] = []
            messages: list[Message] = await adapter.consume(num_messages=messages_number, timeout=timeout)
            for message in messages:
                if message.error():
                    logger.error("Async consumer error: %s", message.error())
                    continue
                logger.debug("Async message consumed: %s", message)
                message.set_value(message.value())
                result_list.append(message)
        except Exception as e:
            self._handle_kafka_exception(e, "batch_consume")
            raise
        else:
            return result_list

    @override
    async def poll(self, timeout: int = 1) -> Message | None:  # noqa: ASYNC109
        """Polls for a single message from subscribed topics asynchronously.

        Args:
            timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

        Returns:
            Message | None: The consumed message or None if no message was received.

        Raises:
            ConnectionTimeoutError: If the operation times out.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error polling for messages.
        """
        try:
            adapter = await self._get_adapter()
            message: Message | None = await adapter.poll(timeout)
            if message is None:
                logger.debug("No async message received")
                return None
            if message.error():
                logger.error("Async consumer error: %s", message.error())
                return None
            logger.debug("Async message consumed: %s", message)
            message.set_value(message.value())
        except Exception as e:
            self._handle_kafka_exception(e, "poll")
        else:
            return message

    @override
    async def commit(self, message: Message, asynchronous: bool = True) -> None | list[TopicPartition]:
        """Commits the offset for a message asynchronously.

        Args:
            message (Message): The message to commit.
            asynchronous (bool, optional): Whether to commit asynchronously. Defaults to True.

        Returns:
            None | list[TopicPartition]: None for async commits, list of TopicPartition for sync commits.

        Raises:
            InvalidArgumentError: If the message is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error committing the offset.
        """
        try:
            adapter = await self._get_adapter()
            result = await adapter.commit(message=message, asynchronous=asynchronous)
        except Exception as e:
            self._handle_kafka_exception(e, "commit")
        else:
            return result

    @override
    async def subscribe(self, topic_list: list[str]) -> None:
        """Subscribes to a list of topics asynchronously.

        Args:
            topic_list (list[str]): List of topics to subscribe to.

        Raises:
            InvalidArgumentError: If the topic list is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error subscribing to topics.
        """
        try:
            adapter = await self._get_adapter()
            await adapter.subscribe(topic_list)
        except Exception as e:
            self._handle_kafka_exception(e, "subscribe")

    @override
    async def assign(self, partition_list: list[TopicPartition]) -> None:
        """Assigns the async consumer to a list of topic partitions.

        Args:
            partition_list (list[TopicPartition]): List of partitions to assign.

        Raises:
            InvalidArgumentError: If the partition list is invalid.
            ServiceUnavailableError: If Kafka is unavailable.
            InternalError: If there is an error assigning partitions.
        """
        try:
            adapter = await self._get_adapter()
            await adapter.assign(partition_list)
        except Exception as e:
            self._handle_kafka_exception(e, "assign")

    @override
    async def close(self) -> None:
        """Closes the async consumer, leaving the consumer group and committing offsets.

        Raises:
            ServiceUnavailableError: If Kafka is unavailable during close.
            InternalError: If there is an error closing the consumer.
        """
        try:
            if self._adapter is not None:
                await self._adapter.close()
                self._adapter = None
                logger.debug("Async consumer closed")
        except Exception as e:
            self._handle_kafka_exception(e, "close")
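
The constructor requires exactly one of topic_list or partition_list (an exclusive-or). A standalone sketch of that guard, using a hypothetical helper and ValueError in place of the adapter's InvalidArgumentError:

```python
def validate_subscription(topic_list=None, partition_list=None):
    """Mirrors the constructor's guard: reject the call unless exactly one
    of topic_list or partition_list is provided."""
    if not (topic_list or partition_list) or (topic_list and partition_list):
        raise ValueError("Exactly one of topic_list or partition_list must be provided")
```

So `AsyncKafkaConsumerAdapter(group_id="g1", topic_list=["orders"])` is valid, while passing both arguments, or neither, raises InvalidArgumentError.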

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.batch_consume async

batch_consume(
    messages_number: int = 500, timeout: int = 1
) -> list[Message]

Consumes a batch of messages from subscribed topics asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| messages_number | int | Maximum number of messages to consume. Defaults to 500. | 500 |
| timeout | int | Timeout in seconds for the operation. Defaults to 1. | 1 |

Returns:

| Type | Description |
| --- | --- |
| list[Message] | List of consumed messages. |

Raises:

| Type | Description |
| --- | --- |
| ConnectionTimeoutError | If the operation times out. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error consuming messages. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def batch_consume(self, messages_number: int = 500, timeout: int = 1) -> list[Message]:  # noqa: ASYNC109
    """Consumes a batch of messages from subscribed topics asynchronously.

    Args:
        messages_number (int, optional): Maximum number of messages to consume.
            Defaults to 500.
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        list[Message]: List of consumed messages.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error consuming messages.
    """
    try:
        adapter = await self._get_adapter()
        result_list: list[Message] = []
        messages: list[Message] = await adapter.consume(num_messages=messages_number, timeout=timeout)
        for message in messages:
            if message.error():
                logger.error("Async consumer error: %s", message.error())
                continue
            logger.debug("Async message consumed: %s", message)
            message.set_value(message.value())
            result_list.append(message)
    except Exception as e:
        self._handle_kafka_exception(e, "batch_consume")
        raise
    else:
        return result_list
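
batch_consume drops any message that carries a broker-side error and keeps the rest in arrival order. That filtering step can be reproduced standalone with a stub message class (hypothetical, mimicking only the error()/value() accessors of confluent-kafka's Message):

```python
class StubMessage:
    """Hypothetical stub exposing just the Message accessors the loop uses."""

    def __init__(self, value, error=None):
        self._value, self._error = value, error

    def error(self):
        return self._error

    def value(self):
        return self._value


def filter_batch(messages):
    # Mirrors batch_consume's loop: skip messages reporting an error,
    # keep the rest in arrival order.
    return [m for m in messages if not m.error()]
```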

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.poll async

poll(timeout: int = 1) -> Message | None

Polls for a single message from subscribed topics asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | int | Timeout in seconds for the operation. Defaults to 1. | 1 |

Returns:

| Type | Description |
| --- | --- |
| Message \| None | The consumed message or None if no message was received. |

Raises:

| Type | Description |
| --- | --- |
| ConnectionTimeoutError | If the operation times out. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error polling for messages. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def poll(self, timeout: int = 1) -> Message | None:  # noqa: ASYNC109
    """Polls for a single message from subscribed topics asynchronously.

    Args:
        timeout (int, optional): Timeout in seconds for the operation. Defaults to 1.

    Returns:
        Message | None: The consumed message or None if no message was received.

    Raises:
        ConnectionTimeoutError: If the operation times out.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error polling for messages.
    """
    try:
        adapter = await self._get_adapter()
        message: Message | None = await adapter.poll(timeout)
        if message is None:
            logger.debug("No async message received")
            return None
        if message.error():
            logger.error("Async consumer error: %s", message.error())
            return None
        logger.debug("Async message consumed: %s", message)
        message.set_value(message.value())
    except Exception as e:
        self._handle_kafka_exception(e, "poll")
    else:
        return message
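
A typical consumer drives poll() in a loop. The sketch below uses a hypothetical FakeConsumer in place of AsyncKafkaConsumerAdapter, and, for illustration only, treats a None return as end-of-stream (a real poll may also return None on a timeout while more messages are still coming):

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for AsyncKafkaConsumerAdapter, used to show
    the poll-loop shape without a running broker."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def poll(self, timeout=1):
        return self._messages.pop(0) if self._messages else None


async def drain(consumer):
    # Poll until the consumer reports no message (None), collecting results.
    out = []
    while (msg := await consumer.poll(timeout=1)) is not None:
        out.append(msg)
    return out
```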

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.commit async

commit(
    message: Message, asynchronous: bool = True
) -> None | list[TopicPartition]

Commits the offset for a message asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | Message | The message to commit. | required |
| asynchronous | bool | Whether to commit asynchronously. Defaults to True. | True |

Returns:

| Type | Description |
| --- | --- |
| None \| list[TopicPartition] | None for async commits, list of TopicPartition for sync commits. |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the message is invalid. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error committing the offset. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def commit(self, message: Message, asynchronous: bool = True) -> None | list[TopicPartition]:
    """Commits the offset for a message asynchronously.

    Args:
        message (Message): The message to commit.
        asynchronous (bool, optional): Whether to commit asynchronously. Defaults to True.

    Returns:
        None | list[TopicPartition]: None for async commits, list of TopicPartition for sync commits.

    Raises:
        InvalidArgumentError: If the message is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error committing the offset.
    """
    try:
        adapter = await self._get_adapter()
        result = await adapter.commit(message=message, asynchronous=asynchronous)
    except Exception as e:
        self._handle_kafka_exception(e, "commit")
    else:
        return result
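
With auto-commit disabled, committing only after a message has been handled gives at-least-once processing. A minimal sketch of that order of operations, using a hypothetical recording consumer that also mirrors the documented return contract (None for async commits, a partition list for sync commits):

```python
import asyncio


class CommitRecordingConsumer:
    """Hypothetical stand-in for AsyncKafkaConsumerAdapter that records
    which messages were committed."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.committed = []

    async def poll(self, timeout=1):
        return self._messages.pop(0) if self._messages else None

    async def commit(self, message, asynchronous=True):
        self.committed.append(message)
        # Documented contract: async commits return None,
        # sync commits return the committed partition list.
        return None if asynchronous else [message]


async def process_all(consumer, handler):
    # Process first, then commit, so a crash mid-handler leads to a
    # redelivery rather than a lost message.
    while (msg := await consumer.poll()) is not None:
        handler(msg)
        await consumer.commit(msg)
```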

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.subscribe async

subscribe(topic_list: list[str]) -> None

Subscribes to a list of topics asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic_list | list[str] | List of topics to subscribe to. | required |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the topic list is invalid. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error subscribing to topics. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def subscribe(self, topic_list: list[str]) -> None:
    """Subscribes to a list of topics asynchronously.

    Args:
        topic_list (list[str]): List of topics to subscribe to.

    Raises:
        InvalidArgumentError: If the topic list is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error subscribing to topics.
    """
    try:
        adapter = await self._get_adapter()
        await adapter.subscribe(topic_list)
    except Exception as e:
        self._handle_kafka_exception(e, "subscribe")

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.assign async

assign(partition_list: list[TopicPartition]) -> None

Assigns the async consumer to a list of topic partitions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| partition_list | list[TopicPartition] | List of partitions to assign. | required |

Raises:

| Type | Description |
| --- | --- |
| InvalidArgumentError | If the partition list is invalid. |
| ServiceUnavailableError | If Kafka is unavailable. |
| InternalError | If there is an error assigning partitions. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def assign(self, partition_list: list[TopicPartition]) -> None:
    """Assigns the async consumer to a list of topic partitions.

    Args:
        partition_list (list[TopicPartition]): List of partitions to assign.

    Raises:
        InvalidArgumentError: If the partition list is invalid.
        ServiceUnavailableError: If Kafka is unavailable.
        InternalError: If there is an error assigning partitions.
    """
    try:
        adapter = await self._get_adapter()
        await adapter.assign(partition_list)
    except Exception as e:
        self._handle_kafka_exception(e, "assign")

archipy.adapters.kafka.adapters.AsyncKafkaConsumerAdapter.close async

close() -> None

Closes the async consumer, leaving the consumer group and committing offsets.

Raises:

| Type | Description |
| --- | --- |
| ServiceUnavailableError | If Kafka is unavailable during close. |
| InternalError | If there is an error closing the consumer. |

Source code in archipy/adapters/kafka/adapters.py
@override
async def close(self) -> None:
    """Closes the async consumer, leaving the consumer group and committing offsets.

    Raises:
        ServiceUnavailableError: If Kafka is unavailable during close.
        InternalError: If there is an error closing the consumer.
    """
    try:
        if self._adapter is not None:
            await self._adapter.close()
            self._adapter = None
            logger.debug("Async consumer closed")
    except Exception as e:
        self._handle_kafka_exception(e, "close")
