Architecture¶
Related Notes
messaging/kafka/index | messaging/kafka/operations | messaging/kafka/security | messaging/index
Overview¶
Apache Kafka is a distributed, partitioned, replicated commit-log service. Since Kafka 4.0 (KRaft-only), the cluster is composed of three logical roles: brokers that store partitioned logs and serve produce/fetch requests, controllers that form a Raft quorum to maintain cluster metadata, and clients (KafkaProducer, KafkaConsumer, AdminClient, Connect workers, Streams applications) that interact via a versioned binary TCP protocol. Topics are partitioned across brokers; each partition is an append-only log replicated to a configurable number of brokers. One elected leader serves reads and writes while the followers replicate via Fetch requests; a write is acknowledged to an acks=all producer only once every member of the in-sync replica set (ISR) has caught up, at which point the high watermark advances.
Component Architecture¶
graph TB
subgraph Clients["Client Layer"]
KP["KafkaProducer<br/>(idempotent / transactional)"]
KC["KafkaConsumer<br/>(group coordinator client)"]
ADM["AdminClient"]
STR["KafkaStreams runtime"]
CON["Kafka Connect Worker"]
end
subgraph Broker["KafkaServer (Broker)"]
SR["SocketServer<br/>(NIO acceptor + processors)"]
RH["KafkaApis<br/>(request router)"]
RM["ReplicaManager"]
LM["LogManager"]
GC["GroupCoordinator<br/>(consumer + share groups)"]
TC["TransactionCoordinator"]
QM["QuotaManager"]
RLM["RemoteLogManager<br/>(KIP-405)"]
KMP["KafkaMetadataPublisher<br/>(metadata cache)"]
end
subgraph Controller["KafkaController (KRaft)"]
QC["QuorumController"]
RAFT["KafkaRaftClient<br/>(__cluster_metadata)"]
SnapStore["MetadataSnapshotStore"]
end
subgraph Storage["On-Disk Storage"]
SEG["LogSegment files<br/>.log / .index / .timeindex / .txnindex"]
OffStore["__consumer_offsets<br/>(50 partitions, compacted)"]
TxnStore["__transaction_state<br/>(50 partitions, compacted)"]
end
subgraph Remote["Remote Tier"]
RSM["RemoteStorageManager<br/>(S3 / GCS / HDFS plugin)"]
RLMM["RemoteLogMetadataManager<br/>(__remote_log_metadata)"]
end
KP -->|Produce v9| SR
KC -->|Fetch v15 / Heartbeat| SR
ADM -->|CreateTopics, AlterConfigs| SR
STR -->|Streams DSL| KP
STR -->|Streams DSL| KC
CON --> KP
CON --> KC
SR --> RH
RH --> RM
RH --> GC
RH --> TC
RH --> QM
RM --> LM
LM --> SEG
GC --> OffStore
TC --> TxnStore
LM --> RLM
RLM --> RSM
RLM --> RLMM
Controller -->|UpdateMetadataRecord<br/>BrokerRegistration<br/>PartitionRecord| KMP
KMP --> RM
QC --> RAFT
RAFT --> SnapStore
style Controller fill:#1f3a5f,color:#fff
style Remote fill:#3a5a3a,color:#fff
style Broker fill:#4a3a3a,color:#fff
Core Components¶
| Component | Role |
|---|---|
| KafkaServer | Top-level broker process; bootstraps SocketServer, KafkaApis, ReplicaManager, LogManager, GroupCoordinator, TransactionCoordinator, RemoteLogManager. |
| KafkaController (QuorumController) | KRaft active controller; processes metadata mutations and replicates them through the Raft log. |
| SocketServer | NIO acceptor + processor threads handling TCP connections; passes parsed requests to a request channel. |
| KafkaApis | Request router — dispatches each ApiKey (Produce, Fetch, Metadata, OffsetCommit, etc.) to the appropriate subsystem. |
| ReplicaManager | Owns local replica state; appends to the LogManager on Produce and serves Fetch requests from both consumers and follower replicas. |
| LogManager | Manages partition logs on disk: segment rolling, retention, compaction scheduling, recovery on startup. |
| GroupCoordinator | Manages consumer-group state machine, member heartbeats, partition assignments, share-group state (KIP-932). |
| TransactionCoordinator | Manages producer transactional IDs, transaction markers (commit/abort), fencing of zombie producers. |
| RemoteLogManager (RLM) | Asynchronously copies cold local segments to the remote tier via the configured RemoteStorageManager plugin. |
| KafkaProducer | Client API: serializes records, batches by partition, compresses, applies idempotence sequence numbers, optionally enrolls in transactions. |
| KafkaConsumer | Client API: implements the group rebalance protocol (KIP-848 in 4.0+), commits offsets, supports read_committed isolation. |
| AdminClient | Cluster administration API: topic CRUD, ACL CRUD, dynamic config, group describe/reset, partition reassignment. |
| KafkaStreams | Embedded JVM library implementing KStream/KTable DSL on top of producer + consumer + RocksDB local state stores. |
| Kafka Connect | Distributed worker framework hosting Source/Sink connectors (Debezium, JDBC, S3, Iceberg, etc.). |
KRaft Consensus¶
Since Kafka 4.0, the cluster has no ZooKeeper dependency. A small set of nodes (typically 3 or 5) are designated as controllers by setting process.roles=controller (or controller,broker in combined mode for development). One controller is the active controller elected by Raft; the others are passive replicas.
sequenceDiagram
participant Admin as AdminClient
participant Active as Active KafkaController
participant Followers as Follower Controllers
participant MLog as __cluster_metadata log
participant Brokers as Broker Pool
Admin->>Active: CreateTopicsRequest("orders", parts=12, rf=3)
Active->>Active: Validate; allocate partition-to-broker map
Active->>MLog: Append TopicRecord + PartitionRecord(s)
Active->>Followers: Raft AppendEntries
Followers-->>Active: Ack (quorum reached)
Active->>MLog: Mark records committed
Brokers->>Active: Fetch __cluster_metadata (pull new records)
Brokers->>Brokers: KafkaMetadataPublisher applies records
Brokers-->>Admin: CreateTopicsResponse(success)
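The AdminClient call behind the sequence above looks like this; a minimal sketch assuming a reachable bootstrap address (`broker-1:9092` is a placeholder, as is the topic name):

```java
// Issue the CreateTopicsRequest from the diagram via the Java AdminClient.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class CreateOrdersTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // 12 partitions, replication factor 3 -- matches the sequence diagram.
            NewTopic orders = new NewTopic("orders", 12, (short) 3);
            // all() completes once the controller has committed the metadata records.
            admin.createTopics(List.of(orders)).all().get();
        }
    }
}
```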
Key properties of KRaft:
- Single source of truth: metadata changes are records in the internal `__cluster_metadata` topic, replicated through Raft and pulled by brokers with the same Fetch protocol they use for regular partitions.
- Snapshotting: periodic snapshots prevent the metadata log from growing unbounded.
- Faster failover: there is no ZK session timeout to wait through; brokers learn of leadership changes via metadata records they're already fetching.
- Fewer moving parts: a 3-node cluster needs only 3 processes in combined mode, vs. 3 brokers + 3 ZK in the legacy world.

`controller.quorum.bootstrap.servers` (KIP-853, KRaft version 1) replaces the older `controller.quorum.voters` and supports dynamic voter-set changes.
A 3-controller quorum tolerates one controller failure; 5 controllers tolerate two. Controllers should run on stable, low-latency hardware — they are the cluster's brain.
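The quorum can be inspected at runtime with the AdminClient's `describeMetadataQuorum` API (KIP-836, Kafka 3.3+). A minimal sketch, assuming a reachable broker address:

```java
// Print the active controller and the voters' replication progress.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;

import java.util.Map;

public class ShowQuorum {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "broker-1:9092"))) {
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Active controller (leader id): " + quorum.leaderId());
            // Each voter's log-end-offset shows how far it has replicated the metadata log.
            quorum.voters().forEach(v ->
                System.out.printf("voter %d at log-end-offset %d%n",
                        v.replicaId(), v.logEndOffset()));
        }
    }
}
```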
Topic, Partition, and Log¶
A topic is a named stream split into N partitions. Each partition is an append-only sequence of immutable records identified by 64-bit offsets. Records inside a partition have strict order; across partitions, order is not guaranteed.
graph LR
subgraph Topic["Topic: orders (RF=3, parts=4)"]
direction TB
subgraph P0["Partition 0"]
S0a["Segment 00000000.log"]
S0b["Segment 00012450.log (active)"]
end
subgraph P1["Partition 1"]
S1a["Segment 00000000.log"]
S1b["Segment 00018932.log (active)"]
end
P2["Partition 2"]
P3["Partition 3"]
end
P0 -->|Leader| KSa["KafkaServer A"]
P0 -->|Follower| KSb["KafkaServer B"]
P0 -->|Follower| KSc["KafkaServer C"]
P1 -->|Leader| KSb
P1 -->|Follower| KSa
P1 -->|Follower| KSc
Log-Structured Storage Internals¶
Each partition's log directory contains a series of segments:
| File | Purpose |
|---|---|
| `<base-offset>.log` | Append-only batch records (magic v2 record-batch format). |
| `<base-offset>.index` | Sparse offset → physical position index for the segment. |
| `<base-offset>.timeindex` | Sparse timestamp → offset index (for time-based seek). |
| `<base-offset>.txnindex` | Aborted-transaction index (used by read_committed consumers). |
| `<base-offset>.snapshot` | Producer state snapshot (idempotence sequence numbers). |
| `leader-epoch-checkpoint` | Per-leader-epoch end offsets for truncation safety. |
The active segment receives writes; when it reaches segment.bytes (default 1 GiB) or segment.ms it is rolled and a new active segment is opened. Old segments become eligible for retention deletion or, with tiered storage, remote upload.
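Segment rolling is tunable per topic. A hedged sketch using `incrementalAlterConfigs`; the topic name and the values (256 MiB / 1 hour) are illustrative assumptions, not recommendations:

```java
// Tighten segment rolling on an existing topic.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;

public class TuneSegments {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "broker-1:9092"))) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
            List<AlterConfigOp> ops = List.of(
                // Roll at 256 MiB instead of the 1 GiB default...
                new AlterConfigOp(new ConfigEntry("segment.bytes",
                        String.valueOf(256 * 1024 * 1024)), AlterConfigOp.OpType.SET),
                // ...or after 1 hour, whichever comes first.
                new AlterConfigOp(new ConfigEntry("segment.ms", "3600000"),
                        AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```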
Record-Batch Format (Magic v2)¶
A record batch on disk includes (per Apache Kafka source):
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (= 2 in current Kafka)
crc: uint32
attributes: int16 (compression: 0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd; +flags)
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64 <-- idempotence
producerEpoch: int16 <-- idempotence
baseSequence: int32 <-- idempotence
recordsCount: int32
records: [Record] <-- compressed as a unit
Compression is applied to the entire records block, giving better ratios than per-record compression. This is also what makes the zero-copy sendfile() path so effective: the broker streams the on-disk compressed batch directly to the consumer socket without decompressing or copying through user space.
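The producer side of this is just configuration. An illustrative sketch (broker address, codec choice, and sizes are assumptions): records accumulate per partition up to `batch.size` or `linger.ms`, and the whole batch is compressed as one unit, matching the v2 format above.

```java
// Producer configured for batch-level zstd compression.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CompressedProducer {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // codec lives in the batch attributes bits
        p.put(ProducerConfig.LINGER_MS_CONFIG, "20");          // wait up to 20 ms to fill a batch
        p.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(64 * 1024)); // 64 KiB target batch
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("orders", "order-1", "{...}"));
        }
    }
}
```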
Log Compaction¶
In addition to time/size retention (cleanup.policy=delete), Kafka supports cleanup.policy=compact. The log cleaner thread periodically rewrites segments retaining only the most recent record per key, which makes compacted topics suitable for state-snapshot use cases (changelog topics for Streams state stores, __consumer_offsets, __transaction_state, K8s configmap-style topics).
Tunables:
- `log.cleaner.min.compaction.lag.ms` — minimum age before a record is eligible for cleaning.
- `log.cleaner.max.compaction.lag.ms` — maximum delay before the uncompacted head is forced to compact.
- `min.cleanable.dirty.ratio` — fraction of dirty bytes that triggers a clean.
Tombstones (records with a null value) are retained for delete.retention.ms to ensure replicas observe the deletion before it is purged.
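A sketch of both halves (topic and producer names, partition counts, and the dirty-ratio value are placeholders): create a compacted topic keyed by entity id, then delete a key by producing a tombstone.

```java
// Compacted topic + tombstone demo.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;
import java.util.Map;

public class CompactedTopicDemo {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "broker-1:9092"))) {
            NewTopic t = new NewTopic("customer-profiles", 6, (short) 3)
                    .configs(Map.of("cleanup.policy", "compact",
                                    "min.cleanable.dirty.ratio", "0.3"));
            admin.createTopics(List.of(t)).all().get();
        }
        Map<String, Object> prodCfg = Map.of(
                "bootstrap.servers", "broker-1:9092",
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prodCfg)) {
            producer.send(new ProducerRecord<>("customer-profiles", "cust-42", "{\"tier\":\"gold\"}"));
            // Null value = tombstone: after compaction (and delete.retention.ms),
            // cust-42 disappears from the topic entirely.
            producer.send(new ProducerRecord<>("customer-profiles", "cust-42", null));
        }
    }
}
```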
Replication and ISR¶
Each partition has a leader and N−1 followers (where N = replication.factor). Followers issue Fetch requests to the leader much like consumers; a follower is counted as in-sync (ISR) while it keeps up with the leader's log-end-offset within replica.lag.time.max.ms. A write becomes committed when every in-sync replica has replicated it; only then does the high-watermark advance and the record become visible to consumers.
sequenceDiagram
participant Producer as KafkaProducer
participant Leader as KafkaServer (Leader)
participant F1 as KafkaServer (Follower 1)
participant F2 as KafkaServer (Follower 2)
Producer->>Leader: ProduceRequest acks=all batch[B]
Leader->>Leader: ReplicaManager.appendToLocalLog(B)
par Replication
F1->>Leader: FetchRequest(offset=N)
Leader-->>F1: FetchResponse[B]
F1->>F1: append + advance LEO
and
F2->>Leader: FetchRequest(offset=N)
Leader-->>F2: FetchResponse[B]
F2->>F2: append + advance LEO
end
F1->>Leader: FetchRequest(offset=N+1) (ack)
F2->>Leader: FetchRequest(offset=N+1) (ack)
Leader->>Leader: HighWatermark advances to N+1
Leader-->>Producer: ProduceResponse(offset=N, ack)
| Setting | Purpose |
|---|---|
| `replication.factor` | Total replicas per partition (typically 3 in production). |
| `min.insync.replicas` | Minimum replicas that must acknowledge for acks=all to succeed (typically 2 for RF=3). |
| `acks=all` | Producer waits for the full ISR; combined with idempotence this gives the strongest delivery guarantees. |
| `unclean.leader.election.enable` | If true, a non-ISR replica may become leader (data-loss risk). Default false. |
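Putting the table's durability recipe together; a hedged sketch with placeholder names (topic, addresses) and the usual RF=3 / min.insync.replicas=2 pairing:

```java
// RF=3 topic with min.insync.replicas=2, written by an idempotent acks=all producer.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DurableWrites {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "broker-1:9092"))) {
            admin.createTopics(List.of(
                new NewTopic("payments", 12, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2")))).all().get();
        }
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for the full ISR
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // default in recent clients
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("payments", "pay-1", "{...}")).get(); // sync for clarity
        }
    }
}
```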
Eligible Leader Replicas (KIP-966, GA 4.0)¶
Pre-4.0, when the ISR shrank below min.insync.replicas, acks=all writes were rejected and the partition effectively went read-only. KIP-966 introduced Eligible Leader Replicas (ELR), stored in the partition record: replicas currently outside the ISR that are nonetheless known to hold all committed data. The KRaft controller now picks a leader in this priority: ISR (if non-empty) → ELR (if non-empty and not fenced) → last known leader (if unfenced). This restores availability for many ISR-shrinkage scenarios that previously required unclean.leader.election.enable=true. ELR is enabled by default on new clusters from Kafka 4.1.
Consumer Group Protocol¶
Consumers join a group identified by group.id. The GroupCoordinator on a designated broker assigns partitions to members. Pre-KIP-848 ("classic" / "generic" protocol) used a stop-the-world rebalance triggered by JoinGroup → SyncGroup. KIP-848 (GA in 4.0) introduces the next-generation consumer rebalance protocol that pushes assignment computation to the broker and applies changes incrementally — no global synchronization barrier.
stateDiagram-v2
[*] --> Joining: subscribe(topics)
Joining --> Stable: Coordinator computes assignment & member ACKs
Stable --> Reassigning: Member added/removed/heartbeat lost
Reassigning --> Stable: Incremental partition revoke + assign
Stable --> Dead: close()
Dead --> [*]
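Opting into the KIP-848 protocol is a single client config; a minimal sketch assuming a 4.x client against brokers running the new group coordinator (addresses and names are placeholders):

```java
// group.protocol=consumer selects the next-gen protocol ("classic" is the old one).
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class NextGenConsumer {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");
        p.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // KIP-848
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
            }
        }
    }
}
```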
Other group protocols delivered after KIP-848:
- Streams Rebalance Protocol (KIP-1071, early access in 4.1, GA in 4.2): broker-side task assignment for Streams applications.
- Share Groups (Queues for Kafka) (KIP-932, preview 4.1, GA 4.2): per-record acknowledgement enabling competing-consumer (queue) semantics rather than partition-exclusive ownership.
Exactly-Once Semantics¶
EOS in Kafka builds on two primitives:
- Idempotent Producer (default in 4.x). The broker assigns each producer a `producerId` (PID) and tracks per-partition sequence numbers. Duplicate retries are deduplicated server-side; the broker rejects out-of-sequence batches with `OutOfOrderSequenceException`.
- Transactions. A producer with `transactional.id=...` calls `initTransactions()`, then groups multiple sends and offset commits inside `beginTransaction()` / `commitTransaction()`. Atomic commit/abort is implemented via transaction markers appended to each touched partition by the TransactionCoordinator. Consumers configured with `isolation.level=read_committed` skip aborted records using the per-segment `.txnindex`.
Read-process-write loops (the canonical Kafka Streams pattern) achieve end-to-end exactly-once by including the consumer's offset commit in the producer's transaction via producer.sendOffsetsToTransaction(...). See the official design doc on Exactly-Once Semantics for the full state machine.
sequenceDiagram
participant App as Kafka Streams Task
participant Prod as Transactional Producer
participant TC as TransactionCoordinator
participant Cons as Consumer
participant T1 as Topic A (input)
participant T2 as Topic B (output)
participant OS as __consumer_offsets
Cons->>T1: poll() -> records[K, V]
App->>Prod: beginTransaction()
Prod->>TC: AddPartitionsToTxn(topic-B-partitions, __consumer_offsets-partition)
Prod->>T2: send(transformed records)
Prod->>OS: sendOffsetsToTransaction(consumer offsets)
Prod->>TC: commitTransaction()
TC->>T2: write commit marker
TC->>OS: write commit marker
Note over Cons: read_committed consumers now see records & advanced offsets atomically
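A sketch of the loop the diagram shows, written against the plain producer/consumer API (topic names, group id, and the `transactional.id` value are assumptions; the uppercase transform stands in for real processing):

```java
// Read-process-write with the offset commit inside the producer transaction.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadProcessWrite {
    public static void main(String[] args) {
        Map<String, Object> consCfg = Map.of(
                "bootstrap.servers", "broker-1:9092",
                "group.id", "rpw-app",
                "enable.auto.commit", "false",
                "isolation.level", "read_committed",
                "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Map<String, Object> prodCfg = Map.of(
                "bootstrap.servers", "broker-1:9092",
                "transactional.id", "rpw-app-0", // one stable id per producer instance
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consCfg);
             KafkaProducer<String, String> producer = new KafkaProducer<>(prodCfg)) {
            consumer.subscribe(List.of("topic-a"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                if (batch.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> rec : batch) {
                        producer.send(new ProducerRecord<>("topic-b", rec.key(),
                                rec.value().toUpperCase()));
                        offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                    new OffsetAndMetadata(rec.offset() + 1));
                    }
                    // Offsets commit atomically with the sends above.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (Exception e) {
                    producer.abortTransaction(); // read_committed consumers never see these sends
                }
            }
        }
    }
}
```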
Kafka Streams, Connect, ksqlDB¶
| Layer | Built On | Purpose |
|---|---|---|
| Kafka Streams | Producer + Consumer + RocksDB local stores | Embedded JVM stream-processing library; KStream/KTable DSL, windows, joins, exactly-once via transactions. |
| Kafka Connect | Distributed worker framework | Hosts source connectors (e.g. Debezium MySQL/Postgres CDC) and sink connectors (S3, Iceberg, Snowflake, Elastic). Stores task state in 3 internal compacted topics: connect-offsets, connect-configs, connect-status. |
| ksqlDB | Streams + REST API | SQL-like declarative engine for continuous queries over Kafka topics. |
Tiered Storage (KIP-405)¶
Tiered storage shipped as early access in Kafka 3.6 and was declared production-ready in 3.9. Brokers retain only "hot" segments locally; cold segments are uploaded to a remote object store via a pluggable RemoteStorageManager.
flowchart TB
subgraph LocalTier["Local Tier (broker disk)"]
Active["Active segment (writes)"]
Hot["Recent segments (page cache hits)"]
end
subgraph RemoteTier["Remote Tier (S3/GCS/HDFS)"]
Cold["Cold segments<br/>(uploaded by RemoteLogManager)"]
end
subgraph Metadata["Remote Log Metadata"]
RLMM["__remote_log_metadata<br/>(per-segment metadata)"]
end
Active -->|roll| Hot
Hot -->|local.retention.ms<br/>elapsed| RemoteLogManager
RemoteLogManager -->|put| Cold
RemoteLogManager -->|append metadata| RLMM
Consumer["Consumer historical fetch"] -->|Fetch (cold offset)| RemoteLogManager
RemoteLogManager -->|get| Cold
RemoteLogManager -->|stream to consumer| Consumer
Per-topic configuration:
- `remote.storage.enable=true`
- `local.retention.ms` — how long to keep segments locally after upload (typically minutes to hours).
- `retention.ms` — total (local + remote) retention before deletion.
- `segment.bytes` — segment roll size (smaller = more upload granularity, more metadata overhead).
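For example, a sketch enabling tiering at topic creation (the topic name and retention values are assumptions, and the cluster must already have a RemoteStorageManager plugin configured broker-side):

```java
// Tiered topic: 1 h hot on broker disk, 30 days total retention.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;

public class TieredTopic {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "broker-1:9092"))) {
            NewTopic clickstream = new NewTopic("clickstream", 24, (short) 3).configs(Map.of(
                    "remote.storage.enable", "true",
                    "local.retention.ms", "3600000",   // keep 1 h on local disk after upload
                    "retention.ms", "2592000000",      // 30 days total (local + remote)
                    "segment.bytes", String.valueOf(512 * 1024 * 1024))); // 512 MiB upload units
            admin.createTopics(List.of(clickstream)).all().get();
        }
    }
}
```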
Apache Kafka itself ships only the plugin interface (org.apache.kafka.server.log.remote.storage.RemoteStorageManager); concrete S3/GCS/Azure backends come from third-party implementations (e.g. Aiven's tiered-storage plugin, Confluent's) that handle segment upload and historical fetch streaming.
Request Flow Examples¶
Produce Path¶
sequenceDiagram
participant App
participant KP as KafkaProducer
participant Net as Network thread
participant SS as SocketServer
participant KA as KafkaApis
participant RM as ReplicaManager
participant LM as LogManager
participant Disk as LogSegment
App->>KP: send(ProducerRecord)
KP->>KP: serialize, partition, batch by topic-partition
KP->>KP: assign idempotence sequence
KP->>Net: Sender thread drains ready batches
Net->>SS: ProduceRequest v9
SS->>KA: dispatch
KA->>RM: appendRecords(timeout, acks=all, batch)
RM->>LM: append to leader log
LM->>Disk: append + fsync (per flush policy)
RM-->>KA: produce result (delayed if acks=all until ISR)
KA-->>Net: ProduceResponse(offset)
Net-->>KP: callback success
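The client side of this path in code; a minimal sketch with placeholder names. `send()` is asynchronous, and the callback fires on the producer's network thread once the ProduceResponse arrives:

```java
// Async send with a completion callback.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;

public class AsyncSend {
    public static void main(String[] args) {
        Map<String, Object> cfg = Map.of(
                "bootstrap.servers", "broker-1:9092",
                "acks", "all",
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(cfg)) {
            producer.send(new ProducerRecord<>("orders", "order-7", "{...}"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // retriable errors were already retried by the client
                } else {
                    System.out.printf("appended at %s-%d offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes outstanding batches
    }
}
```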
Fetch Path (with Zero-Copy)¶
sequenceDiagram
participant KC as KafkaConsumer
participant SS as SocketServer
participant KA as KafkaApis
participant RM as ReplicaManager
participant LM as LogManager
participant Kernel as Linux kernel sendfile()
KC->>SS: FetchRequest v15 (topic, partition, offset)
SS->>KA: dispatch
KA->>RM: fetchMessages(maxWait, minBytes)
RM->>LM: read(partition, offset, maxBytes)
LM-->>RM: FileRecords (mmap'd on-disk batch)
RM-->>KA: FetchResponse with FileRecords reference
KA->>Kernel: sendfile(socket_fd, file_fd, offset, len)
Note over Kernel: bytes go disk -> NIC without copy through userspace
Kernel-->>KC: response bytes
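The `fetchMessages(maxWait, minBytes)` parameters in the diagram map directly to two consumer configs: the broker parks the fetch until `fetch.min.bytes` have accumulated or `fetch.max.wait.ms` elapses. A sketch with illustrative values:

```java
// Trade latency for larger (zero-copy-friendly) fetch responses.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class BatchedFetch {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "throughput-readers");
        p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(1024 * 1024)); // wait for >= 1 MiB
        p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");                      // ...or 500 ms
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            consumer.subscribe(List.of("orders"));
            consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.offset()));
        }
    }
}
```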
Benchmarks¶
The throughput numbers below come from public sources; reproduce in your own environment before sizing.
LinkedIn (2014) — original Kafka benchmark¶
- 3 brokers, commodity hardware: Intel Xeon, 6×7200 RPM SATA, 32 GiB RAM, 1 GbE.
- 2,024,032 msgs/sec with 100-byte messages from three producers (3× async replication) to a 6-partition / RF=3 topic.
- Sustained writes scale linearly with partitions until the disk subsystem saturates.
Confluent OpenMessaging Benchmark (2020) — Kafka vs Pulsar vs RabbitMQ¶
- Test rig: 3 × `i3en.2xlarge` brokers (8 vCPU, 64 GiB RAM, 2 × 2.5 TB NVMe, 25 Gbps).
- Topic: 100 partitions, RF=3, snappy compression, 1 KiB messages.
- Peak stable throughput: ~605 MB/s (Kafka), with consumers keeping up.
- Lower p99 publish latency than Pulsar at the same throughput in this rig.
Source: Confluent — "Benchmarking RabbitMQ vs Kafka vs Pulsar Performance".
Dell EMC / Confluent Platform Characterization¶
- Larger configuration with 13 producers writing concurrently.
- 18,623,322 records/sec aggregate (1,776 MB/s), 83 ms average producer latency, 650M records over the test run.
- Best per-producer throughput observed at 9–20 partitions per topic.
Source: Dell EMC — Confluent Kafka Performance Characterization white paper.
Confluent Cloud (Kora) vs Apache Kafka¶
- Confluent claims Kora delivers >10× lower tail latency at 5.6 GB/s aggregate (1.4 GB/s ingress + 4.2 GB/s egress) vs vanilla Apache Kafka.
- This is a vendor benchmark of the cloud-native rewrite; reproduce on your workload before drawing conclusions.
Source: Confluent — "Apache Kafka vs Confluent Cloud Latency Benchmarking".
Practical Sizing Heuristics¶
| Cluster size | Brokers | Sustained ingress | Topic / partition budget |
|---|---|---|---|
| Dev | 1 (KRaft combined) | <50 MB/s | <100 partitions |
| Small prod | 3 brokers (RF=3) | 100–500 MB/s | a few thousand partitions |
| Medium prod | 6–12 brokers | 500 MB/s – 2 GB/s | 10k–50k partitions |
| Large prod | 30+ brokers, tiered storage | 2 GB/s+ | 100k+ partitions |
Benchmark Caveat
Throughput is heavily influenced by message size, compression codec (zstd often beats snappy on CPU-rich brokers), batch size, network latency between producer and broker, replication factor, and disk subsystem. Treat published numbers as upper bounds.