Kafka Adapter Tutorial
The Kafka adapter provides five separate adapters for working with Apache Kafka: KafkaAdminAdapter for
topic management, KafkaProducerAdapter and KafkaConsumerAdapter for synchronous producing and consuming,
and AsyncKafkaProducerAdapter and AsyncKafkaConsumerAdapter for asynchronous producing and consuming via
confluent_kafka.aio.
Installation
Configuration
Configure Kafka via environment variables or a KafkaConfig object.
Environment Variables
```bash
KAFKA__BROKERS_LIST='["localhost:9092"]'
KAFKA__CLIENT_ID=my-app
KAFKA__SECURITY_PROTOCOL=PLAINTEXT
KAFKA__ACKS=all
KAFKA__AUTO_OFFSET_RESET=earliest
```
Direct Configuration
```python
from archipy.configs.config_template import KafkaConfig

config = KafkaConfig(
    BROKERS_LIST=["kafka1:9092", "kafka2:9092"],
    CLIENT_ID="my-app",
    SECURITY_PROTOCOL="SASL_SSL",
    SASL_MECHANISM="PLAIN",
    USERNAME="kafka-user",
)
```
Async-Specific Configuration
The async adapters respect four additional fields that tune the underlying AIOProducer and AIOConsumer
(thread pool sizes and producer batching):

| Field | Default | Description |
|---|---|---|
| PRODUCER_MAX_WORKERS | 4 | Thread pool workers for AIOProducer |
| PRODUCER_BATCH_SIZE | 1000 | Maximum messages per AIOProducer batch |
| PRODUCER_BUFFER_TIMEOUT | 1.0 | Buffer flush timeout (seconds) for AIOProducer |
| CONSUMER_MAX_WORKERS | 2 | Thread pool workers for AIOConsumer |

```python
from archipy.configs.config_template import KafkaConfig

async_config = KafkaConfig(
    BROKERS_LIST=["kafka1:9092"],
    PRODUCER_MAX_WORKERS=8,
    PRODUCER_BATCH_SIZE=500,
    PRODUCER_BUFFER_TIMEOUT=0.5,
    CONSUMER_MAX_WORKERS=4,
)
```
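PRODUCER_BATCH_SIZE and PRODUCER_BUFFER_TIMEOUT trade throughput against latency: a buffer is handed off when it holds enough messages or when the timeout expires, whichever comes first. The sketch below is a simplified model of that rule written for illustration; it is not AIOProducer's implementation, and SizeOrTimeoutBuffer is a hypothetical name.

```python
import asyncio


class SizeOrTimeoutBuffer:
    """Hypothetical buffer that flushes on batch size or elapsed time."""

    def __init__(self, batch_size: int, buffer_timeout: float) -> None:
        self.batch_size = batch_size
        self.buffer_timeout = buffer_timeout
        self.pending: list[str] = []
        self.flushed_batches: list[list[str]] = []
        self._timer: asyncio.TimerHandle | None = None

    def add(self, message: str) -> None:
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            # Size limit reached: hand the batch off immediately.
            self.flush()
        elif self._timer is None:
            # First message of a new batch: start the timeout clock.
            loop = asyncio.get_running_loop()
            self._timer = loop.call_later(self.buffer_timeout, self.flush)

    def flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self.pending:
            self.flushed_batches.append(self.pending)
            self.pending = []


async def demo() -> list[list[str]]:
    buffer = SizeOrTimeoutBuffer(batch_size=3, buffer_timeout=0.05)
    for i in range(4):
        buffer.add(f"msg-{i}")
    # msg-0..msg-2 are flushed by size; msg-3 waits for the timeout.
    await asyncio.sleep(0.1)
    return buffer.flushed_batches


batches = asyncio.run(demo())
print(batches)
```

A smaller batch size or timeout lowers per-message latency; larger values amortise more work per hand-off.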
Basic Usage
Admin: Topic Management
KafkaAdminAdapter handles topic management: creating, listing and deleting topics.
```python
import logging

from archipy.adapters.kafka.adapters import KafkaAdminAdapter
from archipy.models.errors import (
    AlreadyExistsError,
    InternalError,
    InvalidArgumentError,
    ServiceUnavailableError,
)

logger = logging.getLogger(__name__)

# Use global configuration
admin = KafkaAdminAdapter()

# Create a topic
try:
    admin.create_topic("my-topic", num_partitions=3, replication_factor=1)
except AlreadyExistsError:
    logger.info("Topic already exists, skipping creation")
except InvalidArgumentError as e:
    logger.error("Invalid topic configuration: %s", e)
    raise
except ServiceUnavailableError as e:
    logger.error("Kafka service unavailable: %s", e)
    raise
except InternalError as e:
    logger.error("Internal error creating topic: %s", e)
    raise
else:
    logger.info("Topic created successfully")

# List all topics: returns ClusterMetadata
try:
    cluster_metadata = admin.list_topics()
except InternalError as e:
    logger.error("Failed to list topics: %s", e)
    raise
else:
    for topic_name in cluster_metadata.topics:
        logger.info("Topic: %s", topic_name)

# Delete topics: accepts a list of topic names
try:
    admin.delete_topic(["my-topic"])
except InvalidArgumentError as e:
    logger.error("Invalid topic list: %s", e)
    raise
except InternalError as e:
    logger.error("Failed to delete topic: %s", e)
    raise
else:
    logger.info("Topic deleted successfully")
```
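Kafka rejects topic names longer than 249 characters, names containing anything other than ASCII letters, digits, ".", "_" and "-", and the names "." and ".."; it also rejects a replication factor larger than the number of brokers. Checking these locally gives a clearer message before create_topic is called. The helper below is a hypothetical sketch: it requires explicit positive partition and replication values, and the broker count is an input you supply (for example len(cluster_metadata.brokers) from the ClusterMetadata returned by list_topics()).

```python
import re

# Kafka's legal topic characters and maximum topic name length.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
_MAX_TOPIC_LENGTH = 249


def validate_topic_spec(name: str, num_partitions: int, replication_factor: int, broker_count: int) -> None:
    """Raise ValueError if the broker would reject this topic spec (hypothetical helper)."""
    if name in {".", ".."}:
        raise ValueError(f"Topic name {name!r} is reserved")
    if len(name) > _MAX_TOPIC_LENGTH:
        raise ValueError(f"Topic name exceeds {_MAX_TOPIC_LENGTH} characters")
    if not _LEGAL_TOPIC.fullmatch(name):
        raise ValueError(f"Topic name {name!r} contains illegal characters")
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if not 1 <= replication_factor <= broker_count:
        raise ValueError(f"replication_factor must be between 1 and {broker_count} (available brokers)")


# Passes silently: valid name, and a single broker can hold one replica.
validate_topic_spec("my-topic", num_partitions=3, replication_factor=1, broker_count=1)
```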
Producer: Publishing Messages
KafkaProducerAdapter is bound to a single topic at construction time.
```python
import logging

from archipy.adapters.kafka.adapters import KafkaProducerAdapter
from archipy.models.errors import InternalError, NetworkError, ResourceExhaustedError

logger = logging.getLogger(__name__)

# Requires topic_name at construction
producer = KafkaProducerAdapter(topic_name="my-topic")

# Publish a simple string message
try:
    producer.produce("Hello, Kafka!")
except NetworkError as e:
    logger.error("Network error producing message: %s", e)
    raise
except ResourceExhaustedError as e:
    logger.error("Producer queue full: %s", e)
    raise
except InternalError as e:
    logger.error("Failed to produce message: %s", e)
    raise
else:
    logger.info("Message enqueued successfully")

# Publish with a message key (for partition affinity)
try:
    producer.produce("Hello, Kafka with key!", key="user-123")
except InternalError as e:
    logger.error("Failed to produce keyed message: %s", e)
    raise
else:
    logger.info("Keyed message enqueued")

# Flush to ensure all pending messages are delivered
try:
    producer.flush(timeout=10)
except InternalError as e:
    logger.error("Flush failed: %s", e)
    raise
else:
    logger.info("All messages flushed")
```
Consumer: Consuming Messages
KafkaConsumerAdapter requires a group_id and exactly one of topic_list (subscribe)
or partition_list (assign). Providing both or neither raises InvalidArgumentError.
```python
import logging

from archipy.adapters.kafka.adapters import KafkaConsumerAdapter
from archipy.models.errors import InternalError, ServiceUnavailableError

logger = logging.getLogger(__name__)

# Subscribe to topics by name
consumer = KafkaConsumerAdapter(
    group_id="my-consumer-group",
    topic_list=["my-topic"],
)

# Batch consume: returns list[Message] and skips messages with errors
try:
    messages = consumer.batch_consume(messages_number=10, timeout=5)
except ServiceUnavailableError as e:
    logger.error("Kafka unavailable during consume: %s", e)
    raise
except InternalError as e:
    logger.error("Error consuming messages: %s", e)
    raise
else:
    for msg in messages:
        value = msg.value()
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        logger.info("Received: %s", value)
        # Commit offset after processing
        consumer.commit(msg, asynchronous=False)

# Poll for a single message; a polled message may still carry an error
# (confluent_kafka reports events such as partition EOF this way), so check it
msg = consumer.poll(timeout=1)
if msg is not None and msg.error() is None:
    value = msg.value()
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    logger.info("Polled: %s", value)
```
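Committing after every message with asynchronous=False costs a broker round trip per message. A common alternative is to process the whole batch and then commit once per partition. Kafka's committed offset is the offset of the next message to read, so the value to commit is the highest processed offset plus one. The pure-Python helper below (hypothetical name) computes those values from (topic, partition, offset) triples, which you could build from msg.topic(), msg.partition() and msg.offset(). In confluent_kafka, committing a message commits its offset plus one, so passing the last message of each partition to commit() should have the same effect, assuming the adapter's commit delegates to it.

```python
from collections.abc import Iterable


def offsets_to_commit(records: Iterable[tuple[str, int, int]]) -> dict[tuple[str, int], int]:
    """Return the next offset to commit for each (topic, partition) (hypothetical helper).

    Kafka stores the offset of the next message to consume, so the
    committed value is the highest processed offset plus one.
    """
    next_offsets: dict[tuple[str, int], int] = {}
    for topic, partition, offset in records:
        key = (topic, partition)
        next_offsets[key] = max(next_offsets.get(key, 0), offset + 1)
    return next_offsets


processed = [("my-topic", 0, 41), ("my-topic", 1, 7), ("my-topic", 0, 42)]
print(offsets_to_commit(processed))  # {('my-topic', 0): 43, ('my-topic', 1): 8}
```

Committing only after processing gives at-least-once delivery: if the consumer crashes mid-batch, the uncommitted messages are read again.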
Assign to Specific Partitions
```python
import logging

from confluent_kafka import TopicPartition

from archipy.adapters.kafka.adapters import KafkaConsumerAdapter

logger = logging.getLogger(__name__)

# Assign to specific topic partitions (manual partition control)
consumer = KafkaConsumerAdapter(
    group_id="partition-consumer-group",
    partition_list=[
        TopicPartition("my-topic", partition=0),
        TopicPartition("my-topic", partition=1),
    ],
)
messages = consumer.batch_consume(messages_number=100, timeout=10)
logger.info("Consumed %d messages from assigned partitions", len(messages))
```
Async Adapters
The AsyncKafkaProducerAdapter and AsyncKafkaConsumerAdapter wrap confluent_kafka.aio's
AIOProducer and AIOConsumer respectively. Both use lazy initialisation — the underlying
AIOProducer / AIOConsumer is created on the first await call, not at construction time.
This makes them safe to construct in non-async contexts (e.g., at module level or in
dependency-injection containers) without requiring a running event loop.
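The lazy-initialisation idea can be sketched with an asyncio.Lock so that concurrent first calls create the client only once. LazyClient and the factory argument are hypothetical names; this illustrates the general technique and is not ArchiPy's source.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

T = TypeVar("T")


class LazyClient(Generic[T]):
    """Create the wrapped client on first use, inside a running event loop."""

    def __init__(self, factory: Callable[[], Awaitable[T]]) -> None:
        # Nothing async happens here, so construction works without an event loop.
        self._factory = factory
        self._client: T | None = None
        self._lock: asyncio.Lock | None = None

    async def get(self) -> T:
        if self._client is None:
            if self._lock is None:
                # The lock is created lazily too, once a loop is running.
                self._lock = asyncio.Lock()
            async with self._lock:
                # Re-check: another task may have created the client while we waited.
                if self._client is None:
                    self._client = await self._factory()
        return self._client


created = 0


async def make_client() -> str:
    global created
    created += 1
    await asyncio.sleep(0.01)  # simulate connection setup
    return "client"


lazy = LazyClient(make_client)  # safe at module level: no event loop needed yet


async def main() -> list[str]:
    return await asyncio.gather(lazy.get(), lazy.get(), lazy.get())


results = asyncio.run(main())
print(results, created)  # ['client', 'client', 'client'] 1
```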
Async Producer
```python
import asyncio
import logging

from archipy.adapters.kafka.adapters import AsyncKafkaProducerAdapter
from archipy.models.errors import InternalError, NetworkError, ResourceExhaustedError

logger = logging.getLogger(__name__)


async def publish_events() -> None:
    """Publish a batch of events using the async producer.

    Raises:
        NetworkError: If a network error prevents message delivery.
        ResourceExhaustedError: If the internal producer queue is full.
        InternalError: If an unexpected error occurs.
    """
    producer = AsyncKafkaProducerAdapter(topic_name="events")
    try:
        await producer.produce("event-payload-1")
        await producer.produce("event-payload-2", key="user-42")
        await producer.flush()
    except NetworkError as e:
        logger.error("Network error: %s", e)
        raise
    except ResourceExhaustedError as e:
        logger.error("Producer queue exhausted: %s", e)
        raise
    except InternalError as e:
        logger.error("Unexpected error: %s", e)
        raise
    finally:
        await producer.close()
    logger.info("Events published successfully")


asyncio.run(publish_events())
```
Async Consumer
AsyncKafkaConsumerAdapter mirrors the sync consumer API: provide exactly one of
topic_list or partition_list.
```python
import asyncio
import logging

from archipy.adapters.kafka.adapters import AsyncKafkaConsumerAdapter
from archipy.models.errors import InternalError, ServiceUnavailableError

logger = logging.getLogger(__name__)


async def consume_events() -> None:
    """Consume a batch of events using the async consumer.

    Raises:
        ServiceUnavailableError: If Kafka is unreachable.
        InternalError: If an unexpected error occurs.
    """
    consumer = AsyncKafkaConsumerAdapter(
        group_id="async-consumer-group",
        topic_list=["events"],
    )
    try:
        messages = await consumer.batch_consume(messages_number=50, timeout=5)
    except ServiceUnavailableError as e:
        logger.error("Kafka unavailable: %s", e)
        raise
    except InternalError as e:
        logger.error("Error consuming messages: %s", e)
        raise
    else:
        for msg in messages:
            value = msg.value()
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            logger.info("Received: %s", value)
            await consumer.commit(msg, asynchronous=False)
    finally:
        await consumer.close()


asyncio.run(consume_events())
```
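The example above consumes a single batch; a service usually repeats the call until it is told to stop. The sketch below shows that loop shape with asyncio.Event as the stop signal. StubConsumer is a hypothetical in-memory stand-in that exposes only batch_consume and close and returns raw bytes instead of Message objects, so the loop runs without a broker; in real code you would pass an AsyncKafkaConsumerAdapter and read msg.value().

```python
import asyncio
from collections import deque


class StubConsumer:
    """Hypothetical in-memory stand-in for an async consumer."""

    def __init__(self, payloads: list[bytes]) -> None:
        self._queue = deque(payloads)
        self.closed = False

    async def batch_consume(self, messages_number: int, timeout: float) -> list[bytes]:
        await asyncio.sleep(0)  # yield to the event loop like a real I/O call
        batch: list[bytes] = []
        while self._queue and len(batch) < messages_number:
            batch.append(self._queue.popleft())
        return batch

    async def close(self) -> None:
        self.closed = True


async def consume_until_stopped(consumer: StubConsumer, stop: asyncio.Event, handled: list[str]) -> None:
    """Consume batches until `stop` is set, always closing the consumer."""
    try:
        while not stop.is_set():
            batch = await consumer.batch_consume(messages_number=2, timeout=1)
            for payload in batch:
                handled.append(payload.decode("utf-8"))
            if not batch:
                # Demo only: stop once the stub is drained; a real service keeps polling.
                stop.set()
    finally:
        await consumer.close()


async def main() -> tuple[list[str], bool]:
    consumer = StubConsumer([b"a", b"b", b"c"])
    handled: list[str] = []
    await consume_until_stopped(consumer, asyncio.Event(), handled)
    return handled, consumer.closed


handled, closed = asyncio.run(main())
print(handled, closed)  # ['a', 'b', 'c'] True
```

In a real service the event would typically be set from a shutdown hook or a signal handler (for example loop.add_signal_handler on Unix).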
Async Consumer: Assign to Specific Partitions
```python
import asyncio
import logging

from confluent_kafka import TopicPartition

from archipy.adapters.kafka.adapters import AsyncKafkaConsumerAdapter

logger = logging.getLogger(__name__)


async def consume_from_partitions() -> None:
    """Consume messages from manually assigned partitions.

    Raises:
        InternalError: If an unexpected error occurs during consumption.
    """
    consumer = AsyncKafkaConsumerAdapter(
        group_id="async-partition-group",
        partition_list=[
            TopicPartition("events", partition=0),
            TopicPartition("events", partition=1),
        ],
    )
    try:
        messages = await consumer.batch_consume(messages_number=100, timeout=10)
        logger.info("Consumed %d messages", len(messages))
    finally:
        await consumer.close()


asyncio.run(consume_from_partitions())
```
Async Producer Health Check
```python
import asyncio
import logging

from archipy.adapters.kafka.adapters import AsyncKafkaProducerAdapter
from archipy.models.errors import UnavailableError

logger = logging.getLogger(__name__)


async def check_health() -> None:
    """Check health of the async Kafka producer.

    Raises:
        UnavailableError: If the Kafka service is unreachable.
    """
    producer = AsyncKafkaProducerAdapter(topic_name="health-check")
    try:
        await producer.validate_healthiness()
    except UnavailableError as e:
        logger.error("Async Kafka producer is unhealthy: %s", e)
        raise
    else:
        logger.info("Async Kafka producer connection is healthy")
    finally:
        await producer.close()


asyncio.run(check_health())
```
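A readiness probe should answer quickly even when the broker hangs. One option is to bound the check with asyncio.wait_for and report failure on timeout. check_with_timeout and the two probe coroutines below are hypothetical; in practice the probe would be producer.validate_healthiness.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def check_with_timeout(probe: Callable[[], Awaitable[None]], timeout: float) -> bool:
    """Return True if the probe completes in time without raising (hypothetical helper)."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
    except Exception:
        # Covers asyncio.TimeoutError as well as any error raised by the probe.
        return False
    return True


async def healthy_probe() -> None:
    await asyncio.sleep(0)


async def hanging_probe() -> None:
    await asyncio.sleep(10)  # simulates a broker that never answers


async def main() -> tuple[bool, bool]:
    return (
        await check_with_timeout(healthy_probe, timeout=0.5),
        await check_with_timeout(hanging_probe, timeout=0.05),
    )


print(asyncio.run(main()))  # (True, False)
```

wait_for cancels the hanging probe when the timeout expires, so the probe task does not linger in the background.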
Error Handling
The Kafka adapters map low-level Kafka errors to ArchiPy domain exceptions:
```python
import logging

from archipy.adapters.kafka.adapters import KafkaAdminAdapter, KafkaProducerAdapter
from archipy.models.errors import (
    ConfigurationError,
    ConnectionTimeoutError,
    InternalError,
    InvalidArgumentError,
    NetworkError,
    ResourceExhaustedError,
    ServiceUnavailableError,
)

logger = logging.getLogger(__name__)

try:
    admin = KafkaAdminAdapter()
    admin.create_topic("my-topic")
except ConfigurationError as e:
    logger.error("Kafka misconfigured: %s", e)
    raise
except InvalidArgumentError as e:
    logger.error("Invalid topic name or partition config: %s", e)
    raise
except ConnectionTimeoutError as e:
    logger.error("Timed out connecting to Kafka: %s", e)
    raise
except ServiceUnavailableError as e:
    logger.error("Kafka broker unavailable: %s", e)
    raise
except InternalError as e:
    logger.error("Unexpected Kafka error: %s", e)
    raise

try:
    producer = KafkaProducerAdapter(topic_name="my-topic")
    producer.produce("test message")
    producer.flush()
except NetworkError as e:
    logger.error("Network error: %s", e)
    raise
except ResourceExhaustedError as e:
    logger.error("Producer queue exhausted: %s", e)
    raise
except InternalError as e:
    logger.error("Unexpected error: %s", e)
    raise
```
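These exceptions fall into two broad groups. ConfigurationError and InvalidArgumentError describe problems that retrying will not fix, while NetworkError, ConnectionTimeoutError, ServiceUnavailableError and ResourceExhaustedError are often transient. A minimal sketch of retrying only the transient group with exponential backoff follows; retry_transient is a hypothetical helper, and the built-in ConnectionError and TimeoutError stand in for the ArchiPy classes so the snippet runs on its own.

```python
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def retry_transient(
    operation: Callable[[], T],
    retryable: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Call `operation`, retrying only `retryable` errors with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts):
        try:
            return operation()
        except retryable:
            # Back off base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    # Final attempt: any error, retryable or not, propagates to the caller.
    return operation()


calls = 0


def flaky_produce() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("broker not reachable yet")
    return "delivered"


result = retry_transient(flaky_produce, retryable=(ConnectionError, TimeoutError), base_delay=0.0)
print(result, calls)  # delivered 3
```

Errors outside the retryable tuple propagate on the first attempt. Retrying a produce can publish a message twice if the earlier attempt actually reached the broker, so reserve this for idempotent operations or enable idempotence on the producer (librdkafka's enable.idempotence).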
Producer Health Check
```python
import logging

from archipy.adapters.kafka.adapters import KafkaProducerAdapter
from archipy.models.errors import UnavailableError

logger = logging.getLogger(__name__)

producer = KafkaProducerAdapter(topic_name="my-topic")
try:
    producer.validate_healthiness()
except UnavailableError as e:
    logger.error("Kafka producer is unhealthy: %s", e)
    raise
else:
    logger.info("Kafka producer connection is healthy")
```
Integration with FastAPI
The async adapters are designed for use in async frameworks such as FastAPI. Use a single
shared adapter instance (e.g., via application lifespan) and call close() on shutdown.
```python
import asyncio
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from archipy.adapters.kafka.adapters import AsyncKafkaProducerAdapter, KafkaAdminAdapter
from archipy.models.errors import InternalError, NetworkError, UnavailableError

logger = logging.getLogger(__name__)

producer: AsyncKafkaProducerAdapter | None = None


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Manage async producer lifecycle.

    Args:
        app: The FastAPI application instance.

    Yields:
        None
    """
    global producer
    producer = AsyncKafkaProducerAdapter(topic_name="events")
    try:
        yield
    finally:
        # Close the producer on shutdown, even if the application exits with an error
        if producer is not None:
            await producer.close()


app = FastAPI(lifespan=lifespan)


class EventMessage(BaseModel):
    """Payload for publishing an event."""

    content: str
    key: str | None = None


@app.post("/events/publish")
async def publish_event(event: EventMessage) -> dict[str, str]:
    """Publish an event to Kafka.

    Args:
        event: Event message with content and optional key.

    Returns:
        Status message.

    Raises:
        HTTPException: If the message cannot be delivered.
    """
    if producer is None:
        raise HTTPException(status_code=503, detail="Producer not initialised")
    try:
        await producer.produce(event.content, key=event.key)
        # Flushing per request confirms delivery before responding, at the cost of batching
        await producer.flush()
    except UnavailableError as e:
        logger.error("Kafka unavailable: %s", e)
        raise HTTPException(status_code=503, detail="Kafka service unavailable") from e
    except NetworkError as e:
        logger.error("Network error publishing event: %s", e)
        raise HTTPException(status_code=502, detail="Network error") from e
    except InternalError as e:
        logger.error("Failed to publish event: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    else:
        logger.info("Event published to 'events' topic")
        return {"message": "Event published successfully"}


@app.get("/topics")
async def list_topics() -> dict[str, list[str]]:
    """List all Kafka topics.

    Returns:
        Dictionary with list of topic names.

    Raises:
        HTTPException: If topic listing fails.
    """
    try:
        admin = KafkaAdminAdapter()
        # list_topics() is synchronous; run it in a worker thread so it does not block the event loop
        cluster_metadata = await asyncio.to_thread(admin.list_topics)
    except InternalError as e:
        logger.error("Failed to list topics: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    else:
        logger.info("Retrieved topic list")
        return {"topics": list(cluster_metadata.topics.keys())}
```
See Also
- Error Handling — Exception handling patterns with proper chaining
- Configuration Management — Kafka configuration setup
- BDD Testing — Testing Kafka operations
- API Reference — Full Kafka adapter API documentation