Skip to content

Architecture

Overview

Apache Kafka is a distributed, partitioned, replicated commit-log service. The cluster is composed of three logical roles since Kafka 4.0 (KRaft-only): brokers that store partitioned logs and serve produce/fetch requests, controllers that form a Raft quorum to maintain cluster metadata, and clients (KafkaProducer, KafkaConsumer, AdminClient, Connect workers, Streams applications) that interact via a versioned binary TCP protocol. Topics are partitioned across brokers; each partition is an append-only log replicated to a configurable number of brokers, with one elected leader serving reads and writes and the others (followers) replicating asynchronously via Fetch requests but only acknowledged once they are caught up to the in-sync replica (ISR) high-watermark.

Component Architecture

graph TB
    subgraph Clients["Client Layer"]
        KP["KafkaProducer<br/>(idempotent / transactional)"]
        KC["KafkaConsumer<br/>(group coordinator client)"]
        ADM["AdminClient"]
        STR["KafkaStreams runtime"]
        CON["Kafka Connect Worker"]
    end

    subgraph Broker["KafkaServer (Broker)"]
        SR["SocketServer<br/>(NIO acceptor + processors)"]
        RH["KafkaApis<br/>(request router)"]
        RM["ReplicaManager"]
        LM["LogManager"]
        GC["GroupCoordinator<br/>(consumer + share groups)"]
        TC["TransactionCoordinator"]
        QM["QuotaManager"]
        RLM["RemoteLogManager<br/>(KIP-405)"]
        KMP["KafkaMetadataPublisher<br/>(metadata cache)"]
    end

    subgraph Controller["KafkaController (KRaft)"]
        QC["QuorumController"]
        RAFT["KafkaRaftClient<br/>(__cluster_metadata)"]
        SnapStore["MetadataSnapshotStore"]
    end

    subgraph Storage["On-Disk Storage"]
        SEG["LogSegment files<br/>.log / .index / .timeindex / .txnindex"]
        OffStore["__consumer_offsets<br/>(50 partitions, compacted)"]
        TxnStore["__transaction_state<br/>(50 partitions, compacted)"]
    end

    subgraph Remote["Remote Tier"]
        RSM["RemoteStorageManager<br/>(S3 / GCS / HDFS plugin)"]
        RLMM["RemoteLogMetadataManager<br/>(__remote_log_metadata)"]
    end

    KP -->|Produce v9| SR
    KC -->|Fetch v15 / Heartbeat| SR
    ADM -->|CreateTopics, AlterConfigs| SR
    STR -->|Streams DSL| KP
    STR -->|Streams DSL| KC
    CON --> KP
    CON --> KC

    SR --> RH
    RH --> RM
    RH --> GC
    RH --> TC
    RH --> QM
    RM --> LM
    LM --> SEG
    GC --> OffStore
    TC --> TxnStore
    LM --> RLM
    RLM --> RSM
    RLM --> RLMM

    Controller -->|UpdateMetadataRecord<br/>BrokerRegistration<br/>PartitionRecord| KMP
    KMP --> RM
    QC --> RAFT
    RAFT --> SnapStore

    style Controller fill:#1f3a5f,color:#fff
    style Remote fill:#3a5a3a,color:#fff
    style Broker fill:#4a3a3a,color:#fff

Core Components

Component Role
KafkaServer Top-level broker process; bootstraps SocketServer, KafkaApis, ReplicaManager, LogManager, GroupCoordinator, TransactionCoordinator, RemoteLogManager.
KafkaController (QuorumController) KRaft active controller; processes metadata mutations and replicates them through the Raft log.
SocketServer NIO acceptor + processor threads handling TCP connections; passes parsed requests to a request channel.
KafkaApis Request router — dispatches each ApiKey (Produce, Fetch, Metadata, OffsetCommit, etc.) to the appropriate subsystem.
ReplicaManager Owns local replica state; appends to the LogManager on Produce, serves Fetch from leaders and from followers replicating from the leader.
LogManager Manages partition logs on disk: segment rolling, retention, compaction scheduling, recovery on startup.
GroupCoordinator Manages consumer-group state machine, member heartbeats, partition assignments, share-group state (KIP-932).
TransactionCoordinator Manages producer transactional IDs, transaction markers (commit/abort), fencing of zombie producers.
RemoteLogManager (RLM) Asynchronously copies cold local segments to the remote tier via the configured RemoteStorageManager plugin.
KafkaProducer Client API: serializes records, batches by partition, compresses, applies idempotence sequence numbers, optionally enrolls in transactions.
KafkaConsumer Client API: implements the group rebalance protocol (KIP-848 in 4.0+), commits offsets, supports read_committed isolation.
AdminClient Cluster administration API: topic CRUD, ACL CRUD, dynamic config, group describe/reset, partition reassignment.
KafkaStreams Embedded JVM library implementing KStream/KTable DSL on top of producer + consumer + RocksDB local state stores.
Kafka Connect Distributed worker framework hosting Source/Sink connectors (Debezium, JDBC, S3, Iceberg, etc.).

KRaft Consensus

Since Kafka 4.0, the cluster has no ZooKeeper dependency. A small set of nodes (typically 3 or 5) are designated as controllers by setting process.roles=controller (or controller,broker in combined mode for development). One controller is the active controller elected by Raft; the others are passive replicas.

sequenceDiagram
    participant Admin as AdminClient
    participant Active as Active KafkaController
    participant Followers as Follower Controllers
    participant MLog as __cluster_metadata log
    participant Brokers as Broker Pool

    Admin->>Active: CreateTopicsRequest("orders", parts=12, rf=3)
    Active->>Active: Validate; allocate partition-to-broker map
    Active->>MLog: Append TopicRecord + PartitionRecord(s)
    Active->>Followers: Raft AppendEntries
    Followers-->>Active: Ack (quorum reached)
    Active->>MLog: Mark records committed
    Active->>Brokers: Brokers fetch new metadata via Fetch on __cluster_metadata
    Brokers->>Brokers: KafkaMetadataPublisher applies records
    Brokers-->>Admin: CreateTopicsResponse(success)

Key properties of KRaft:

  • Single source of truth: metadata changes are records in the internal __cluster_metadata topic, replicated by the same Raft protocol the cluster uses for everything else.
  • Snapshotting: periodic snapshots prevent the metadata log from growing unbounded.
  • Faster failover: there is no ZK session timeout to wait through; brokers learn of leadership changes via metadata records they're already fetching.
  • Fewer moving parts: a 3-node cluster needs only 3 processes in combined mode, vs. 3 brokers + 3 ZK in the legacy world.
  • controller.quorum.bootstrap.servers (KIP-995, KRaft v1) replaces the older controller.quorum.voters and supports dynamic voter set changes.

A 3-controller quorum tolerates one controller failure; 5 controllers tolerate two. Controllers should run on stable, low-latency hardware — they are the cluster's brain.

Topic, Partition, and Log

A topic is a named ordered stream split into N partitions. Each partition is an append-only sequence of immutable records identified by 64-bit offsets. Records inside a partition have strict order; across partitions, order is not guaranteed.

graph LR
    subgraph Topic["Topic: orders (RF=3, parts=4)"]
        direction TB
        subgraph P0["Partition 0"]
            S0a["Segment 00000000.log"]
            S0b["Segment 00012450.log (active)"]
        end
        subgraph P1["Partition 1"]
            S1a["Segment 00000000.log"]
            S1b["Segment 00018932.log (active)"]
        end
        P2["Partition 2"]
        P3["Partition 3"]
    end
    P0 -->|Leader| KSa["KafkaServer A"]
    P0 -->|Follower| KSb["KafkaServer B"]
    P0 -->|Follower| KSc["KafkaServer C"]
    P1 -->|Leader| KSb
    P1 -->|Follower| KSa
    P1 -->|Follower| KSc

Log-Structured Storage Internals

Each partition's log directory contains a series of segments:

File Purpose
<base-offset>.log Append-only batch records (magic v2 record-batch format).
<base-offset>.index Sparse offset → physical position index for the segment.
<base-offset>.timeindex Sparse timestamp → offset index (for time-based seek).
<base-offset>.txnindex Aborted transaction index (used by read_committed consumers).
<base-offset>.snapshot Producer state snapshot (idempotence sequence numbers).
leader-epoch-checkpoint Per-leader-epoch end offsets for truncation safety.

The active segment receives writes; when it reaches segment.bytes (default 1 GiB) or segment.ms it is rolled and a new active segment is opened. Old segments become eligible for retention deletion or, with tiered storage, remote upload.

Record-Batch Format (Magic v2)

A record batch on disk includes (per Apache Kafka source):

baseOffset:        int64
batchLength:       int32
partitionLeaderEpoch: int32
magic:             int8 (= 2 in current Kafka)
crc:               uint32
attributes:        int16  (compression: 0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd; +flags)
lastOffsetDelta:   int32
baseTimestamp:     int64
maxTimestamp:      int64
producerId:        int64       <-- idempotence
producerEpoch:     int16       <-- idempotence
baseSequence:      int32       <-- idempotence
recordsCount:      int32
records:           [Record]    <-- compressed as a unit

Compression is applied to the entire records block, giving better ratios than per-record compression. This is also what makes the zero-copy sendfile() path so effective: the broker streams the on-disk compressed batch directly to the consumer socket without decompressing or copying through user space.

Log Compaction

In addition to time/size retention (cleanup.policy=delete), Kafka supports cleanup.policy=compact. The log cleaner thread periodically rewrites segments retaining only the most recent record per key, which makes compacted topics suitable for state-snapshot use cases (changelog topics for Streams state stores, __consumer_offsets, __transaction_state, K8s configmap-style topics).

Tunables:

  • log.cleaner.min.compaction.lag.ms — minimum age before a record is eligible for cleaning.
  • log.cleaner.max.compaction.lag.ms — maximum delay before uncompacted head is forced to compact.
  • min.cleanable.dirty.ratio — fraction of dirty bytes triggering a clean.

Tombstones (records with a null value) are retained for delete.retention.ms to ensure replicas observe the deletion before it is purged.

Replication and ISR

Each partition has a leader and N-1 followers (where N = replication.factor). Followers issue Fetch requests to the leader exactly like consumers, with one extra protocol affordance: when a follower's fetch offset reaches the leader's log-end-offset, the leader counts it as in-sync (ISR). A write becomes committed when all in-sync replicas have replicated it; only then can a read_committed consumer see it, and only then does the high-watermark advance.

sequenceDiagram
    participant Producer as KafkaProducer
    participant Leader as KafkaServer (Leader)
    participant F1 as KafkaServer (Follower 1)
    participant F2 as KafkaServer (Follower 2)

    Producer->>Leader: ProduceRequest acks=all batch[B]
    Leader->>Leader: ReplicaManager.appendToLocalLog(B)
    par Replication
        F1->>Leader: FetchRequest(offset=N)
        Leader-->>F1: FetchResponse[B]
        F1->>F1: append + advance LEO
    and
        F2->>Leader: FetchRequest(offset=N)
        Leader-->>F2: FetchResponse[B]
        F2->>F2: append + advance LEO
    end
    F1->>Leader: FetchRequest(offset=N+1) (ack)
    F2->>Leader: FetchRequest(offset=N+1) (ack)
    Leader->>Leader: HighWatermark advances to N+1
    Leader-->>Producer: ProduceResponse(offset=N, ack)
Setting Purpose
replication.factor Total replicas per partition (typically 3 in production).
min.insync.replicas Minimum replicas that must acknowledge for acks=all to succeed (typically 2 for RF=3).
acks=all Producer waits for all ISR; combined with idempotence gives the strongest delivery guarantees.
unclean.leader.election.enable If true, a non-ISR replica may become leader (data loss risk). Default false.

Eligible Leader Replicas (KIP-966, GA 4.0)

Pre-4.0, when the ISR shrank below min.insync.replicas, the high-watermark could not advance and the partition went read-only. KIP-966 introduced Eligible Leader Replicas (ELR) stored in the partition record. The KRaft controller now picks a leader in this priority: ISR (if non-empty) → ELR (if non-empty and not fenced) → last known leader (if unfenced). This restores availability for many ISR-shrinkage scenarios that previously required unclean.leader.election=true. ELR is enabled by default on new clusters from Kafka 4.1.

Consumer Group Protocol

Consumers join a group identified by group.id. The GroupCoordinator on a designated broker assigns partitions to members. Pre-KIP-848 ("classic" / "generic" protocol) used a stop-the-world rebalance triggered by JoinGroup → SyncGroup. KIP-848 (GA in 4.0) introduces the next-generation consumer rebalance protocol that pushes assignment computation to the broker and applies changes incrementally — no global synchronization barrier.

stateDiagram-v2
    [*] --> Joining: subscribe(topics)
    Joining --> Stable: Coordinator computes assignment & member ACKs
    Stable --> Reassigning: Member added/removed/heartbeat lost
    Reassigning --> Stable: Incremental partition revoke + assign
    Stable --> Dead: close()
    Dead --> [*]

Other group protocols delivered after KIP-848:

  • Streams Rebalance Protocol (KIP-1071, early access in 4.1, GA in 4.2): broker-side task assignment for Streams applications.
  • Share Groups (Queues for Kafka) (KIP-932, preview 4.1, GA 4.2): per-record acknowledgement enabling competing-consumer (queue) semantics rather than partition-exclusive ownership.

Exactly-Once Semantics

EOS in Kafka builds on two primitives:

  1. Idempotent Producer (default in 4.x). The broker assigns each producer a producerId (PID) and tracks per-partition sequence numbers. Duplicate retries are deduplicated server-side; the broker rejects out-of-sequence batches with OutOfOrderSequenceException.
  2. Transactions. A producer with transactional.id=... calls initTransactions(), then groups multiple sends and offset commits inside beginTransaction()/commitTransaction(). Atomic commit/abort is implemented via transaction markers appended to each touched partition by the TransactionCoordinator. Consumers configured with isolation.level=read_committed skip aborted records using the per-segment .txnindex.

Read-process-write loops (the canonical Kafka Streams pattern) achieve end-to-end exactly-once by including the consumer's offset commit in the producer's transaction via producer.sendOffsetsToTransaction(...). See the official design doc on Exactly-Once Semantics for the full state machine.

sequenceDiagram
    participant App as Kafka Streams Task
    participant Prod as Transactional Producer
    participant TC as TransactionCoordinator
    participant Cons as Consumer
    participant T1 as Topic A (input)
    participant T2 as Topic B (output)
    participant OS as __consumer_offsets

    Cons->>T1: poll() -> records[K, V]
    App->>Prod: beginTransaction()
    Prod->>TC: AddPartitionsToTxn(topic-B-partitions, __consumer_offsets-partition)
    Prod->>T2: send(transformed records)
    Prod->>OS: sendOffsetsToTransaction(consumer offsets)
    Prod->>TC: commitTransaction()
    TC->>T2: write commit marker
    TC->>OS: write commit marker
    Note over Cons: read_committed consumers now see records & advanced offsets atomically

Kafka Streams, Connect, ksqlDB

Layer Built On Purpose
Kafka Streams Producer + Consumer + RocksDB local stores Embedded JVM stream-processing library; KStream/KTable DSL, windows, joins, exactly-once via transactions.
Kafka Connect Distributed worker framework Hosts source connectors (e.g. Debezium MySQL/Postgres CDC) and sink connectors (S3, Iceberg, Snowflake, Elastic). Stores task state in 3 internal compacted topics: connect-offsets, connect-configs, connect-status.
ksqlDB Streams + REST API SQL-like declarative engine for continuous queries over Kafka topics.

Tiered Storage (KIP-405)

Kafka 3.6 GA'd tiered storage. Brokers retain only "hot" segments locally; cold segments are uploaded to a remote object store via a pluggable RemoteStorageManager.

flowchart TB
    subgraph LocalTier["Local Tier (broker disk)"]
        Active["Active segment (writes)"]
        Hot["Recent segments (page cache hits)"]
    end
    subgraph RemoteTier["Remote Tier (S3/GCS/HDFS)"]
        Cold["Cold segments<br/>(uploaded by RemoteLogManager)"]
    end
    subgraph Metadata["Remote Log Metadata"]
        RLMM["__remote_log_metadata<br/>(per-segment metadata)"]
    end

    Active -->|roll| Hot
    Hot -->|local.retention.ms<br/>elapsed| RemoteLogManager
    RemoteLogManager -->|put| Cold
    RemoteLogManager -->|append metadata| RLMM
    Consumer["Consumer historical fetch"] -->|Fetch (cold offset)| RemoteLogManager
    RemoteLogManager -->|get| Cold
    RemoteLogManager -->|stream to consumer| Consumer

Per-topic configuration:

  • remote.storage.enable=true
  • local.retention.ms — how long to keep segments locally after upload (typically minutes to hours).
  • retention.ms — total (local + remote) retention before deletion.
  • segment.bytes — segment roll size (smaller = more upload granularity, more metadata overhead).

The reference S3 plugin (org.apache.kafka.server.log.remote.storage.S3RemoteStorageManager and similar Aiven/Confluent implementations) handles segment upload and historical fetch streaming.

Request Flow Examples

Produce Path

sequenceDiagram
    participant App
    participant KP as KafkaProducer
    participant Net as Network thread
    participant SS as SocketServer
    participant KA as KafkaApis
    participant RM as ReplicaManager
    participant LM as LogManager
    participant Disk as LogSegment

    App->>KP: send(ProducerRecord)
    KP->>KP: serialize, partition, batch by topic-partition
    KP->>KP: assign idempotence sequence
    Net->>SS: ProduceRequest v9
    SS->>KA: dispatch
    KA->>RM: appendRecords(timeout, acks=all, batch)
    RM->>LM: append to leader log
    LM->>Disk: append + fsync (per flush policy)
    RM-->>KA: produce result (delayed if acks=all until ISR)
    KA-->>Net: ProduceResponse(offset)
    Net-->>KP: callback success

Fetch Path (with Zero-Copy)

sequenceDiagram
    participant KC as KafkaConsumer
    participant SS as SocketServer
    participant KA as KafkaApis
    participant RM as ReplicaManager
    participant LM as LogManager
    participant Kernel as Linux kernel sendfile()

    KC->>SS: FetchRequest v15 (topic, partition, offset)
    SS->>KA: dispatch
    KA->>RM: fetchMessages(maxWait, minBytes)
    RM->>LM: read(partition, offset, maxBytes)
    LM-->>RM: FileRecords (mmap'd on-disk batch)
    RM-->>KA: FetchResponse with FileRecords reference
    KA->>Kernel: sendfile(socket_fd, file_fd, offset, len)
    Note over Kernel: bytes go disk -> NIC without copy through userspace
    Kernel-->>KC: response bytes

Benchmarks

The throughput numbers below come from public sources; reproduce in your own environment before sizing.

LinkedIn (2014) — original Kafka benchmark

  • 3 brokers, commodity hardware: Intel Xeon, 6×7200 RPM SATA, 32 GiB RAM, 1 GbE.
  • 2,024,032 msgs/sec with 100-byte messages on a single producer to a 6-partition / RF=3 topic.
  • Sustained writes scale linearly with partitions until the disk subsystem saturates.

Source: LinkedIn Engineering — "Benchmarking Apache Kafka: 2 Million Writes Per Second on Three Cheap Machines".

Confluent OpenMessaging Benchmark (2020) — Kafka vs Pulsar vs RabbitMQ

  • Test rig: 3 × i3en.2xlarge brokers (8 vCPU, 64 GiB RAM, 2 × 2.5 TB NVMe, 25 Gbps).
  • Topic: 100 partitions, RF=3, snappy compression, 1 KiB messages.
  • Peak stable throughput: ~605 MB/s (Kafka), with consumers keeping up.
  • Lower p99 publish latency than Pulsar at the same throughput in this rig.

Source: Confluent — "Benchmarking RabbitMQ vs Kafka vs Pulsar Performance".

Dell EMC / Confluent Platform Characterization

  • Larger configuration with 13 producers writing concurrently.
  • 18,623,322 records/sec aggregate (1,776 MB/s), 83 ms average producer latency, 650M records over the test run.
  • Best per-producer throughput observed at 9–20 partitions per topic.

Source: Dell EMC — Confluent Kafka Performance Characterization white paper.

Confluent Cloud (Kora) vs Apache Kafka

  • Confluent claims Kora delivers >10× lower tail latency at 5.6 GB/s aggregate (1.4 GB/s ingress + 4.2 GB/s egress) vs vanilla Apache Kafka.
  • This is a vendor benchmark of the cloud-native rewrite; reproduce on your workload before drawing conclusions.

Source: Confluent — "Apache Kafka vs Confluent Cloud Latency Benchmarking".

Practical Sizing Heuristics

Cluster size Brokers Sustained ingress Topic / partition budget
Dev 1 (KRaft combined) <50 MB/s <100 partitions
Small prod 3 brokers (RF=3) 100–500 MB/s a few thousand partitions
Medium prod 6–12 brokers 500 MB/s – 2 GB/s 10k–50k partitions
Large prod 30+ brokers, tiered storage 2 GB/s+ 100k+ partitions

Benchmark Caveat

Throughput is heavily influenced by message size, compression codec (zstd often beats snappy on CPU-rich brokers), batch size, network latency between producer and broker, replication factor, and disk subsystem. Treat published numbers as upper bounds.

Sources