# Kafka Adapter
The Kafka adapter provides a clean interface for interacting with Apache Kafka, supporting both synchronous and asynchronous operations.
## Features
- Topic operations (create, list, delete)
- Message publishing and consuming
- Consumer group management
- Built-in error handling and retry mechanisms (see the retry sketch after this list)
- Support for both sync and async operations
- Comprehensive logging and monitoring
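The retries above are built into the adapter, but applications sometimes want their own retry policy on top of the documented `publish` call. Below is a minimal sketch of such a wrapper; `publish_with_retry` is a hypothetical helper for illustration, not part of the adapter's API.

```python
import logging
import time

from archipy.adapters.kafka.adapters import KafkaAdapter
from archipy.models.errors import InternalError

logger = logging.getLogger(__name__)


def publish_with_retry(kafka: KafkaAdapter, topic: str, value: str, retries: int = 3) -> None:
    """Hypothetical helper: retry transient publish failures with exponential backoff."""
    for attempt in range(1, retries + 1):
        try:
            kafka.publish(topic, value)
            return
        except InternalError:
            if attempt == retries:
                raise
            logger.warning("Publish failed, retrying (attempt %d of %d)", attempt, retries)
            time.sleep(2 ** (attempt - 1))  # back off: 1s, 2s, 4s, ...
```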
## Basic Usage

### Configuration

Configure Kafka in your application's config:
```python
from archipy.configs.base_config import BaseConfig

# Environment variables read by BaseConfig:
# KAFKA__BOOTSTRAP_SERVERS=localhost:9092
# KAFKA__CLIENT_ID=my-app
# KAFKA__GROUP_ID=my-group
```
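To make the loaded configuration available to adapters constructed without arguments (as in the next section), register it globally. A minimal sketch, assuming ArchiPy's `BaseConfig.set_global` registration pattern; `AppConfig` is a hypothetical name:

```python
from archipy.configs.base_config import BaseConfig


class AppConfig(BaseConfig):
    """Hypothetical application config; KAFKA__* environment variables are loaded here."""


BaseConfig.set_global(AppConfig())  # KafkaAdapter() with no arguments will use this config
```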
### Initializing the Adapter

```python
from archipy.adapters.kafka.adapters import KafkaAdapter, AsyncKafkaAdapter

# Use global configuration
kafka = KafkaAdapter()

# Or provide specific configuration
from archipy.configs.config_template import KafkaConfig

custom_config = KafkaConfig(
    BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092",
    CLIENT_ID="custom-client",
    GROUP_ID="custom-group",
)
kafka = KafkaAdapter(custom_config)
```
## Topic Operations

```python
import logging

logger = logging.getLogger(__name__)

# Create a topic
kafka.create_topic("my-topic", num_partitions=3, replication_factor=1)

# List all topics
topics = kafka.list_topics()
for topic in topics:
    logger.info(f"Topic: {topic}")

# Delete a topic
kafka.delete_topic("my-topic")
```
## Publishing Messages

```python
import logging

logger = logging.getLogger(__name__)

# Publish a simple message
kafka.publish("my-topic", "Hello, Kafka!")

# Publish with key and headers
headers = {"source": "my-app", "version": "1.0"}
kafka.publish("my-topic", "Hello, Kafka!", key="message-1", headers=headers)

# Publish multiple messages in one batch
messages = [
    {"key": "msg1", "value": "Message 1"},
    {"key": "msg2", "value": "Message 2"},
]
kafka.publish_batch("my-topic", messages)
```
## Consuming Messages

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Consume messages with a callback
def process_message(message: dict[str, Any]) -> None:
    logger.info(f"Received message: {message['value']}")

# Start consuming
kafka.consume("my-topic", process_message)

# Consume from a specific partition and offset
kafka.consume("my-topic", process_message, partition=0, offset=0)

# Consume with a timeout
kafka.consume("my-topic", process_message, timeout_ms=5000)
```
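Since the callback-driven `consume` call blocks while it polls, long-running services typically move it off the main thread. A minimal sketch using only the standard library; the threading is illustrative, not a feature of the adapter:

```python
import threading

# Run the blocking consumer loop in a daemon thread so it won't prevent shutdown
consumer_thread = threading.Thread(
    target=kafka.consume,
    args=("my-topic", process_message),
    daemon=True,
)
consumer_thread.start()
```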
## Async Operations

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


async def async_example() -> None:
    # Create async Kafka adapter
    async_kafka = AsyncKafkaAdapter()

    # Publish a message asynchronously
    await async_kafka.publish("my-topic", "Async message")

    # Consume messages asynchronously
    async def process_async(message: dict[str, Any]) -> None:
        logger.info(f"Received async message: {message['value']}")

    await async_kafka.consume("my-topic", process_async)


# Run the async example
asyncio.run(async_example())
```
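One benefit of the async adapter is concurrency: several publishes can be awaited together. A minimal sketch building only on the `publish` coroutine shown above:

```python
import asyncio

from archipy.adapters.kafka.adapters import AsyncKafkaAdapter


async def publish_many() -> None:
    async_kafka = AsyncKafkaAdapter()
    # Schedule ten publishes concurrently and wait until all complete
    await asyncio.gather(
        *(async_kafka.publish("my-topic", f"Async message {i}") for i in range(10))
    )


asyncio.run(publish_many())
```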
## Error Handling

The KafkaAdapter uses ArchiPy's domain-specific exceptions for consistent error handling:

```python
import logging

from archipy.models.errors import (
    AlreadyExistsError,
    InternalError,
    InvalidArgumentError,
    NotFoundError,
    PermissionDeniedError,
)

logger = logging.getLogger(__name__)

try:
    kafka.create_topic("existing-topic")
except AlreadyExistsError:
    logger.warning("Topic already exists")
except PermissionDeniedError:
    logger.exception("Permission denied to create topic")
except InvalidArgumentError as e:
    logger.exception(f"Invalid argument: {e}")
except InternalError as e:
    logger.exception(f"Internal error: {e}")
```
## Consumer Group Management

```python
import logging

logger = logging.getLogger(__name__)

# List consumer groups
groups = kafka.list_consumer_groups()
for group in groups:
    logger.info(f"Group: {group['group_id']}, State: {group['state']}")

# Describe a consumer group
group_info = kafka.describe_consumer_group("my-group")
logger.info(f"Group members: {group_info['members']}")

# Delete a consumer group
kafka.delete_consumer_group("my-group")
```
## Integration with Web Applications

### FastAPI Example

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from archipy.adapters.kafka.adapters import KafkaAdapter
from archipy.models.errors import InternalError

app = FastAPI()
kafka = KafkaAdapter()


class Message(BaseModel):
    content: str
    key: str | None = None


@app.post("/publish/{topic}")
async def publish_message(topic: str, message: Message):
    try:
        kafka.publish(topic, message.content, key=message.key)
        return {"message": "Message published successfully"}
    except InternalError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/topics")
async def list_topics():
    try:
        topics = kafka.list_topics()
        return {"topics": topics}
    except InternalError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
```
## Testing with BDD

The Kafka adapter comes with BDD tests that verify its functionality. Here's a sample feature file:

```gherkin
Feature: Kafka Operations Testing
  As a developer
  I want to test Kafka messaging operations
  So that I can ensure reliable message delivery

  Scenario: Publishing and consuming messages
    Given I have a Kafka topic "test-topic"
    When I publish a message "Hello, Kafka!" to "test-topic"
    Then I should be able to consume the message from "test-topic"
```
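The step definitions behind such a feature might look like the sketch below, assuming behave-style steps and the adapter methods documented on this page; the exact wiring in ArchiPy's test suite may differ:

```python
from behave import given, then, when

from archipy.adapters.kafka.adapters import KafkaAdapter


@given('I have a Kafka topic "{topic}"')
def step_create_topic(context, topic):
    context.kafka = KafkaAdapter()
    context.kafka.create_topic(topic, num_partitions=1, replication_factor=1)


@when('I publish a message "{message}" to "{topic}"')
def step_publish(context, message, topic):
    context.message = message
    context.kafka.publish(topic, message)


@then('I should be able to consume the message from "{topic}"')
def step_consume(context, topic):
    received = []
    # Collect messages for up to five seconds using the timeout form shown above
    context.kafka.consume(topic, received.append, timeout_ms=5000)
    assert any(m["value"] == context.message for m in received)
```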