Event-Driven Architecture with Kafka and Flink: Stream Processing Patterns for 2026



Event-Driven Architecture with Kafka and Flink: Stream Processing Patterns for 2026

Real-time data processing has become a competitive necessity. Whether it’s fraud detection that needs sub-second decisions, personalization engines reacting to user behavior, or operational dashboards reflecting live system state — batch pipelines that run every hour simply don’t cut it anymore. Apache Kafka and Apache Flink remain the cornerstone of production event-driven architectures in 2026. This post covers the patterns, pitfalls, and practical implementations.

Data Streaming Photo by Franck V. on Unsplash


The Event-Driven Architecture Landscape in 2026

Core Technologies

LayerTechnologyAlternatives
Message brokerApache KafkaRedpanda, Pulsar, AWS Kinesis
Stream processingApache FlinkSpark Streaming, Kafka Streams
Schema registryConfluent Schema RegistryAWS Glue
Stream storageApache IcebergDelta Lake, Apache Hudi
ServingApache Pinot, DruidClickHouse, Materialize
  • Kafka: Proven at 100M+ events/sec, log-based storage enables replay, broad ecosystem
  • Flink: Stateful stream processing with exactly-once guarantees, SQL support, native Kubernetes operator

Event Design: The Foundation

Good event-driven systems live or die by event design. Events should be:

Immutable facts — not commands or state snapshots:

❌ Bad: UpdateUserPreferences { user_id: "123", dark_mode: true }
✅ Good: UserPreferenceChanged { user_id: "123", preference: "dark_mode", value: "true", timestamp: "..." }

Self-describing with schema evolution in mind:

// Using Protocol Buffers for Kafka messages
syntax = "proto3";
package com.myorg.events.v1;

message OrderPlaced {
  string event_id = 1;
  string order_id = 2;
  string user_id = 3;
  repeated OrderItem items = 4;
  Money total = 5;
  string currency = 6;
  google.protobuf.Timestamp placed_at = 7;
  // v2 additions — backward compatible
  optional string promo_code = 8;
  optional DeliveryAddress delivery_address = 9;
}

message OrderItem {
  string product_id = 1;
  string product_name = 2;
  int32 quantity = 3;
  Money unit_price = 4;
}

message Money {
  int64 amount_cents = 1;
}

Schema Registry with Avro

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

order_schema = """
{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "com.myorg.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "placed_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

avro_serializer = AvroSerializer(
    schema_registry_client,
    order_schema,
)

producer_conf = {'bootstrap.servers': 'kafka:9092'}
producer = Producer(producer_conf)

def publish_order_placed(order: dict):
    producer.produce(
        topic='orders.placed',
        key=order['order_id'],
        value=avro_serializer(
            order,
            SerializationContext('orders.placed', MessageField.VALUE)
        ),
        on_delivery=delivery_report,
    )
    producer.flush()

Kafka Patterns for Production

1. Partitioning Strategy

Partitioning determines parallelism and ordering guarantees:

orders.placed topic — 24 partitions
├── Partition by user_id hash → all orders for a user are ordered
├── 24 partitions × 3 brokers → 72-fold parallelism
└── Consumer group with 24 consumers → each partition assigned to one consumer
def get_partition_key(event: dict) -> str:
    """
    Choose partition key based on ordering requirements:
    - User events: user_id (preserves per-user ordering)
    - Order events: order_id (preserves per-order ordering)
    - Payment events: payment_id (no specific requirement, random distribution)
    """
    if event.get('user_id') and requires_user_ordering(event['event_type']):
        return event['user_id']
    return event.get('order_id', event['event_id'])

2. Consumer Group Management

from confluent_kafka import Consumer, KafkaException
import json

consumer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'fraud-detection-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for exactly-once semantics
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
}

consumer = Consumer(consumer_conf)
consumer.subscribe(['orders.placed', 'payments.initiated'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        try:
            event = json.loads(msg.value())
            process_event(event)
            consumer.commit(asynchronous=False)  # Commit after successful processing
        except ProcessingError as e:
            # Send to dead letter queue
            publish_to_dlq(msg, e)
            consumer.commit(asynchronous=False)  # Still commit to avoid infinite loop
finally:
    consumer.close()

3. Transactional Outbox Pattern

Never write to Kafka directly from within a database transaction — use the outbox pattern:

-- 1. Application writes to DB and outbox atomically
BEGIN;
INSERT INTO orders (id, user_id, total) VALUES ($1, $2, $3);
INSERT INTO outbox (id, topic, key, payload, created_at)
VALUES (gen_random_uuid(), 'orders.placed', $1, $4, NOW());
COMMIT;
# 2. Outbox relay (Debezium CDC or custom poller) publishes to Kafka
# Debezium automatically captures INSERT into outbox table → Kafka topic
# No polling needed; CDC via Postgres logical replication
# debezium-connector-config.json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "myapp",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.key": "key",
    "transforms.outbox.route.by.field": "topic",
    "transforms.outbox.table.expand.json.payload": true
  }
}

Core Concepts

Data Sources → Transformations → Sinks
     ↑              ↑
  Kafka         State Backends
  Files         (RocksDB, Memory)
  HTTP          Checkpointing

Flink’s superpower is managed state — you can accumulate, aggregate, and query state across millions of keys without managing external databases.

Most Flink work now starts with SQL:

-- Create Kafka source tables
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    total_cents BIGINT,
    placed_at TIMESTAMP(3),
    WATERMARK FOR placed_at AS placed_at - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders.placed',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-analytics',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    status STRING,
    completed_at TIMESTAMP(3),
    WATERMARK FOR completed_at AS completed_at - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'payments.completed',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Sink table (ClickHouse for analytics)
CREATE TABLE order_metrics (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    total_orders BIGINT,
    total_revenue_cents BIGINT,
    avg_order_value DOUBLE
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://clickhouse:8123/analytics',
    'table-name' = 'order_metrics'
);

-- 1-minute tumbling window aggregation
INSERT INTO order_metrics
SELECT
    TUMBLE_START(placed_at, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(placed_at, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS total_orders,
    SUM(total_cents) AS total_revenue_cents,
    AVG(total_cents) AS avg_order_value
FROM orders
GROUP BY TUMBLE(placed_at, INTERVAL '1' MINUTE);

Some patterns require the full DataStream API. Here’s a real-time velocity check for fraud:

import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing for fault tolerance (every 10s)
        env.enableCheckpointing(10_000);

        DataStream<Transaction> transactions = env
            .fromSource(KafkaSource.<Transaction>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("payments.initiated")
                .setGroupId("fraud-detection")
                .setValueOnlyDeserializer(new TransactionDeserializer())
                .build(),
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
                "Kafka Source"
            );

        DataStream<FraudAlert> alerts = transactions
            .keyBy(Transaction::getUserId)
            .process(new FraudDetector());

        alerts.sinkTo(KafkaSink.<FraudAlert>builder()
            .setBootstrapServers("kafka:9092")
            .setRecordSerializer(new FraudAlertSerializer("fraud.alerts"))
            .build());

        env.execute("Fraud Detection");
    }
}

class FraudDetector extends KeyedProcessFunction<String, Transaction, FraudAlert> {
    // State: transaction count in last 60 seconds
    private transient ValueState<Long> countState;
    // State: total amount in last 60 seconds
    private transient ValueState<Long> amountState;
    // State: timer cleanup handle
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("tx-count", Long.class));
        amountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("tx-amount", Long.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<FraudAlert> out)
            throws Exception {

        long count = countState.value() == null ? 0L : countState.value();
        long amount = amountState.value() == null ? 0L : amountState.value();

        count++;
        amount += tx.getAmountCents();

        countState.update(count);
        amountState.update(amount);

        // Register cleanup timer 60s in the future (overwrite previous)
        long timer = ctx.timerService().currentProcessingTime() + 60_000L;
        if (timerState.value() != null) {
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }
        ctx.timerService().registerProcessingTimeTimer(timer);
        timerState.update(timer);

        // Velocity check: > 10 transactions OR > $1000 in 60 seconds
        if (count > 10 || amount > 100_000L) {
            out.collect(new FraudAlert(
                tx.getUserId(),
                tx.getPaymentId(),
                String.format("Velocity exceeded: %d txns, $%.2f in 60s",
                    count, amount / 100.0),
                FraudAlert.Severity.HIGH
            ));
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FraudAlert> out)
            throws Exception {
        // Clear state after 60 seconds
        countState.clear();
        amountState.clear();
        timerState.clear();
    }
}

The Kappa Architecture Pattern

Modern event-driven systems increasingly use the Kappa Architecture (streaming-only, no batch):

Raw Events (Kafka, retained forever)
          ↓
   Stream Processor (Flink)
     ├── Real-time materialized views → Serving DB (Pinot, Redis)
     ├── Aggregated metrics → OLAP (ClickHouse)
     └── Derived events → Other Kafka topics

Benefits over Lambda Architecture (batch + stream):

  • Single codebase for historical and real-time processing
  • No divergence between batch and streaming results
  • Replay from Kafka log to reprocess historical data

Key Operational Considerations

Kafka Sizing (2026 Rules of Thumb)

FactorGuideline
Partitions per topicmax(target throughput / 10MB/s, num consumers)
Replication factor3 (never 2 — split-brain risk)
RetentionBased on replay needs; 7-30 days is common
Consumer lag alertAlert if lag > 60s of messages
# flink-conf.yaml
execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 3min
execution.checkpointing.max-concurrent-checkpoints: 1
state.backend: rocksdb
state.checkpoints.dir: s3://my-flink-checkpoints/
state.savepoints.dir: s3://my-flink-savepoints/

Conclusion

Event-driven architecture with Kafka and Flink gives you the building blocks for truly real-time systems. The patterns — outbox, schema registry, stateful stream processing, windowed aggregations — are well-understood and battle-tested in production.

The shift in 2026 is toward Flink SQL for most use cases, with the DataStream API reserved for complex stateful logic. Managed services (Confluent Cloud, Amazon MSK, Ververica Platform) have reduced operational burden significantly.

Start with a single event stream, design your events carefully, and build processing pipelines incrementally. Real-time is no longer a luxury — it’s a baseline expectation. ⚡

이 글이 도움이 되셨다면 공감 및 광고 클릭을 부탁드립니다 :)