Event-Driven Architecture with Kafka and CloudEvents: Patterns for 2026



Event-Driven Architecture with Kafka and CloudEvents: Patterns for 2026

Event-driven architecture promises decoupling and scalability. It delivers—when designed well. When designed poorly, it delivers debugging nightmares, silent data loss, and services that break in ways that are impossible to trace. This post covers the production patterns that separate the former from the latter.

Data Flow Photo by NASA on Unsplash

CloudEvents: Standardize Your Event Envelope

Before discussing Kafka, let’s talk about the event structure itself. CloudEvents (CNCF Graduated) standardizes the event envelope so tooling, middleware, and consumers can work without knowing your domain.

{
  "specversion": "1.0",
  "id": "7d3c5f9e-2b1a-4e8d-9f0c-3a6b2e1d8c5f",
  "source": "/services/orders/production",
  "type": "com.example.orders.v1.created",
  "subject": "order/ord-123456",
  "time": "2026-05-16T13:00:00Z",
  "datacontenttype": "application/json",
  "schemaurl": "https://schemas.example.com/orders/v1/created.json",
  "data": {
    "orderId": "ord-123456",
    "customerId": "cust-78901",
    "totalAmountUsd": 149.99,
    "items": [
      { "sku": "PROD-001", "quantity": 2, "priceUsd": 74.99 }
    ]
  }
}

CloudEvents SDK Usage

from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured, from_structured
import uuid
from datetime import datetime, timezone

# Create a CloudEvent
def create_order_event(order: Order) -> CloudEvent:
    return CloudEvent(
        attributes={
            "type": "com.example.orders.v1.created",
            "source": "/services/orders/production",
            "subject": f"order/{order.id}",
            "id": str(uuid.uuid4()),
            "time": datetime.now(timezone.utc).isoformat(),
            "datacontenttype": "application/json",
        },
        data={
            "orderId": order.id,
            "customerId": order.customer_id,
            "totalAmountUsd": float(order.total),
            "items": [
                {
                    "sku": item.sku,
                    "quantity": item.quantity,
                    "priceUsd": float(item.price)
                }
                for item in order.items
            ]
        }
    )

# Serialize for Kafka
event = create_order_event(order)
message = to_structured(event)  # Returns (headers, body) tuple

Kafka Production Patterns

1. Topic Design: One Event Type Per Topic

A common mistake is putting multiple event types on one topic. Don’t:

# ❌ Mixed topic — consumer must check type before processing
# Topic: "order-events"
# Contains: OrderCreated, OrderUpdated, OrderCancelled, PaymentProcessed

# ✅ Typed topics — consumers subscribe to exactly what they need
# Topics:
# - orders.v1.created
# - orders.v1.updated
# - orders.v1.cancelled
# - payments.v1.processed

TOPIC_CONFIG = {
    "orders.v1.created": {
        "partitions": 12,           # Based on throughput
        "replication_factor": 3,    # Never < 3 in production
        "retention_ms": 7 * 24 * 3600 * 1000,   # 7 days
        "cleanup_policy": "delete",
        "min_insync_replicas": 2,
    },
    # Compact topics for state (keep last event per key)
    "orders.v1.state": {
        "partitions": 12,
        "replication_factor": 3,
        "cleanup_policy": "compact",
        "min_compaction_lag_ms": 3600000,  # 1 hour before compaction
    }
}

2. Exactly-Once Semantics (EOS)

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

class ExactlyOnceProducer:
    """
    Transactional producer for exactly-once delivery.
    Required: Kafka broker 0.11+, transactional.id set.
    """

    def __init__(self, bootstrap_servers: list[str], transactional_id: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Idempotent: dedup retries
            enable_idempotence=True,
            # Transactional: atomic multi-topic writes
            transactional_id=transactional_id,
            acks="all",
            max_in_flight_requests_per_connection=5,
            retries=2147483647,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )
        self.producer.init_transactions()

    def send_transactionally(
        self,
        messages: list[tuple[str, str, dict]]  # (topic, key, value)
    ):
        """Send multiple messages atomically (all or nothing)."""
        self.producer.begin_transaction()
        try:
            futures = []
            for topic, key, value in messages:
                future = self.producer.send(
                    topic,
                    key=key.encode("utf-8"),
                    value=value
                )
                futures.append(future)

            # Wait for all sends before committing
            for future in futures:
                future.get(timeout=10)

            self.producer.commit_transaction()

        except Exception as e:
            self.producer.abort_transaction()
            raise RuntimeError(f"Transaction aborted: {e}") from e

3. Consumer Group Patterns

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
import signal

class ReliableConsumer:
    def __init__(
        self,
        topics: list[str],
        group_id: str,
        bootstrap_servers: list[str],
        batch_size: int = 100,
    ):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Manual commit: commit after successful processing
            enable_auto_commit=False,
            auto_offset_reset="earliest",
            # Isolation: read only committed transactions
            isolation_level="read_committed",
            # Cooperative rebalancing (reduces stop-the-world pauses)
            partition_assignment_strategy=[
                "cooperative-sticky"
            ],
            max_poll_records=batch_size,
            # Prevent rebalance during slow processing
            max_poll_interval_ms=300_000,  # 5 min
            session_timeout_ms=45_000,
            heartbeat_interval_ms=3_000,
        )
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)

    def process(self, handler: callable):
        """
        Process messages with manual offset commit after successful batch.
        Exactly-once processing when combined with idempotent handler.
        """
        while self._running:
            records = self.consumer.poll(timeout_ms=1000)

            if not records:
                continue

            offsets_to_commit = {}

            for tp, messages in records.items():
                for msg in messages:
                    try:
                        handler(msg)
                        # Track highest offset per partition
                        offsets_to_commit[tp] = OffsetAndMetadata(
                            msg.offset + 1, None
                        )
                    except Exception as e:
                        # Log + send to DLQ, don't commit this offset
                        self._send_to_dlq(msg, e)
                        # Commit offsets up to (but not including) failed message
                        break

            if offsets_to_commit:
                self.consumer.commit(offsets=offsets_to_commit)

    def _send_to_dlq(self, message, error: Exception):
        """Dead-letter queue for unprocessable messages."""
        # Preserve original message + add error metadata
        dlq_payload = {
            "original_topic": message.topic,
            "original_partition": message.partition,
            "original_offset": message.offset,
            "original_key": message.key.decode() if message.key else None,
            "original_value": message.value.decode(),
            "error": str(error),
            "failed_at": datetime.utcnow().isoformat(),
        }
        # Send to dead-letter topic
        dlq_producer.send(
            f"{message.topic}.dlq",
            key=message.key,
            value=json.dumps(dlq_payload).encode()
        )

    def _shutdown(self, signum, frame):
        self._running = False
        self.consumer.close()

Schema Registry: Preventing Breaking Changes

Confluent Schema Registry (or Apicurio) enforces schema compatibility before messages hit the topic.

from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Avro schema definition
ORDER_CREATED_SCHEMA = """
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders.v1",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmountUsd", "type": "double"},
    {"name": "createdAt", "type": "string"},
    {
      "name": "metadata",
      "type": {"type": "map", "values": "string"},
      "default": {}
    }
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})

avro_serializer = AvroSerializer(
    schema_registry,
    ORDER_CREATED_SCHEMA,
    conf={"auto.register.schemas": False}  # Fail fast on unregistered schema
)

# Producer with schema validation
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,
})

producer.produce(
    topic="orders.v1.created",
    key=order_id,
    value=avro_serializer(
        order_data,
        SerializationContext("orders.v1.created", MessageField.VALUE)
    ),
    on_delivery=lambda err, msg: print(f"Error: {err}" if err else f"Delivered: {msg.offset()}")
)

Schema Evolution Rules (BACKWARD_TRANSITIVE)

BACKWARD compatible (consumers can read new format with old schema):
  ✅ Add optional field with default
  ✅ Delete required field → becomes optional
  ❌ Remove field with no default
  ❌ Change field type

Safe evolution pattern:
  v1: {orderId, customerId, totalAmount}
  v2: {orderId, customerId, totalAmount, shippingAddress=""}  ← Add with default
  v3: {orderId, customerId, totalAmount, shippingAddress, region="UNKNOWN"}

Event Sourcing with Kafka

from dataclasses import dataclass, field
from typing import Any
import json

@dataclass
class DomainEvent:
    aggregate_id: str
    aggregate_type: str
    event_type: str
    sequence: int          # Monotonic, per aggregate
    payload: dict
    metadata: dict = field(default_factory=dict)

class EventStore:
    """
    Kafka as an append-only event log.
    Aggregate ID as partition key ensures ordering per aggregate.
    """

    def __init__(self, producer, topic_prefix: str):
        self.producer = producer
        self.topic_prefix = topic_prefix

    def append(self, event: DomainEvent):
        topic = f"{self.topic_prefix}.{event.aggregate_type}.events"
        self.producer.send(
            topic=topic,
            key=event.aggregate_id.encode(),  # Partition by aggregate
            value=json.dumps({
                "aggregateId": event.aggregate_id,
                "eventType": event.event_type,
                "sequence": event.sequence,
                "payload": event.payload,
                "metadata": event.metadata,
            }).encode(),
            headers=[
                ("event-type", event.event_type.encode()),
                ("sequence", str(event.sequence).encode()),
            ]
        )

    def replay(
        self,
        aggregate_id: str,
        aggregate_type: str,
        from_sequence: int = 0
    ) -> list[DomainEvent]:
        """
        Replay events for a specific aggregate.
        In production, use a snapshot + tail pattern for large aggregates.
        """
        topic = f"{self.topic_prefix}.{aggregate_type}.events"
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            group_id=None  # No group — independent read
        )
        events = []
        # Filter by key in consumer (or use log compaction + key-based scan)
        for msg in consumer:
            data = json.loads(msg.value)
            if (data["aggregateId"] == aggregate_id
                    and data["sequence"] >= from_sequence):
                events.append(data)
            if consumer.position(TopicPartition(topic, msg.partition)) >= end_offset:
                break

        return sorted(events, key=lambda e: e["sequence"])

Monitoring Kafka in Production

# Prometheus JMX exporter config for Kafka
rules:
  # Consumer lag — the most important Kafka metric
  - pattern: "kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+), topic=(.+), partition=(.+)><>records-lag"
    name: kafka_consumer_records_lag
    labels:
      client_id: "$1"
      topic: "$2"
      partition: "$3"

  # Producer metrics
  - pattern: "kafka.producer<type=producer-metrics, client-id=(.+)><>record-send-rate"
    name: kafka_producer_record_send_rate
    labels:
      client_id: "$1"

Key alerts to configure:

# Alertmanager rules
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_records_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer  lag > 10K on "

      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 1m
        labels:
          severity: critical

      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 30s
        labels:
          severity: critical

Conclusion

Event-driven systems earn their complexity premium only when built carefully. The essentials: standardize your envelope with CloudEvents, enforce schema contracts before events reach topics, design for exactly-once delivery with idempotent consumers, and monitor consumer lag religiously.

The rest—compacted topics for state, event sourcing, CQRS—are tools to reach for when your complexity justifies them. Start with structured events on typed topics, reliable consumers, and a schema registry. That foundation handles most production use cases and makes the advanced patterns accessible when you need them.

이 글이 도움이 되셨다면 공감 및 광고 클릭을 부탁드립니다 :)