Designing and testing a highly available Kafka cluster on Kubernetes
April 2022
TL;DR: In this article, you'll look at Kafka's architecture and how it supports high availability with replicated partitions. Then, you will design a Kafka cluster to achieve high availability using standard Kubernetes resources and see how it tolerates node maintenance and total node failure.
In its simplest form, the architecture of Kafka consists of a single Broker server and its Producers and Consumers as clients.
- 1/2
Producers create records and publish them to the Kafka broker.
- 2/2
A consumer consumes records from the broker.
Although this setup can support typical Kafka use cases, it is too simplistic for most practical deployments.
Kafka is typically run as a cluster of three or more brokers that can span multiple data centers or cloud regions.
This cluster architecture supports the need for scalability, consistency, availability, partition tolerance and performance.
Like any engineering endeavour, there are trade-offs to be made between these qualities.
In this article, your learning goal is to explore the availability of Kafka on Kubernetes.
In particular, we will design a Kafka cluster that:
- Prefers availability over consistency, which is a trade-off you may want to make for a use case such as real-time metrics collection, where, in case of failure, the ability to write new data is more important than preserving every historical data point.
- Chooses simplicity over other non-functional requirements (e.g. security, performance, efficiency, etc.) to focus on learning Kafka and Kubernetes.
- Assumes that maintenance and unplanned disruptions are more likely than infrastructure failure.
With those goals in mind, let's first discuss a typical highly available Kafka cluster — without Kubernetes.
Table of contents
- Kafka partitions and replication-factor
- Understanding broker outages
- Requirements to mitigate common failures
- Deploying a 3-node Kafka cluster on Kubernetes
- The Kafka StatefulSet
- Combining a StatefulSet with a Headless Service
- Producing an event
- Consume the events on the "test" topic
- Surviving a node down for maintenance: drain the node hosting the leader
- Do producers and consumers still work?
- A Kafka pod is Pending
- Pod Topology Constraints help you spread the pods across failure domains
- Should you use Pod Topology Constraints or Node Affinity?
- Return to full strength
- Surviving multiple nodes down for maintenance
- Pod Disruption Budget
- Breaking badly: the node ain't coming back!
- Kafka-2 is dead. Long live its successor, kafka-2.
- Is the replacement broker in sync?
- Summary
Kafka partitions and replication-factor
In Kafka, messages are categorized into topics, and each topic has a name that is unique across the entire cluster.
For example, if you build a chat app, you might have a topic for each room (e.g. "dave-tom-chat").
But what happens when the number of messages outgrows the size of the broker?
Topics are broken down into partitions, each of which can live on a separate node in the Kafka cluster.
In other words, the messages of a single topic may be spread across several brokers, but all the messages of a single partition are always found on the same node.
- 1/3
If a single broker holds all of a topic's messages, what happens when it runs out of disk space?
- 2/3
Kafka uses partitions to distribute records to multiple brokers.
- 3/3
Each topic can have a different number of partitions. All the records from a single partition are always stored together on the node.
This design choice enables parallelization of topics, scalability and high message throughput.
But there's more.
Topics are configured with a replication factor, which determines the number of copies for each partition.
If a cluster has a single topic with one partition, a replication factor of three means that there are three copies of that partition.
All replicas of a partition exist on separate brokers, so you cannot have more copies of a partition than brokers in the cluster.
In the previous example, with a replication factor of three, you need at least three brokers in your Kafka cluster.
But how does Kafka keep those copies in sync?
Partitions are organized into leaders and followers, where the partition leader handles all writes and reads, and followers are purely for failover.
A follower can either be in-sync with the leader (containing all the partition leader's messages, except for messages within a small buffer window) or out of sync.
The set of all in-sync replicas is referred to as the ISR (in-sync replicas).
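To make this concrete, here is a quick sketch of creating and inspecting a replicated topic with the scripts that ship with Kafka; it assumes a broker reachable at localhost:9092 and uses a made-up topic name:
bash
# Create a topic whose single partition is replicated to 3 brokers
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 1 \
  --replication-factor 3

# Inspect the partition leader, its replicas and the ISR
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic orders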
Those are the basics of Kafka and replication; let's see what happens when it breaks.
Understanding broker outages
Let's imagine the Kafka cluster has three brokers and a replication factor of 1.
There's a single topic in the cluster with a single partition.
When the broker hosting the partition becomes unavailable, the partition is unavailable too, and the cluster can't serve consumers or producers.
Let's change this by setting the replication factor to 3.
In this scenario, each broker has a copy of a partition.
What happens when a broker is made unavailable?
If the partition has additional in-sync replicas, one of those will become the interim partition leader.
The cluster can operate as usual, and there's no downtime for consumers or producers.
- 1/2
A Kafka cluster with all partitions in sync loses a broker.
- 2/2
One of the two remaining replicas will be promoted to leader, and the cluster will keep operating as usual.
What about when there are partition copies, but they are not in sync?
In this case, there are two options:
- Either wait for the partition leader to come back online, sacrificing availability, or
- allow an out-of-sync replica to become the interim partition leader, sacrificing consistency.
- 1/3
A Kafka cluster with partitions not in sync loses a broker.
- 2/3
The cluster can promote one of the out of sync replicas to be the leader. However, you might miss some records.
- 3/3
Alternatively, you can wait for the broker to return and thus compromise your availability to dispatch events.
Now that we've discussed a few failure scenarios, let's see how you could mitigate them.
Requirements to mitigate common failures
You probably noticed that a partition should have an extra in-sync replica (ISR) available to survive the loss of the partition leader.
So a naive cluster could have just two brokers with a minimum in-sync replica size of 2.
However, that's not enough.
If you only have two replicas and then lose a broker, the in-sync replica count decreases to 1 and neither the producer nor the consumer can work (since the minimum in-sync replica count is 2).
Therefore, the number of brokers should be greater than the minimum in-sync replica size (i.e. at least 3).
- 1/4
You could set up a Kafka cluster with only two brokers and a minimum in-sync replica size of 2.
- 2/4
However, when a broker is lost, the cluster becomes unavailable because only a single replica remains in sync.
- 3/4
You should provision a Kafka cluster that has one broker more than the minimum in-sync replica count.
- 4/4
In this case, the Kafka cluster can still carry on if one broker is lost.
But where should you place those broker nodes?
Considering where you will host the Kafka cluster, it's good to spread the brokers across failure domains such as regions, zones and nodes.
So, if you wish to design a Kafka cluster that can tolerate one planned and one unplanned failure, you should consider the following requirements:
- A minimum in-sync replica count of 2.
- A replication factor of 3 for topics.
- At least 3 Kafka brokers, each running on different nodes.
- Nodes spread across three availability zones.
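If you already have a Kafka cluster at hand, you can check and adjust the first requirement on a per-topic basis with kafka-configs.sh. This is a sketch only; it assumes a broker reachable at localhost:9092 and an existing topic named test:
bash
# Show any per-topic overrides currently set (min.insync.replicas appears here if overridden)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name test --describe

# Require at least two in-sync replicas for this topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name test \
  --alter --add-config min.insync.replicas=2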
In the remaining part of the article, you will build and break a Kafka cluster on Kubernetes to validate those assumptions.
Deploying a 3-node Kafka cluster on Kubernetes
Let's create a three-node cluster that spans three availability zones with:
bash
k3d cluster create kube-cluster \
--agents 3 \
--k3s-node-label topology.kubernetes.io/zone=zone-a@agent:0 \
--k3s-node-label topology.kubernetes.io/zone=zone-b@agent:1 \
--k3s-node-label topology.kubernetes.io/zone=zone-c@agent:2
INFO[0000] Created network 'k3d-kube-cluster'
INFO[0000] Created image volume k3d-kube-cluster-images
INFO[0000] Starting new tools node...
INFO[0001] Creating node 'k3d-kube-cluster-server-0'
INFO[0003] Starting Node 'k3d-kube-cluster-tools'
INFO[0012] Creating node 'k3d-kube-cluster-agent-0'
INFO[0012] Creating node 'k3d-kube-cluster-agent-1'
INFO[0012] Creating node 'k3d-kube-cluster-agent-2'
INFO[0012] Creating LoadBalancer 'k3d-kube-cluster-serverlb'
INFO[0017] Starting new tools node...
INFO[0017] Starting Node 'k3d-kube-cluster-tools'
INFO[0018] Starting cluster 'kube-cluster'
INFO[0018] Starting servers...
INFO[0018] Starting Node 'k3d-kube-cluster-server-0'
INFO[0022] Starting agents...
INFO[0022] Starting Node 'k3d-kube-cluster-agent-1'
INFO[0022] Starting Node 'k3d-kube-cluster-agent-0'
INFO[0022] Starting Node 'k3d-kube-cluster-agent-2'
INFO[0032] Starting helpers...
INFO[0032] Starting Node 'k3d-kube-cluster-serverlb'
INFO[0041] Cluster 'kube-cluster' created successfully!
You can verify that the cluster is ready with:
bash
kubectl get nodes
NAME STATUS ROLES VERSION
k3d-kube-cluster-server-0 Ready control-plane,master v1.22.7+k3s1
k3d-kube-cluster-agent-1 Ready <none> v1.22.7+k3s1
k3d-kube-cluster-agent-0 Ready <none> v1.22.7+k3s1
k3d-kube-cluster-agent-2 Ready <none> v1.22.7+k3s1
Next, let's deploy a Kafka cluster as a Kubernetes StatefulSet.
Here's a YAML manifest, kafka.yaml, defining the resources required to create a simple Kafka cluster:
kafka.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "1Gi"
You can apply all the resources in this YAML file with:
bash
kubectl apply -f kafka.yaml
service/kafka-svc created
statefulset.apps/kafka created
Inspect the resources created with:
bash
kubectl get -f kafka.yaml
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
service/kafka-svc ClusterIP None <none> 9092/TCP
NAME READY
statefulset.apps/kafka 3/3
There is a StatefulSet with three ready Kafka broker pods and a service.
There are also three independent PersistentVolumeClaims for storing Kafka data, one for each broker:
bash
kubectl get pvc,pv
NAME STATUS VOLUME CAPACITY ACCESS MODES
persistentvolumeclaim/data-kafka-0 Bound pvc-eec953ae 1Gi RWO
persistentvolumeclaim/data-kafka-1 Bound pvc-5544a431 1Gi RWO
persistentvolumeclaim/data-kafka-2 Bound pvc-11a64b48 1Gi RWO
What are all of those resources?
Let's examine some of the highlights of the configuration in the kafka.yaml
manifest.
There are two resources defined:
- A StatefulSet.
- A Headless service.
The Kafka StatefulSet
A StatefulSet is an object designed to create pod replicas — just like a Deployment.
But unlike a Deployment, a StatefulSet provides guarantees about the ordering and uniqueness of these Pods.
Each Pod in a StatefulSet derives its hostname from the name of the StatefulSet and the ordinal of the Pod.
The pattern is $(statefulset name)-$(ordinal).
In your case, the name of the StatefulSet is kafka, so you should expect three pods named kafka-0, kafka-1 and kafka-2.
Let's verify that with:
bash
kubectl get pods
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
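Each of those Pods also sees its ordinal-based name as its own hostname, which you can confirm with a quick kubectl exec (the output should simply echo the pod name):
bash
kubectl exec kafka-0 -- hostname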
What happens when you delete kafka-0?
Does Kubernetes spawn kafka-3?
Let's test it with:
bash
kubectl delete pod kafka-0
pod "kafka-0" deleted
List the running pods with:
bash
kubectl get pods
NAME READY STATUS RESTARTS
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
kafka-0 1/1 Running 0
Kubernetes recreated the Pod with the same name!
Let's inspect the rest of the StatefulSet YAML definition.
yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
          # truncated output
The StatefulSet defines three replicas so that three pods will be created from the pod spec template.
There's a container image that, when it starts:
- Configures the broker's server.properties with its unique broker id, internal and external listeners, and quorum voters list.
- Formats the log directory.
- Starts the Kafka Java process.
If you are interested in the details of those actions, you can find the script in this repository.
The container image exposes two ports:
- 9092 for client communication. That is necessary for producers and consumers to connect.
- 9093 for internal, inter-broker communication.
In the next part of the YAML, there is a long list of environment variables:
kafka.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
  # truncated output
Those are used in the entry point script to derive values for broker settings in server.properties:
- REPLICAS - used as an iterator boundary to set the controller.quorum.voters property to a list of brokers.
- SERVICE and NAMESPACE - used to derive the CoreDNS name for each broker in the cluster for setting controller.quorum.voters, listeners and advertised.listeners.
- SHARE_DIR - used to set log.dirs, the directories in which the Kafka data is stored.
- CLUSTER_ID - the unique identifier for the Kafka cluster.
- DEFAULT_REPLICATION_FACTOR - the cluster-wide default replication factor.
- DEFAULT_MIN_INSYNC_REPLICAS - the cluster-wide default minimum in-sync replica count.
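To make the mapping less abstract, here is the rough shape of the settings the entrypoint script would derive for broker 0. This is an illustration based on the variables above; the listener name and exact format are assumptions, not values taken from the image:
bash
# Illustrative server.properties values for broker 0 (not copied from the image):
# controller.quorum.voters=0@kafka-0.kafka-svc.default.svc.cluster.local:9093,1@kafka-1.kafka-svc.default.svc.cluster.local:9093,2@kafka-2.kafka-svc.default.svc.cluster.local:9093
# advertised.listeners=PLAINTEXT://kafka-0.kafka-svc.default.svc.cluster.local:9092
# log.dirs=/mnt/kafka
# default.replication.factor=3
# min.insync.replicas=2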
In the rest of the YAML, there's the definition for a PersistentVolumeClaim template and the volumeMounts:
kafka.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            # truncated output
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "1Gi"
For each pod, the StatefulSet creates a PersistentVolumeClaim using the details in the volumeClaimTemplates.
In this case, it creates a PersistentVolumeClaim with:
- The ReadWriteOnce access mode, to enforce the constraint that the volume should only belong to one node at a time.
- 1Gi of storage.
The PersistentVolumeClaim is then bound to the underlying storage via a PersistentVolume.
The claim is mounted as a volume in the container at /mnt/kafka.
This is where the Kafka broker stores data in files organised by topic and partition.
It's important to notice that the StatefulSet guarantees that a given Pod will always map to the same storage identity.
If the pod kafka-0 is deleted, Kubernetes will recreate one with the same name and mount the same PersistentVolumeClaim and PersistentVolume.
Keep this in mind as it will become useful later.
Combining a StatefulSet with a Headless Service
At the beginning of the YAML definition for your Kafka cluster, there is a Service definition:
kafka.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
A Service with clusterIP: None is usually called a Headless Service.
But Kubernetes has four types of services:
- ClusterIP.
- NodePort.
- LoadBalancer.
- ExternalName.
So, what's a Headless Service?
A Headless Service is a variation of the ClusterIP service with no IP address.
So, how do you use it?
A headless service is helpful in combination with CoreDNS.
When you issue a DNS query to a standard ClusterIP service, you receive a single IP address:
bash
dig standard-cluster-ip.default.svc.cluster.local
;; QUESTION SECTION:
;standard-cluster-ip.default.svc.cluster.local. IN A
;; ANSWER SECTION:
standard-cluster-ip.default.svc.cluster.local. 30 IN A 10.100.0.1
However, when you query a Headless service, the DNS replies with all of the individual IP addresses of the Pods (in this case, the service has two pods):
bash
dig headless.default.svc.cluster.local
;; QUESTION SECTION:
;headless.default.svc.cluster.local. IN A
;; ANSWER SECTION:
headless.default.svc.cluster.local. 13 IN A 10.0.0.1
headless.default.svc.cluster.local. 13 IN A 10.0.0.2
How does this work with a StatefulSet?
- The StatefulSet sets each Pod's hostname to the Pod's name (e.g. kafka-0, kafka-1, etc.).
- Each Pod has an optional subdomain field which can be used to specify its DNS subdomain.
- The StatefulSet assigns a subdomain when the Pod is created in the form of $(podname).$(governing service domain), where the serviceName field on the StatefulSet defines the governing service.
- The Pod is then addressable with the fully qualified name <hostname>.<subdomain>.<namespace>.svc.cluster.local.
For example, a Pod with the hostname kafka-1 and the subdomain kafka-svc, in the namespace default, will have the fully qualified domain name (FQDN) kafka-1.kafka-svc.default.svc.cluster.local.
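You can verify both behaviours from inside the cluster with a short-lived DNS client pod. This is a quick check; nslookup output formatting varies with the busybox version:
bash
# Query the headless service: returns one A record per broker pod
kubectl run -ti --rm dns-test --image=busybox:1.36 --restart=Never -- \
  nslookup kafka-svc.default.svc.cluster.local

# Query a single broker by its stable FQDN
kubectl run -ti --rm dns-test --image=busybox:1.36 --restart=Never -- \
  nslookup kafka-1.kafka-svc.default.svc.cluster.local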
Now that we've covered the theory, let's test the Kafka cluster by sending messages.
Producing an event
In Kafka terminology, Producers can publish Events to Topics.
Consumers can subscribe to those Topics and consume those Events.
Let's publish a simple event to a topic and consume it.
Before you interact with the container, let's find the IP addresses of the brokers by describing the headless service:
bash
kubectl describe service kafka-svc
Name: kafka-svc
Namespace: default
Labels: app=kafka-app
Selector: app=kafka-app
Type: ClusterIP
Port: 9092 9092/TCP
TargetPort: 9092/TCP
Endpoints: 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
Now, let's create a pod that you can use as a Kafka client:
bash
kubectl run kafka-client --rm -ti --image bitnami/kafka:3.1.0 -- bash
I have no name!@kafka-client:/$
Inside the Kafka client container, there is a collection of scripts that make it easier to:
- Simulate a producer or consumer.
- Trigger leader election.
- Verify the replicas.
And more.
You can list them all with:
bash@kafka-client
ls /opt/bitnami/kafka/bin
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
# truncated output
Using a "test" topic, let's run the example console producer script kafka-console-producer
:
bash@kafka-client
kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
When the > prompt becomes visible, you can produce a "hello world" event:
prompt
>hello world
Notice how the command:
- Requires acknowledgements from all in-sync replicas to commit a batch of messages.
- Takes a comma-separated list of Kafka broker IP addresses and port numbers as bootstrap servers.
The event is stored in Kafka, but how should a consumer retrieve it?
Consume the events on the "test" topic
In the same terminal session, terminate the script with Ctrl+C and run the consumer script:
@kafka-client
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
hello world
^CProcessed a total of 1 messages
The consumer continues to poll the broker for more events on the test topic and processes them as they happen.
Excellent!
You published a "hello world" event to the test
topic, and another process consumed it.
Let's move on to something more interesting.
What happens when there's a maintenance activity on a worker node?
How does it affect our Kafka cluster?
Surviving a node down for maintenance: drain the node hosting the leader
Let's simulate replacing a Kubernetes node hosting the broker.
First, from a Kafka client, let's determine which broker is the leader for the test topic.
You can describe a topic using the kafka-topics.sh script:
prompt@kafka-client
kafka-topics.sh --describe \
--topic test \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
Topic: test   TopicId: P0SP1tEKTduolPh4apeV8Q   PartitionCount: 1   ReplicationFactor: 3   Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: test   Partition: 0   Leader: 1   Replicas: 1,0,2   Isr: 1,0,2
Leader: 1 means that the leader for the test topic is broker 1. In this Kafka setup (and by typical convention), its pod name is kafka-1.
So now that you know that the test topic leader is on the kafka-1 pod, you should find out where that pod is deployed with:
bash
kubectl get pod kafka-1 -o wide
NAME READY STATUS RESTARTS IP NODE
kafka-1 1/1 Running 0 10.42.0.12 k3d-kube-cluster-agent-0
Broker 1 is on the Kubernetes worker node k3d-kube-cluster-agent-0.
Let's drain it to evict the pods with:
bash
kubectl drain k3d-kube-cluster-agent-0 \
--delete-emptydir-data \
--force \
--ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-1
pod/kafka-1 evicted
node/k3d-kube-cluster-agent-0 evicted
The leader, kafka-1, was evicted as intended.
Since the brokers were spread equally across Kubernetes worker nodes, maintenance on one node will only bring down a fraction of the total brokers.
Do producers and consumers still work?
Does the Kafka cluster still work?
Can producers and consumers continue with business as usual?
Let's rerun the kafka console producer script with:
bash@kafka-client
kafka-console-producer.sh \
--topic test \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
At the > prompt, you can produce another event with:
prompt
WARN Bootstrap broker 10.42.0.10:9092 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello again, world
Notice the warning that one of the bootstrap brokers is disconnected.
Nonetheless, you managed to produce another message.
But can the consumer receive it?
Terminate the command with Ctrl+C and issue the following command:
bash@kafka-client
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
hello world
hello again, world
What happened?
Both messages were retrieved from the Kafka cluster — it worked!
Now stop the interactive session and describe the test topic again with:
bash@kafka-client
kafka-topics.sh --describe \
--topic test \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
Topic: test   TopicId: QqrcLtJSRoufzOZqNc9KcQ   PartitionCount: 1   ReplicationFactor: 3   Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: test   Partition: 0   Leader: 2   Replicas: 1,2,0   Isr: 2,0
There are a few interesting details:
- The topic Leader is now 2 (was 1).
- The list of in-sync replicas Isr contains 2,0 - broker 0 and broker 2.
- Broker 1, however, is not in sync.
This makes sense since broker one isn't available anymore.
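If you want to see every partition that is currently short of replicas, not just the test topic, kafka-topics.sh has a dedicated flag for that. A quick check from the same client pod, reusing the same bootstrap list:
bash@kafka-client
kafka-topics.sh --describe \
--under-replicated-partitions \
--bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092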
A Kafka pod is Pending
So a node is down for maintenance, and if you list all the running Pods, you will notice that kafka-1 is Pending.
prompt
kubectl get pod -l app=kafka-app
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-2 1/1 Running 0
kafka-1 0/1 Pending 0
But isn't Kubernetes supposed to reschedule the Pod to another worker node?
Let's investigate by describing the pod:
bash
kubectl describe pod kafka-1
# truncated
Events:
Type Reason From Message
---- ------ ---- -------
Warning FailedScheduling default-scheduler 0/3 nodes are available:
1 node(s) were unschedulable,
3 node(s) had volume node affinity conflict.
There are no nodes available for kafka-1.
Although only k3d-kube-cluster-agent-0 is offline for maintenance, the other nodes don't meet the persistent volume's node affinity constraint.
Let's verify that.
First, let's find the PersistentVolume bound to the (defunct) kafka-1:
bash
kubectl get persistentvolumes,persistentvolumeclaims
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM
persistentvolume/pvc-018e8d78 1Gi RWO Delete Bound default/data-kafka-1
persistentvolume/pvc-455a7f5b 1Gi RWO Delete Bound default/data-kafka-2
persistentvolume/pvc-abd6b6cf 1Gi RWO Delete Bound default/data-kafka-0
NAME STATUS VOLUME CAPACITY ACCESS MODES
persistentvolumeclaim/data-kafka-1 Bound pvc-018e8d78 1Gi RWO
persistentvolumeclaim/data-kafka-2 Bound pvc-455a7f5b 1Gi RWO
persistentvolumeclaim/data-kafka-0 Bound pvc-abd6b6cf 1Gi RWO
You can inspect the PersistentVolume with:
bash
kubectl get persistentvolume pvc-018e8d78 -o yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pvc-018e8d78
spec:
  accessModes:
    - ReadWriteOnce
  capacity:
    storage: 1Gi
  # truncated
  hostPath:
    path: /var/lib/rancher/k3s/storage/pvc-018e8d78_default_data-kafka-1
    type: DirectoryOrCreate
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - k3d-kube-cluster-agent-0
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-path
  volumeMode: Filesystem
Only k3d-kube-cluster-agent-0 has the volume that kafka-1 needs.
And the PersistentVolume cannot be moved elsewhere, so any pod that needs access to that volume should do so from k3d-kube-cluster-agent-0.
Since the node is not available, the scheduler cannot assign the Pod, which stays Pending.
Please note that this volume scheduling constraint is imposed by the local-path provisioner and is not common to all provisioners.
In other words, you might find that another provisioner can attach the PersistentVolume to a different node, and the Pod can be rescheduled on the same node as another broker.
But that's not great — losing a single node to a failure could compromise the availability of the Kafka cluster.
Let's fix this by introducing a constraint on where Pods can be placed: a topology constraint.
Pod Topology Constraints help you spread the pods across failure domains
In any public cloud, a zone groups together resources that may fail together, for example, because of a power outage.
However, resources in different zones are unlikely to fail together.
This is useful for ensuring resilience since a power outage in one zone won't affect another.
Although the exact definition of a zone is left to infrastructure implementations, you can imagine two or three computer rooms, each with separate aircon, power supplies, network switches, racks, etc.
A zone is one example of a failure domain.
Another might be a region.
It's improbable that UK South and East US regions might fail simultaneously.
In Kubernetes, you can use this information to set constraints on where the Pod should be placed.
For example, you might constrain your Kafka brokers to be in different zones.
Here's an example of how to do that:
kafka.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: kafka-app
      containers:
        - name: kafka-container
          # truncated output
The topologySpreadConstraints field reads as follows:
- maxSkew describes the allowed drift for the pod distribution across the specified topology.
- topologyKey is the name of the node label. All the nodes with the same label value are considered to be in the same topology.
- whenUnsatisfiable indicates what happens to a Pod that doesn't satisfy the spread constraint.
- labelSelector is used to find matching Pods.
With the topology spread constraint in place, the Pods will always be spread across the availability zones, regardless of the node affinity on their PersistentVolumes.
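Once the constraint is applied, you can double-check how the brokers landed relative to the zones (the -L flag adds the value of the given node label as an extra column):
bash
kubectl get pods -l app=kafka-app -o wide
kubectl get nodes -L topology.kubernetes.io/zone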
Should you use Pod Topology Constraints or Node Affinity?
Is the idea of assigning Volumes (and Pods) to specific Kubernetes nodes legit?
Isn't the idea behind Kubernetes that pods can be rescheduled anywhere in the cluster?
While this might be true for stateless applications, it's more complex when it comes to stateful apps such as Kafka.
Stateful applications have unique requirements such as:
- You want to control what applications share the compute resources with the stateful app (if at all) to avoid "noisy neighbours".
- Ideally, the data doesn't move from one node to another, as copying terabytes across nodes is slow and error-prone.
- You want to provision Kubernetes nodes that are I/O optimized.
So it's usually an excellent idea to assign specific nodes to StatefulSets and ensure that they have a dedicated node pool with the right instance type.
Depending on your setup, you might want to assign NodeAffinity to your volumes and your Pods.
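As a sketch of what that could look like, you might add a node affinity rule to the StatefulSet pod template. The node-pool label key and value below are hypothetical and not something created earlier in this article:
yaml
# Added under spec.template.spec of the StatefulSet; assumes nodes labelled node-pool=kafka
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: node-pool
              operator: In
              values:
                - kafka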
Return to full strength
Let's assume you completed the maintenance on the node, and you are ready to uncordon it.
bash
kubectl uncordon k3d-kube-cluster-agent-0
node/k3d-kube-cluster-agent-0 uncordoned
Now Kubernetes can schedule the kafka-1 Pod onto k3d-kube-cluster-agent-0, where it is reunited with its PersistentVolume.
After a short while, you should notice that the pod is Running:
bash
kubectl get pod kafka-1 -o wide
NAME READY STATUS IP NODE
kafka-1 1/1 Running 10.42.0.14 k3d-kube-cluster-agent-0
What about the Kafka cluster?
Does the broker resume being part of the cluster?
Since the pod was recreated and assigned a different IP address, you should retrieve the new list of endpoints with:
bash
kubectl describe service kafka-svc
Name: kafka-svc
Namespace: default
Labels: app=kafka-app
Selector: app=kafka-app
Type: ClusterIP
Port: 9092 9092/TCP
TargetPort: 9092/TCP
Endpoints: 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
Update the bootstrap broker endpoints list and query the test topic with:
bash@kafka-client
kafka-topics.sh --describe \
--topic test \
--bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
Topic: test   TopicId: QqrcLtJSRoufzOZqNc9KcQ   PartitionCount: 1   ReplicationFactor: 3   Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: test   Partition: 0   Leader: 2   Replicas: 1,2,0   Isr: 2,0,1
Notice how the list of in-sync replicas is 2,0,1.
So the kafka-1 pod (broker 1) could re-join the Kafka cluster and catch up on the additional message "hello again, world"!
Excellent!
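One small follow-up: broker 2 is still the leader even though broker 1 (the preferred leader, listed first in Replicas) has returned. Kafka will usually rebalance leadership on its own when auto.leader.rebalance.enable is on, which is the default; if you don't want to wait, you can trigger a preferred leader election yourself. A sketch, run from the same client pod:
bash@kafka-client
kafka-leader-election.sh \
--election-type PREFERRED \
--topic test \
--partition 0 \
--bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092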
However, in this example, you removed only one node.
What happens if multiple nodes are cordoned at the same time?
Surviving multiple nodes down for maintenance
Imagine a cluster administrator or automated maintenance operation trying to drain another node while one is already offline.
What happens?
Let's test it:
bash
kubectl drain k3d-kube-cluster-agent-0 \
--delete-emptydir-data \
--force \
--ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-1
pod/kafka-1 evicted
node/k3d-kube-cluster-agent-0 evicted
kubectl drain k3d-kube-cluster-agent-1 \
--delete-emptydir-data \
--force \
--ignore-daemonsets
node/k3d-kube-cluster-agent-1 cordoned
evicting pod default/kafka-2
pod/kafka-2 evicted
node/k3d-kube-cluster-agent-1 evicted
The nodes k3d-kube-cluster-agent-0 and k3d-kube-cluster-agent-1 were drained, and kafka-1 and kafka-2 were evicted.
Now, if you query the pods:
bash
kubectl get pod -l app=kafka-app
NAME READY STATUS RESTARTS
kafka-1 0/1 Pending 1
kafka-0 1/1 Running 0
kafka-2 0/1 Pending 0
With only one broker running, can producers and consumers continue with business as usual?
Let's run the kafka console producer script and produce another message:
bash@kafka-client
kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
At the > prompt, we can produce another "hello? world?!" message.
prompt
kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
>hello? world?!
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
ERROR Messages are rejected since there are fewer in-sync replicas than required.
The producer is blocked because there are fewer in-sync replicas than required.
How about the consumer?
Let's test it:
bash@kafka-client
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
ERROR Messages are rejected since there are fewer in-sync replicas than required.
It also seems to be blocked on reading.
Kafka is officially unavailable.
Let's fix it so that failures of this kind can't happen again.
Pod Disruption Budget
You can limit disruptions due to maintenance by using a Pod Disruption Budget (PDB).
A PodDisruptionBudget defines the minimum number of replicas that must be available for the application to operate.
If you say that you need at least two pods and a voluntary disruption would take that number down to one, Kubernetes will block the operation to avoid decreasing the number of available replicas.
A PodDisruptionBudget looks like this:
kafka-pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: kafka-app
There are (only) three fields:
- A label selector that matches the Pods.
- A minAvailable field that defines the minimum number of replicas that should be available at all times.
- A maxUnavailable field that describes the maximum number of replicas that can be unavailable at any one time.
Since the Kafka cluster should keep at least a quorum of brokers available, you will set minAvailable to 2.
Let's create the object with:
bash
kubectl apply -f kafka-pdb.yaml
poddisruptionbudget.policy/kafka-pdb created
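You can also ask Kubernetes how much room the budget currently leaves; the ALLOWED DISRUPTIONS column is computed from the pods that are actually healthy, so the value depends on the current state of your cluster:
bash
kubectl get poddisruptionbudget kafka-pdb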
To test the PodDisruptionBudget, you need exactly two pods running (the minimum allowed) and then try to evict one of them.
Since you have a single Pod running, let's uncordon one of the nodes with:
bash
kubectl uncordon k3d-kube-cluster-agent-0
node/k3d-kube-cluster-agent-0 uncordoned
You should have two Pods Running and one Pending:
bash
kubectl get pod -l app=kafka-app
NAME READY STATUS RESTARTS
kafka-1 1/1 Running 1
kafka-0 1/1 Running 0
kafka-2 0/1 Pending 0
If an eviction would violate the pod disruption budget, the disruption is prevented.
Let's test that by draining the node again:
bash
kubectl drain k3d-kube-cluster-agent-0 \
--delete-emptydir-data \
--force \
--ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-0
error when evicting pods/"kafka-0" -n "default" (will retry after 5s):
Cannot evict pod as it would violate the pod's disruption budget.
Despite the error, notice that the node is still cordoned to prevent Kubernetes from scheduling new pods on it:
bash
kubectl get nodes
NAME STATUS ROLES VERSION
k3d-kube-cluster-server-0 Ready control-plane,master v1.22.7+k3s1
k3d-kube-cluster-agent-1 Ready,SchedulingDisabled <none> v1.22.7+k3s1
k3d-kube-cluster-agent-0 Ready,SchedulingDisabled <none> v1.22.7+k3s1
k3d-kube-cluster-agent-2 Ready <none> v1.22.7+k3s1
But the Kafka pod is still running:
bash
kubectl get pods -o wide
NAME READY STATUS IP NODE
kafka-1 1/1 Running 10.42.0.15 k3d-kube-cluster-agent-0
kafka-0 1/1 Running 10.42.0.13 k3d-kube-cluster-agent-2
kafka-2 0/1 Pending <none> <none>
Also, pods not constrained by the PodDisruptionBudget will still be evicted and rescheduled.
The node k3d-kube-cluster-agent-1 is still unavailable; what if it doesn't come back?
Breaking badly: the node ain't coming back!
If all partitions are replicated with two in-sync replicas, no data should be lost if a Kubernetes node is removed permanently.
However, the broker pod will never be rescheduled due to the node affinity constraint on its persistent volume.
Let's explore what happens.
You can remove a node completely with:
bash
kubectl delete node k3d-kube-cluster-agent-1
node "k3d-kube-cluster-agent-1" deleted
In this case, kafka-2 is Pending because k3d-kube-cluster-agent-1 has gone, and along with it, kafka-2's local data volume.
bash
kubectl get pod kafka-2
NAME READY STATUS RESTARTS
kafka-2 0/1 Pending 0
It can't be rescheduled onto another node because no other node can satisfy the nodeAffinity constraint on the Volume.
Do producers and consumers still work?
Can the cluster live with this?
With two brokers running, you should expect Kafka to be available for producers and consumers.
Let's make a quick sanity check.
Since most of the broker IP addresses have been rotated, let's retrieve them with:
bash
kubectl describe service kafka-svc
Name: kafka-svc
Namespace: default
Labels: app=kafka-app
Selector: app=kafka-app
Type: ClusterIP
Port: 9092 9092/TCP
TargetPort: 9092/TCP
Endpoints: 10.42.0.15:9092,10.42.0.13:9092
Produce another message, "Hello World. Do you copy?":
bash@kafka-client
kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
>Hello World. Do you copy?
The message seems to be committed.
How about consuming the messages, both new and historical ones?
bash@kafka-client
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
hello world
hello again, world
Hello World. Do you copy?
Excellent!
All messages were retrieved!
Let's also examine the "test" topic with:
bash@kafka-client
kafka-topics.sh --describe \
--topic test \
--bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
Topic: test   TopicId: QqrcLtJSRoufzOZqNc9KcQ   PartitionCount: 1   ReplicationFactor: 3   Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: test   Partition: 0   Leader: 1   Replicas: 1,2,0   Isr: 1,0
So, producers and consumers are still available, but can we live with only two brokers in the Kafka cluster?
No, not really.
The current state prohibits any voluntary maintenance.
Like this drain operation, for example:
bash
kubectl drain k3d-kube-cluster-agent-2 --ignore-daemonsets
node/k3d-kube-cluster-agent-2 cordoned
evicting pod default/kafka-0
error when evicting pods/"kafka-0" -n "default" (will retry after 5s):
Cannot evict pod as it would violate the pod's disruption budget.
So, how might we recover from this scenario?
Kafka-2 is dead. Long live its successor, kafka-2.
You can add a new Kubernetes worker in the same zone (zone-b) from which k3d-kube-cluster-agent-1 departed.
In this tutorial, using k3d, adding a new node looks like this:
bash
k3d node create kube-cluster-new-agent \
--cluster kube-cluster \
--k3s-node-label topology.kubernetes.io/zone=zone-b
INFO[0000] Adding 1 node(s) to the runtime local cluster 'kube-cluster'...
INFO[0000] Starting Node 'k3d-kube-cluster-new-agent-4'
INFO[0008] Successfully created 1 node(s)!
You can use kubectl get nodes to see that it joined the cluster:
bash
kubectl get nodes
NAME STATUS VERSION
k3d-kube-cluster-new-agent-4 Ready v1.21.5+k3s2
# truncated output
Yup, there it is — joined and Ready.
You can clean up the old broker by deleting the PVC:
bash
kubectl delete pvc data-kafka-2
persistentvolumeclaim "data-kafka-0" deleted
When we delete the kafka-2 pod, Kubernetes can reschedule it onto the new node.
bash
kubectl delete po kafka-2
pod "kafka-2" deleted
You can observe the new Kafka broker pod bootstrap as it happens:
bash
kubectl get pods --watch
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 1 4d23h
kafka-1 1/1 Running 8 14d
kafka-2 0/1 ContainerCreating 0 14s
kafka-2 1/1 Running 0 51s
If you examine the status, you will notice that all Kafka brokers are running, and a new PersistentVolumeClaim and PersistentVolume have been created:
bash
kubectl get pods,pvc,pv
NAME READY STATUS
pod/kafka-2 1/1 Running
pod/kafka-1 1/1 Running
pod/kafka-0 1/1 Running
NAME STATUS VOLUME CAPACITY ACCESS MODES
persistentvolumeclaim/data-kafka-1 Bound pvc-018e8d78 1Gi RWO
persistentvolumeclaim/data-kafka-2 Bound pvc-fe291ef2 1Gi RWO
persistentvolumeclaim/data-kafka-0 Bound pvc-abd6b6cf 1Gi RWO
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM
persistentvolume/pvc-018e8d78 1Gi RWO Delete Bound default/data-kafka-1
persistentvolume/pvc-455a7f5b 1Gi RWO Delete Released default/data-kafka-2
persistentvolume/pvc-fe291ef2 1Gi RWO Delete Bound default/data-kafka-2
persistentvolume/pvc-abd6b6cf 1Gi RWO Delete Bound default/data-kafka-0
Is the replacement broker in sync?
With the test topic partitions replicated three times, you should expect kafka-2 to sync with the other brokers eventually.
Retrieve the new endpoints with:
bash
kubectl describe service kafka-svc
Name: kafka-svc
Namespace: default
Labels: app=kafka-app
Selector: app=kafka-app
Type: ClusterIP
Port: 9092 9092/TCP
TargetPort: 9092/TCP
Endpoints: 10.42.0.15:9092,10.42.0.13:9092,10.42.1.16:9092
Let's query the test topic and inspect the status:
bash@kafka-client
kafka-topics.sh --describe \
--topic test \
--bootstrap-server 10.42.0.15:9092,10.42.0.13:9092,10.42.1.16:9092
Topic: test   TopicId: QqrcLtJSRoufzOZqNc9KcQ   PartitionCount: 1   ReplicationFactor: 3   Configs: min.insync.replicas=2,segment.bytes=1073741824
        Topic: test   Partition: 0   Leader: 2   Replicas: 1,2,0   Isr: 2,0,1
Yep, the test topic has "2,0,1" as in-sync replicas, meaning all brokers are in sync for the test topic.
Let's consume the messages from the new broker to validate that.
First, let's get the pod IP address of kafka-2, our newest broker.
bash
kubectl get pod kafka-2 -o jsonpath='{.status.podIP}'
10.42.1.16
Second, let's run the console consumer from the Kafka client pod, specifying only kafka-2's pod IP address:
bash
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server 10.42.1.16:9092
hello world
hello again, world
Hello World. Do you copy?
Great!
It's in sync!
Let's recap.
Summary
In this article, you designed and tested a Kafka cluster for high availability so that producers and consumers can continue to flow during a failure.
- We acknowledged that 100% availability is not possible and that maintenance and accidental disruption are more likely than infrastructure failure.
- Kubernetes lost a node and its persistent volume.
- Kafka eventually recovered, without data loss, and without blocking producers and consumers.
- Kafka can leverage local storage for its data because it looks after topic partitions and replicas.
You also learned how you could leverage Kubernetes features to make your Kafka cluster highly available:
- You used pod topology spread constraints to spread your brokers across nodes and availability zones.
- You learned how the local path provisioner creates persistent volumes with nodeAffinity.
- You defined pod disruption budgets to prevent administrative operations from causing unplanned downtime.
Please note that this article used Kafka in KRaft mode (aka ZooKeeper-less) for simplicity so that we could focus on the availability of a single stateful service in Kubernetes.
However, KRaft is not production-ready yet.
In particular, partition re-assignments, unclean leader election, dynamically changing broker endpoints, and any kind of upgrade are unsupported in KRaft mode.