kafka-consumer-implementation

Implement type-safe Kafka consumers for event consumption with msgspec deserialization. Use when building async consumers that process domain events (order messages, transactions) with offset management, error recovery, graceful shutdown, and distributed tracing. Handles consumer configuration, manual commits, and rebalancing strategies.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install the skill

Install skill "kafka-consumer-implementation" with this command: npx skills add dawiddutoit/custom-claude/dawiddutoit-custom-claude-kafka-consumer-implementation

Kafka Consumer Implementation

Purpose

Implement production-grade Kafka consumers that reliably consume and process domain events with high performance, type safety, and comprehensive error recovery. Covers msgspec deserialization, confluent-kafka configuration, offset management, OpenTelemetry tracing, and anti-corruption layer patterns for translating message schemas to domain models.

When to Use This Skill

Use when building event-driven systems that consume domain events from Kafka topics, triggered by requests like "implement Kafka consumer", "consume events from Kafka", "process order messages", or "set up event consumer".

Do NOT use when mocking Kafka consumers in unit tests (use pytest-adapter-integration-testing), implementing producers (use kafka-producer-implementation), or testing with testcontainers (use kafka-integration-testing).

Quick Start

Create a high-performance Kafka consumer in 3 steps:

  1. Define message schema:
import msgspec

class OrderEventMessage(msgspec.Struct, frozen=True):
    order_id: str
    created_at: str
    customer_name: str
    total_price: float
  2. Implement consumer:
from confluent_kafka import Consumer
import msgspec

class OrderEventConsumer:
    def __init__(self, brokers: list[str], topic: str, group_id: str) -> None:
        config = {
            "bootstrap.servers": ",".join(brokers),
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
        }
        self.consumer = Consumer(config)
        self.consumer.subscribe([topic])
        self.decoder = msgspec.json.Decoder(OrderEventMessage)

    def consume(self, timeout: float = 1.0) -> OrderEventMessage | None:
        msg = self.consumer.poll(timeout)
        if msg is None:
            return None  # no message within the timeout
        if msg.error():
            return None  # broker-side error; log this in production code
        return self.decoder.decode(msg.value())

    def commit(self) -> None:
        self.consumer.commit(asynchronous=False)
  3. Use in application:
consumer = OrderEventConsumer(["localhost:9092"], "orders", "loader")
message = consumer.consume()
if message:
    process(message)
    consumer.commit()

Implementation Steps

1. Consumer Configuration

Key configuration for manual offset management (committing only after processing gives at-least-once delivery, so processing should be idempotent):

config = {
    "bootstrap.servers": ",".join(brokers),
    "group.id": group_id,
    "auto.offset.reset": "earliest",  # Start from beginning
    "enable.auto.commit": False,  # Manual offset management
    "session.timeout.ms": 300000,  # 5 minute timeout
    "max.poll.interval.ms": 300000,  # 5 minutes for processing
}

2. Consumer Adapter

Implement consumer with error handling:

  • msgspec deserialization (10-20x faster than Pydantic)
  • OpenTelemetry distributed tracing
  • Manual offset management for at-least-once semantics (no message lost; duplicates possible after a crash)
  • Comprehensive error logging

See references/detailed-implementation.md for complete consumer adapter code.

3. Anti-Corruption Layer

Translate Kafka messages to domain entities:

from datetime import datetime
from decimal import Decimal

class OrderEventTranslator:
    @staticmethod
    def to_domain_order(message: OrderEventMessage) -> Order:
        # Validate
        if not message.order_id:
            raise ValueError("order_id is required")

        # Convert types (str -> OrderId, float -> Money)
        created_at = datetime.fromisoformat(message.created_at)
        order_id = OrderId(message.order_id)
        total_price = Money(Decimal(str(message.total_price)))  # str() avoids float artifacts

        return Order(order_id, created_at, message.customer_name, total_price, [])
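The translator assumes domain value objects that are not shown here. A runnable sketch under those assumptions, with plain frozen dataclasses standing in for the real OrderId, Money, and Order types:

```python
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal


@dataclass(frozen=True)
class OrderId:
    value: str


@dataclass(frozen=True)
class Money:
    amount: Decimal


@dataclass(frozen=True)
class Order:
    order_id: OrderId
    created_at: datetime
    customer_name: str
    total_price: Money
    line_items: tuple = ()


def to_domain_order(order_id: str, created_at: str,
                    customer_name: str, total_price: float) -> Order:
    """Same translation as above, taking the message fields directly."""
    if not order_id:
        raise ValueError("order_id is required")
    return Order(
        OrderId(order_id),
        datetime.fromisoformat(created_at),   # message carries ISO-8601 strings
        customer_name,
        Money(Decimal(str(total_price))),     # route float through str to avoid binary noise
    )
```

Frozen value objects keep the anti-corruption layer's output immutable, so downstream code cannot mutate a half-translated order.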

4. Processing Loop

Main consumer loop pattern:

  1. Poll for messages (5s timeout)
  2. Translate to domain objects
  3. Process (load into storage)
  4. Commit offset (only after success)
  5. Handle errors without stopping loop

See references/detailed-implementation.md for complete processing loop code.

5. Lifecycle Management

Use context managers for clean shutdown:

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_consumer(brokers: list[str], topic: str, group_id: str):
    consumer = OrderEventConsumer(brokers, topic, group_id)
    try:
        yield consumer
    finally:
        consumer.close()  # adapter delegates to Consumer.close(), leaving the group cleanly

Requirements

  • confluent-kafka>=2.3.0 - Production-grade Kafka client
  • msgspec>=0.18.6 - Ultra-fast deserialization
  • structlog>=23.2.0 - Structured logging
  • opentelemetry-api>=1.22.0 - Distributed tracing
  • Kafka/Redpanda broker running (3.x or later)
  • Python 3.11+ with type checking enabled

Consumer Groups and Offset Management

Consumer Groups: Consumers in the same group share responsibility for topic partitions. Kafka automatically rebalances when members join/leave.

Manual Offset Management ("exactly-once-per-restart": at-least-once delivery, so processing must be idempotent):

  1. Disable auto-commit: "enable.auto.commit": False
  2. Commit only after successful processing: consumer.commit()
  3. Offset reset behavior: "auto.offset.reset": "earliest" starts from beginning

See references/detailed-implementation.md for complete offset management patterns, consumer lag monitoring, and rebalancing behavior.
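Why commit-after-processing is at-least-once rather than exactly-once: a crash in the window between processing and commit causes a redelivery. A toy model of that window (pure Python, not Kafka; the function and its parameters are illustrative only):

```python
def replay_after_crash(messages, committed_offset, crash_before_commit_at=None):
    """Toy model of commit-after-processing offset management.

    Processing always resumes from the last committed offset, so a crash
    between processing a message and committing its offset means that
    message is processed again on restart (duplicated), never lost.
    Returns (messages processed this run, committed offset afterwards).
    """
    processed = []
    offset = committed_offset
    for i in range(committed_offset, len(messages)):
        processed.append(messages[i])        # process the message
        if i == crash_before_commit_at:
            return processed, offset         # crashed before the commit landed
        offset = i + 1                       # commit advances the stored offset
    return processed, offset
```

This is why loaders behind such a consumer are typically written as idempotent upserts keyed on the message's identity (e.g. order_id).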

Error Handling

Key error handling strategies:

  • Deserialization failures: Log error, commit offset to skip poison pill
  • Processing failures: Don't commit, message will be retried on restart
  • Commit failures: Log error, continue (will retry on next message)

See references/error-handling.md for comprehensive error handling strategies and dead letter queue patterns.
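The three strategies above can be sketched as a single routing function. A standalone illustration using stdlib json in place of msgspec; the function, its parameters, and the dead_letter hook are assumptions, not the reference code:

```python
import json
import logging

logger = logging.getLogger("order_consumer")


def handle_message(msg_value: bytes, process, commit, dead_letter=None) -> str:
    """Route a raw message through the three error-handling strategies.

    Returns which branch was taken, so the policy is easy to test.
    """
    try:
        message = json.loads(msg_value)
    except ValueError:
        # Deserialization failure: park the poison pill (if a DLQ is wired)
        # and commit so it is skipped rather than redelivered forever.
        logger.error("undecodable message, skipping")
        if dead_letter is not None:
            dead_letter(msg_value)
        commit()
        return "skipped"
    try:
        process(message)
    except Exception:
        # Processing failure: do NOT commit; the message is retried on restart.
        logger.exception("processing failed, offset not committed")
        return "retry"
    commit()  # commit failure here would be logged and retried on the next message
    return "processed"
```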

Testing

Use testcontainers for integration tests:

import pytest
from testcontainers.kafka import KafkaContainer

@pytest.fixture
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka

def test_consumer_roundtrip(kafka_container):
    brokers = [kafka_container.get_bootstrap_server()]
    # Test consumer/producer workflow

See examples/integration-examples.md for complete integration test patterns.

See Also

  • references/detailed-implementation.md - Complete consumer adapter and processing loop code
  • references/error-handling.md - Comprehensive error handling strategies
  • examples/integration-examples.md - Real-world integration patterns
  • kafka-producer-implementation skill - For producing events
  • kafka-schema-management skill - For schema design

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
