Real-Time Data Streaming in 2026: Apache Kafka + Flink Architecture Guide



Batch processing is dead. Well, not really—but real-time is eating its lunch. In 2026, users expect instant updates, fraud detection in milliseconds, and analytics that reflect reality, not yesterday’s snapshot.


The Modern Streaming Stack

The dominant architecture in 2026:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Sources   │────►│    Kafka    │────►│    Flink    │
│ (apps, IoT) │     │  (storage)  │     │ (processing)│
└─────────────┘     └─────────────┘     └─────────────┘
                           │                    │
                           ▼                    ▼
                    ┌─────────────┐     ┌─────────────┐
                    │   Topics    │     │   Sinks     │
                    │ (retention) │     │ (DB, lake)  │
                    └─────────────┘     └─────────────┘
  • Kafka: Distributed event log (source of truth)
  • Flink: Stateful stream processing
  • Sinks: Databases, data lakes, other Kafka topics

Apache Kafka: The Event Backbone

Kafka is the central nervous system of modern data architecture. Every event flows through it.

Kafka Cluster Setup

# docker-compose.yml for local development (single-node KRaft cluster)
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Internal listener for other containers, host listener for local clients
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so internal topics get replication factor 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka

volumes:
  kafka-data:
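
Start the stack with docker compose up -d, then create the orders topic. A minimal sketch using confluent-kafka's AdminClient (the partition count is an arbitrary choice for local development):

# create_topics.py
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Single-broker dev cluster: replication factor must be 1
futures = admin.create_topics([NewTopic("orders", num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises if creation failed (e.g. topic already exists)
        print(f"Created topic {topic}")
    except Exception as e:
        print(f"Topic {topic}: {e}")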

Producer Example (Python)

# producer.py
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Schema for order events
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "product_id", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, order_schema)

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "client.id": "order-producer"
})

def produce_order(order: dict):
    producer.produce(
        topic="orders",
        key=order["order_id"],
        value=avro_serializer(order, SerializationContext("orders", MessageField.VALUE)),
        callback=lambda err, msg: print(f"Error: {err}" if err else f"Delivered: {msg.key()}")
    )
    producer.flush()  # fine for a demo; in a hot path, poll(0) per send and flush() only on shutdown

# Example usage
produce_order({
    "order_id": "ord-123",
    "user_id": "user-456",
    "product_id": "prod-789",
    "quantity": 2,
    "price": 49.99,
    "timestamp": 1707235200000
})
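
For anything beyond local testing, you would typically harden the producer config. A sketch of commonly used settings (these are standard librdkafka options; the values are illustrative, not one-size-fits-all recommendations):

# Safer producer settings for production
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "client.id": "order-producer",
    "enable.idempotence": True,   # no duplicates on retry, per-partition ordering preserved
    "acks": "all",                # wait for all in-sync replicas before acknowledging
    "linger.ms": 5,               # small batching window, trading latency for throughput
    "compression.type": "lz4",    # compress batches on the wire
})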

Consumer Example

# consumer.py
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# The deserializer fetches each record's writer schema from the registry
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_deserializer = AvroDeserializer(schema_registry)

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False   # commit manually after successful processing
})

consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        
        order = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f"Processing order: {order['order_id']}")
        
        # Process the order...
        
        consumer.commit(message=msg, asynchronous=False)  # commit only after processing succeeds
finally:
    consumer.close()
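
If throughput matters more than per-message latency, poll in batches and commit once per batch. A minimal variant of the poll loop above (same consumer, deserializer, and imports):

# Batch variant: pull up to 500 messages at a time, commit once per batch
while True:
    msgs = consumer.consume(num_messages=500, timeout=1.0)
    if not msgs:
        continue
    for msg in msgs:
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        order = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        # process the order...
    consumer.commit(asynchronous=False)  # commits the consumer's current positions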

Apache Flink: Stateful Stream Processing

Plain consumers are fine for simple pipelines, but Flink handles the complex logic they can't: aggregations, joins, and windowing over event time, backed by fault-tolerant state.

Flink SQL: Real-Time Aggregation

-- Create source table from Kafka
-- (the Avro `timestamp` field is epoch millis, so derive the event-time attribute from it)
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    quantity INT,
    price DOUBLE,
    `timestamp` BIGINT,
    event_time AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081'
);

-- Real-time aggregation: orders per minute by product
CREATE TABLE order_stats (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    product_id STRING,
    order_count BIGINT,
    total_revenue DOUBLE,
    PRIMARY KEY (window_start, product_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'order-stats',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- The streaming query
INSERT INTO order_stats
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
    product_id,
    COUNT(*) as order_count,
    SUM(price * quantity) as total_revenue
FROM orders
GROUP BY 
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    product_id;
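
To sanity-check the aggregation, you can tail the output topic with a plain consumer; a quick sketch (the sink above writes JSON keys and values to order-stats):

# tail_order_stats.py
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-stats-tail",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["order-stats"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    key = json.loads(msg.key()) if msg.key() else None
    value = json.loads(msg.value()) if msg.value() else None  # None value = upsert retraction (tombstone)
    print(key, "->", value)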

Fraud Detection with the DataStream API (Java)

When you need per-key state and custom logic beyond SQL, drop down to the DataStream API. The job below flags users with repeated large transactions inside a one-minute window.

// FraudDetectionJob.java
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // 1 minute checkpoints
        
        // Source: Kafka
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detector")
            .setValueOnlyDeserializer(new TransactionDeserializer())
            .build();
        
        // Without an explicit timestamp assigner, event time falls back to the Kafka record timestamp
        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
            "Kafka Source"
        );
        
        // Fraud detection pattern: 3+ transactions > $1000 in 1 minute
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .process(new FraudDetector());
        
        // Sink: Kafka alerts topic
        KafkaSink<Alert> sink = KafkaSink.<Alert>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(new AlertSerializer("fraud-alerts"))
            .build();
        
        alerts.sinkTo(sink);
        
        env.execute("Fraud Detection");
    }
}

class FraudDetector extends ProcessWindowFunction<Transaction, Alert, String, TimeWindow> {
    @Override
    public void process(String userId, Context ctx, Iterable<Transaction> txns, Collector<Alert> out) {
        long highValueCount = StreamSupport.stream(txns.spliterator(), false)
            .filter(t -> t.getAmount() > 1000)
            .count();
        
        if (highValueCount >= 3) {
            out.collect(new Alert(userId, "HIGH_FREQUENCY_LARGE_TRANSACTIONS", ctx.window().getEnd()));
        }
    }
}

Session Windows with PyFlink

# pyflink_job.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(env, settings)

# Define source
t_env.execute_sql("""
    CREATE TABLE page_views (
        user_id STRING,
        page_url STRING,
        view_time TIMESTAMP(3),
        WATERMARK FOR view_time AS view_time - INTERVAL '10' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'page-views',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# Session window analysis: user sessions with 30-min gap
t_env.execute_sql("""
    CREATE TABLE user_sessions (
        user_id STRING,
        session_start TIMESTAMP(3),
        session_end TIMESTAMP(3),
        page_count BIGINT,
        PRIMARY KEY (user_id, session_start) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/analytics',
        'table-name' = 'user_sessions'
    )
""")

# Session windowing
t_env.execute_sql("""
    INSERT INTO user_sessions
    SELECT
        user_id,
        SESSION_START(view_time, INTERVAL '30' MINUTE) as session_start,
        SESSION_END(view_time, INTERVAL '30' MINUTE) as session_end,
        COUNT(*) as page_count
    FROM page_views
    GROUP BY user_id, SESSION(view_time, INTERVAL '30' MINUTE)
""")

Production Architecture

High Availability Setup

# Kubernetes Flink deployment
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/checkpoints
    state.savepoints.dir: s3://my-bucket/savepoints
    high-availability: kubernetes
    high-availability.storageDir: s3://my-bucket/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2g"
      cpu: 1
  taskManager:
    resource:
      memory: "4g"
      cpu: 2
    replicas: 3
  job:
    jarURI: s3://my-bucket/jobs/fraud-detection.jar
    parallelism: 6
    upgradeMode: savepoint

Monitoring with Prometheus

# Flink metrics configuration
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9999

# Key metrics to monitor:
# - flink_jobmanager_job_uptime
# - flink_taskmanager_job_task_numRecordsInPerSecond  
# - flink_taskmanager_job_task_numRecordsOutPerSecond
# - flink_taskmanager_Status_JVM_Memory_Heap_Used
# - flink_jobmanager_job_lastCheckpointDuration
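
These metrics can feed dashboards or simple scripted checks; a sketch that queries the Prometheus HTTP API (assuming Prometheus scrapes the reporter above and is reachable at localhost:9090):

# checkpoint_check.py
import requests

PROM_URL = "http://localhost:9090/api/v1/query"

resp = requests.get(PROM_URL, params={"query": "flink_jobmanager_job_lastCheckpointDuration"})
resp.raise_for_status()

for series in resp.json()["data"]["result"]:
    job = series["metric"].get("job_name", "unknown")
    duration_ms = float(series["value"][1])
    if duration_ms > 60_000:  # slower than the 1-minute checkpoint interval
        print(f"WARNING: {job} last checkpoint took {duration_ms:.0f} ms")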

Common Patterns

1. Exactly-Once Delivery

// Enable exactly-once with Kafka transactions
// (requires checkpointing enabled and a record serializer, as in the fraud job above;
//  downstream consumers should read with isolation.level=read_committed)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-app")
    .build();
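
The same guarantee on the plain client side uses Kafka's transactional producer API. A minimal confluent-kafka sketch (the transactional.id and topic name are illustrative; in a consume-transform-produce loop you would also call send_offsets_to_transaction):

# transactional_producer.py
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-enricher-1",  # must be stable across restarts of this instance
})

producer.init_transactions()       # fences off zombie instances with the same transactional.id
producer.begin_transaction()
try:
    producer.produce("orders-enriched", key="ord-123", value=b"...")
    producer.commit_transaction()  # all messages in the transaction become visible atomically
except Exception:
    producer.abort_transaction()   # none of them do
    raise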

2. Late Data Handling

-- How late an event may arrive is governed by the source's watermark definition,
-- e.g. WATERMARK FOR event_time AS event_time - INTERVAL '1' HOUR
-- tolerates up to one hour of out-of-orderness before a window closes.
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as cnt
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
-- Events arriving after the watermark has passed the window end are dropped by default

3. Stream-Table Join

-- Enrich orders with product info (temporal join: products must be a versioned table,
-- e.g. a CDC-backed table with a primary key and an event-time attribute)
SELECT 
    o.order_id,
    o.quantity,
    p.name as product_name,
    p.price * o.quantity as total
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.event_time AS p
    ON o.product_id = p.id;

When to Use What

Use Case                  Solution
Event log/replay          Kafka only
Simple transformations    Kafka Streams
Complex aggregations      Flink SQL
ML inference in stream    Flink + ONNX/TF Serving
Sub-second latency        Flink DataStream API
CDC from databases        Debezium → Kafka → Flink

Conclusion

Kafka + Flink is the battle-tested stack for real-time data. Kafka provides durable event storage; Flink provides powerful stateful processing. Together, they enable:

  • Sub-second fraud detection
  • Real-time personalization
  • Live dashboards and analytics
  • Event-driven microservices
  • Stream-to-lake pipelines

Start with Flink SQL for most use cases—it’s surprisingly powerful. Drop to the DataStream API when you need fine-grained control.

The future is streaming. Build for it.


Working on a streaming project? Share your architecture in the comments.

If this post helped, a like (and an ad click) is much appreciated :)