Event-Driven Architecture with Kafka and CloudEvents: Patterns for 2026
Event-driven architecture promises decoupling and scalability. It delivers—when designed well. When designed poorly, it delivers debugging nightmares, silent data loss, and services that break in ways that are impossible to trace. This post covers the production patterns that separate the former from the latter.
CloudEvents: Standardize Your Event Envelope
Before discussing Kafka, let’s talk about the event structure itself. CloudEvents (CNCF Graduated) standardizes the event envelope so tooling, middleware, and consumers can work without knowing your domain.
{
  "specversion": "1.0",
  "id": "7d3c5f9e-2b1a-4e8d-9f0c-3a6b2e1d8c5f",
  "source": "/services/orders/production",
  "type": "com.example.orders.v1.created",
  "subject": "order/ord-123456",
  "time": "2026-05-16T13:00:00Z",
  "datacontenttype": "application/json",
  "dataschema": "https://schemas.example.com/orders/v1/created.json",
  "data": {
    "orderId": "ord-123456",
    "customerId": "cust-78901",
    "totalAmountUsd": 149.98,
    "items": [
      { "sku": "PROD-001", "quantity": 2, "priceUsd": 74.99 }
    ]
  }
}
CloudEvents SDK Usage
from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured, from_structured
import uuid
from datetime import datetime, timezone

# Create a CloudEvent
def create_order_event(order: Order) -> CloudEvent:
    return CloudEvent(
        attributes={
            "type": "com.example.orders.v1.created",
            "source": "/services/orders/production",
            "subject": f"order/{order.id}",
            "id": str(uuid.uuid4()),
            "time": datetime.now(timezone.utc).isoformat(),
            "datacontenttype": "application/json",
        },
        data={
            "orderId": order.id,
            "customerId": order.customer_id,
            "totalAmountUsd": float(order.total),
            "items": [
                {
                    "sku": item.sku,
                    "quantity": item.quantity,
                    "priceUsd": float(item.price)
                }
                for item in order.items
            ]
        }
    )

# Serialize for Kafka
event = create_order_event(order)
message = to_structured(event)  # KafkaMessage with headers, key, and value
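Getting the serialized event onto the wire is then a single producer call. A minimal sketch using kafka-python, assuming a KafkaProducer named producer already exists; kafka-python wants headers as a list of (str, bytes) tuples, so the header values are encoded defensively.

# Illustrative usage: publish the structured CloudEvent with kafka-python.
# `producer` is assumed to be an existing KafkaProducer instance.
producer.send(
    "orders.v1.created",
    key=message.key,      # None unless a key_mapper was supplied to to_structured
    value=message.value,  # JSON-encoded CloudEvents envelope
    headers=[
        (k, v if isinstance(v, bytes) else str(v).encode("utf-8"))
        for k, v in message.headers.items()
    ],
)
producer.flush()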
Kafka Production Patterns
1. Topic Design: One Event Type Per Topic
A common mistake is putting multiple event types on one topic. Don’t:
# ❌ Mixed topic — consumer must check type before processing
# Topic: "order-events"
# Contains: OrderCreated, OrderUpdated, OrderCancelled, PaymentProcessed

# ✅ Typed topics — consumers subscribe to exactly what they need
# Topics:
#   - orders.v1.created
#   - orders.v1.updated
#   - orders.v1.cancelled
#   - payments.v1.processed

TOPIC_CONFIG = {
    "orders.v1.created": {
        "partitions": 12,                       # Based on throughput
        "replication_factor": 3,                # Never < 3 in production
        "retention_ms": 7 * 24 * 3600 * 1000,   # 7 days
        "cleanup_policy": "delete",
        "min_insync_replicas": 2,
    },
    # Compact topics for state (keep last event per key)
    "orders.v1.state": {
        "partitions": 12,
        "replication_factor": 3,
        "cleanup_policy": "compact",
        "min_compaction_lag_ms": 3600000,       # 1 hour before compaction
    },
}
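The dict above is just data; the topics still have to be created. A sketch using kafka-python's KafkaAdminClient, mapping the Pythonic keys above onto the broker's dotted config names (the helper and the key mapping are illustrative, not part of any standard API).

from kafka.admin import KafkaAdminClient, NewTopic

# Illustrative mapping from the TOPIC_CONFIG keys above to broker config names.
CONFIG_KEY_MAP = {
    "retention_ms": "retention.ms",
    "cleanup_policy": "cleanup.policy",
    "min_insync_replicas": "min.insync.replicas",
    "min_compaction_lag_ms": "min.compaction.lag.ms",
}

def create_topics(bootstrap_servers: list[str], topic_config: dict) -> None:
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    new_topics = []
    for name, cfg in topic_config.items():
        topic_configs = {
            CONFIG_KEY_MAP[k]: str(v)
            for k, v in cfg.items()
            if k in CONFIG_KEY_MAP
        }
        new_topics.append(NewTopic(
            name=name,
            num_partitions=cfg["partitions"],
            replication_factor=cfg["replication_factor"],
            topic_configs=topic_configs,
        ))
    admin.create_topics(new_topics)
    admin.close()

create_topics(["kafka:9092"], TOPIC_CONFIG)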
2. Exactly-Once Semantics (EOS)
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

class ExactlyOnceProducer:
    """
    Transactional producer for exactly-once delivery.
    Required: Kafka broker 0.11+, transactional.id set, and a kafka-python
    release with transactional producer support.
    """
    def __init__(self, bootstrap_servers: list[str], transactional_id: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Idempotent: dedup retries
            enable_idempotence=True,
            # Transactional: atomic multi-topic writes
            transactional_id=transactional_id,
            acks="all",
            max_in_flight_requests_per_connection=5,
            retries=2147483647,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )
        self.producer.init_transactions()

    def send_transactionally(
        self,
        messages: list[tuple[str, str, dict]]  # (topic, key, value)
    ):
        """Send multiple messages atomically (all or nothing)."""
        self.producer.begin_transaction()
        try:
            futures = []
            for topic, key, value in messages:
                future = self.producer.send(
                    topic,
                    key=key.encode("utf-8"),
                    value=value
                )
                futures.append(future)
            # Wait for all sends before committing
            for future in futures:
                future.get(timeout=10)
            self.producer.commit_transaction()
        except Exception as e:
            self.producer.abort_transaction()
            raise RuntimeError(f"Transaction aborted: {e}") from e
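Usage is a single atomic write across related topics. A hypothetical example writing an order-created event and its compacted state entry together; the topic names follow the earlier examples and the payloads are illustrative.

# Illustrative usage: both records land or neither does.
producer = ExactlyOnceProducer(
    bootstrap_servers=["kafka:9092"],
    transactional_id="orders-service-1",  # stable id per producer instance
)
producer.send_transactionally([
    ("orders.v1.created", "ord-123456", {"orderId": "ord-123456", "status": "created"}),
    ("orders.v1.state", "ord-123456", {"orderId": "ord-123456", "status": "created"}),
])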
3. Consumer Group Patterns
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from datetime import datetime, timezone
from typing import Callable
import json
import signal

class ReliableConsumer:
    def __init__(
        self,
        topics: list[str],
        group_id: str,
        bootstrap_servers: list[str],
        batch_size: int = 100,
    ):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Manual commit: commit after successful processing
            enable_auto_commit=False,
            auto_offset_reset="earliest",
            # Isolation: read only committed transactions
            isolation_level="read_committed",
            # kafka-python takes assignor classes here; cooperative ("incremental")
            # rebalancing is available in the Java and confluent-kafka clients
            partition_assignment_strategy=[RoundRobinPartitionAssignor],
            max_poll_records=batch_size,
            # Prevent rebalance during slow processing
            max_poll_interval_ms=300_000,  # 5 min
            session_timeout_ms=45_000,
            heartbeat_interval_ms=3_000,
        )
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)

    def process(self, handler: Callable):
        """
        Process messages with manual offset commit after successful batch.
        Exactly-once processing when combined with an idempotent handler.
        """
        while self._running:
            records = self.consumer.poll(timeout_ms=1000)
            if not records:
                continue
            offsets_to_commit = {}
            for tp, messages in records.items():
                for msg in messages:
                    try:
                        handler(msg)
                        # Track highest offset per partition
                        offsets_to_commit[tp] = OffsetAndMetadata(
                            msg.offset + 1, None
                        )
                    except Exception as e:
                        # Log + send to DLQ, don't commit this offset
                        self._send_to_dlq(msg, e)
                        # Commit offsets up to (but not including) failed message
                        break
            if offsets_to_commit:
                self.consumer.commit(offsets=offsets_to_commit)

    def _send_to_dlq(self, message, error: Exception):
        """Dead-letter queue for unprocessable messages."""
        # Preserve original message + add error metadata
        dlq_payload = {
            "original_topic": message.topic,
            "original_partition": message.partition,
            "original_offset": message.offset,
            "original_key": message.key.decode() if message.key else None,
            "original_value": message.value.decode(),
            "error": str(error),
            "failed_at": datetime.now(timezone.utc).isoformat(),
        }
        # Send to dead-letter topic
        # dlq_producer: a module-level KafkaProducer for DLQ publishing (not shown)
        dlq_producer.send(
            f"{message.topic}.dlq",
            key=message.key,
            value=json.dumps(dlq_payload).encode()
        )

    def _shutdown(self, signum, frame):
        self._running = False
        self.consumer.close()
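The handler passed to process() is what makes the pipeline effectively exactly-once: offsets are committed only after it succeeds, so it must tolerate redelivery. A minimal idempotent-handler sketch, assuming a hypothetical data-access layer (db) with a unique constraint on the event id; none of those names come from the original code.

# Illustrative idempotent handler: a redelivered message is a no-op because
# the CloudEvents id is used as a dedup key (unique constraint in the DB).
def handle_order_created(msg) -> None:
    event = json.loads(msg.value)
    event_id = event["id"]   # CloudEvents id from the envelope
    order = event["data"]
    # `db.insert_if_absent` is a hypothetical helper, not a real library call.
    inserted = db.insert_if_absent(
        table="orders",
        dedup_key=event_id,
        row={"order_id": order["orderId"], "customer_id": order["customerId"]},
    )
    if not inserted:
        return  # duplicate delivery; already processed, safe to skip

consumer = ReliableConsumer(
    topics=["orders.v1.created"],
    group_id="order-projection",
    bootstrap_servers=["kafka:9092"],
)
consumer.process(handle_order_created)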
Schema Registry: Preventing Breaking Changes
Confluent Schema Registry (or Apicurio) enforces schema compatibility before messages hit the topic.
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Avro schema definition
ORDER_CREATED_SCHEMA = """
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders.v1",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmountUsd", "type": "double"},
    {"name": "createdAt", "type": "string"},
    {
      "name": "metadata",
      "type": {"type": "map", "values": "string"},
      "default": {}
    }
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})

avro_serializer = AvroSerializer(
    schema_registry,
    ORDER_CREATED_SCHEMA,
    conf={"auto.register.schemas": False}  # Fail fast on unregistered schema
)

# Producer with schema validation
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,
})

producer.produce(
    topic="orders.v1.created",
    key=order_id,
    value=avro_serializer(
        order_data,
        SerializationContext("orders.v1.created", MessageField.VALUE)
    ),
    on_delivery=lambda err, msg: print(f"Error: {err}" if err else f"Delivered: {msg.offset()}")
)
producer.flush()  # Block until the delivery callback has fired
Schema Evolution Rules (BACKWARD_TRANSITIVE)
BACKWARD compatible (a consumer using the new schema can read events written with the old schema):
✅ Add a field with a default value
✅ Delete a field
❌ Add a required field (no default)
❌ Change a field's type
Safe evolution pattern:
v1: {orderId, customerId, totalAmount}
v2: {orderId, customerId, totalAmount, shippingAddress=""} ← Add with default
v3: {orderId, customerId, totalAmount, shippingAddress, region="UNKNOWN"}
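Compatibility is best enforced centrally rather than by convention. A sketch against the Schema Registry REST API using requests, assuming the registry URL and subject naming from the earlier example: it pins the subject to BACKWARD_TRANSITIVE and pre-checks a candidate v2 schema before the new producer ships.

import json
import requests

REGISTRY = "http://schema-registry:8081"
SUBJECT = "orders.v1.created-value"
HEADERS = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

# Enforce BACKWARD_TRANSITIVE for this subject
requests.put(
    f"{REGISTRY}/config/{SUBJECT}",
    headers=HEADERS,
    data=json.dumps({"compatibility": "BACKWARD_TRANSITIVE"}),
).raise_for_status()

# Candidate v2: adds shippingAddress with a default, so old events still deserialize
ORDER_CREATED_V2 = json.dumps({
    "type": "record",
    "name": "OrderCreated",
    "namespace": "com.example.orders.v1",
    "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "customerId", "type": "string"},
        {"name": "totalAmountUsd", "type": "double"},
        {"name": "createdAt", "type": "string"},
        {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}},
        {"name": "shippingAddress", "type": "string", "default": ""},
    ],
})

# Pre-check the candidate against the latest registered version
resp = requests.post(
    f"{REGISTRY}/compatibility/subjects/{SUBJECT}/versions/latest",
    headers=HEADERS,
    data=json.dumps({"schema": ORDER_CREATED_V2}),
)
resp.raise_for_status()
print(resp.json())  # {"is_compatible": true} if the change is safe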
Event Sourcing with Kafka
from dataclasses import dataclass, field
from typing import Any
from kafka import KafkaConsumer, TopicPartition
import json

@dataclass
class DomainEvent:
    aggregate_id: str
    aggregate_type: str
    event_type: str
    sequence: int  # Monotonic, per aggregate
    payload: dict
    metadata: dict = field(default_factory=dict)

class EventStore:
    """
    Kafka as an append-only event log.
    Aggregate ID as partition key ensures ordering per aggregate.
    """
    def __init__(self, producer, topic_prefix: str, bootstrap_servers: list[str]):
        self.producer = producer
        self.topic_prefix = topic_prefix
        self.bootstrap_servers = bootstrap_servers  # used by replay()

    def append(self, event: DomainEvent):
        topic = f"{self.topic_prefix}.{event.aggregate_type}.events"
        self.producer.send(
            topic=topic,
            key=event.aggregate_id.encode(),  # Partition by aggregate
            value=json.dumps({
                "aggregateId": event.aggregate_id,
                "eventType": event.event_type,
                "sequence": event.sequence,
                "payload": event.payload,
                "metadata": event.metadata,
            }).encode(),
            headers=[
                ("event-type", event.event_type.encode()),
                ("sequence", str(event.sequence).encode()),
            ]
        )
    def replay(
        self,
        aggregate_id: str,
        aggregate_type: str,
        from_sequence: int = 0
    ) -> list[dict]:
        """
        Replay events for a specific aggregate.
        In production, use a snapshot + tail pattern for large aggregates.
        """
        topic = f"{self.topic_prefix}.{aggregate_type}.events"
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            group_id=None,  # No group — independent read
            consumer_timeout_ms=5_000,  # Stop iterating once caught up
        )
        # Read the topic from the beginning up to the current end offsets
        partitions = [
            TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)
        ]
        consumer.assign(partitions)
        consumer.seek_to_beginning(*partitions)
        end_offsets = consumer.end_offsets(partitions)
        events = []
        # Filter by key in the consumer (or use log compaction + key-based scan)
        for msg in consumer:
            data = json.loads(msg.value)
            if (data["aggregateId"] == aggregate_id
                    and data["sequence"] >= from_sequence):
                events.append(data)
            if all(consumer.position(tp) >= end_offsets[tp] for tp in partitions):
                break
        consumer.close()
        return sorted(events, key=lambda e: e["sequence"])
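Replay is only half the pattern; the other half is folding those events back into state. A small illustrative reducer, assuming order events with the event types shown in the topic-design section and a status field in their payloads (those payload details are hypothetical).

# Illustrative: rebuild current order state by folding replayed events in order.
def rebuild_order_state(store: EventStore, order_id: str) -> dict:
    state: dict = {}
    for event in store.replay(order_id, aggregate_type="order"):
        if event["eventType"] == "OrderCreated":
            state = dict(event["payload"])    # initial snapshot of the aggregate
        elif event["eventType"] == "OrderUpdated":
            state.update(event["payload"])    # apply partial changes
        elif event["eventType"] == "OrderCancelled":
            state["status"] = "cancelled"     # hypothetical status field
    return state

current = rebuild_order_state(event_store, "ord-123456")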
Monitoring Kafka in Production
# Prometheus JMX exporter config for Kafka clients
rules:
  # Consumer lag — the most important Kafka metric
  - pattern: "kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+), topic=(.+), partition=(.+)><>records-lag"
    name: kafka_consumer_records_lag
    labels:
      client_id: "$1"
      topic: "$2"
      partition: "$3"
  # Producer metrics
  - pattern: "kafka.producer<type=producer-metrics, client-id=(.+)><>record-send-rate"
    name: kafka_producer_record_send_rate
    labels:
      client_id: "$1"
Key alerts to configure:
# Prometheus alerting rules
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_records_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag > 10K on {{ $labels.topic }}"
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 1m
        labels:
          severity: critical
      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 30s
        labels:
          severity: critical
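If you want lag numbers without the JMX pipeline, the same figure can be computed straight from the brokers by diffing committed group offsets against log end offsets. A sketch with kafka-python's admin client; the group name reuses the earlier example and the helper is illustrative.

from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

def consumer_group_lag(bootstrap_servers: list[str], group_id: str) -> dict:
    """Committed offset vs. log end offset, per partition, for one group."""
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    committed = admin.list_consumer_group_offsets(group_id)  # {TopicPartition: OffsetAndMetadata}
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    end_offsets = consumer.end_offsets(list(committed.keys()))
    lag = {tp: end_offsets[tp] - meta.offset for tp, meta in committed.items()}
    consumer.close()
    admin.close()
    return lag

print(consumer_group_lag(["kafka:9092"], "order-projection"))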
Conclusion
Event-driven systems earn their complexity premium only when built carefully. The essentials: standardize your envelope with CloudEvents, enforce schema contracts before events reach topics, design for exactly-once delivery with idempotent consumers, and monitor consumer lag religiously.
The rest—compacted topics for state, event sourcing, CQRS—are tools to reach for when your complexity justifies them. Start with structured events on typed topics, reliable consumers, and a schema registry. That foundation handles most production use cases and makes the advanced patterns accessible when you need them.
If this post was helpful, a like and an ad click would be appreciated :)
