Douglas Hellinger

Designing and testing a highly available Kafka cluster on Kubernetes

April 2022


TL;DR: In this article, you'll look at Kafka's architecture and how it supports high availability with replicated partitions. Then, you will design a Kafka cluster to achieve high availability using standard Kubernetes resources and see how it tolerates node maintenance and total node failure.

In its simplest form, the architecture of Kafka consists of a single Broker server and its Producers and Consumers as clients.

  • Producers create records and publish them to the Kafka broker.
  • A consumer consumes records from the broker.

Although this Kafka cluster can support typical Kafka use cases, it is too simplistic for most practical cases.

Kafka is typically run as a cluster of three or more brokers that can span multiple data centers or cloud regions.

A highly available Kafka cluster with three nodes

This cluster architecture supports the need for scalability, consistency, availability, partition tolerance and performance.

Like any engineering endeavour, there are trade-offs to be made between these qualities.

In this article, you will explore the availability of Kafka on Kubernetes.

In particular, we will design a Kafka cluster that:

  1. Prefers availability over consistency, which is a trade-off you may want to make for a use case such as real-time metrics collection, where, in case of failure, availability to write new data is more important than losing some historical data points.
  2. Chooses simplicity over other non-functional requirements (e.g. security, performance, efficiency, etc.) to focus on learning Kafka and Kubernetes.
  3. Assumes that maintenance and unplanned disruptions are more likely than infrastructure failure.

With those goals in mind, let's first discuss a typical highly available Kafka cluster — without Kubernetes.

Kafka partitions and replication-factor

In Kafka, messages are categorized into topics, and each topic has a name that is unique across the entire cluster.

For example, if you build a chat app, you might have a topic for each room (e.g. "dave-tom-chat").

But what happens when the number of messages outgrows the size of the broker?

Topics are broken down into partitions, each of which can live on a separate node in the Kafka cluster.

In other words, all messages from a single topic could be stored in different brokers, but all the messages from a single partition can only be found on the same node.

  • If a topic contains all messages, how does it work when there is no space on the device?
  • Kafka uses partitions to distribute records to multiple brokers.
  • Each topic can have a different number of partitions. All the records from a single partition are always stored together on the node.

This design choice enables parallelization of topics, scalability and high message throughput.

But there's more.

Topics are configured with a replication factor, which determines the number of copies for each partition.

If a cluster has a single topic with one partition, a replication factor of three means that there are three copies of that partition in total, each stored on a different broker.

A Kafka cluster with a topic with a replication factor of 3

All replicas of a partition exist on separate brokers, so you cannot have more partition copies than nodes in the cluster.

In the previous example, with a replication factor of three, you should expect at least three nodes in your Kafka cluster.
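To make the constraint concrete, here is a minimal Python sketch that places partition replicas on brokers in round-robin order. It is a simplified illustration and not Kafka's actual assignment algorithm; the function name assign_replicas is made up for this example.

```python
def assign_replicas(num_partitions: int, replication_factor: int,
                    brokers: list[int]) -> dict[int, list[int]]:
    """Place every replica of a partition on a distinct broker (round-robin)."""
    if replication_factor > len(brokers):
        # Replicas of one partition must live on separate brokers.
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(num_partitions):
        assignment[partition] = [
            brokers[(partition + offset) % len(brokers)]
            for offset in range(replication_factor)
        ]
    return assignment


if __name__ == "__main__":
    # One topic, one partition, replication factor 3, brokers 0..2.
    print(assign_replicas(1, 3, [0, 1, 2]))
```

Asking for a replication factor of three with only two brokers raises an error in this sketch, which mirrors the rule that you cannot have more partition copies than nodes.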

But how does Kafka keep those copies in sync?

Partition replicas are organized into leaders and followers, where the partition leader handles all writes and (by default) all reads, and followers are purely for failover.

A follower can either be in-sync with the leader (containing all the partition leader's messages, except for messages within a small buffer window) or out of sync.

Kafka partitions are organized into leaders and followers

The set of all in-sync replicas is referred to as the ISR (in-sync replicas).

Those are the basics of Kafka and replication; let's see what happens when it breaks.

Understanding broker outages

Let's imagine the Kafka cluster has three brokers and a replication factor of 1.

There's a single topic in the cluster with a single partition.

When the broker hosting that partition becomes unavailable, the partition is unavailable too, and the cluster can't serve consumers or producers.

A Kafka cluster with a single partition cannot cope with losing a node

Let's change this by setting the replication factor to 3.

In this scenario, each broker has a copy of a partition.

What happens when a broker is made unavailable?

What happens when you lose a broker in a Kafka cluster with a replication factor of 3

If the partition has additional in-sync replicas, one of those will become the interim partition leader.

The cluster can operate as usual, and there's no downtime for consumers or producers.

  • A Kafka cluster with all partitions in sync loses a broker.
  • One of the two remaining replicas will be promoted to leader, and the cluster will keep operating as usual.

What about when there are partition copies, but they are not in sync?

In this case, there are two options:

  1. Wait for the partition leader to come back online, sacrificing availability, or
  2. Allow an out-of-sync replica to become the interim partition leader, sacrificing consistency.

  • A Kafka cluster with partitions not in sync loses a broker.
  • The cluster can promote one of the out-of-sync replicas to be the leader. However, you might miss some records.
  • Alternatively, you can wait for the broker to return and thus compromise your availability to dispatch events.
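The two options can be sketched as a small decision function. This is a simplified model of the trade-off, not Kafka's controller logic; elect_leader and its allow_unclean parameter are hypothetical names, loosely modelled on Kafka's unclean.leader.election.enable topic setting.

```python
from typing import Optional


def elect_leader(replicas: list[int], isr: set[int], offline: set[int],
                 allow_unclean: bool = False) -> Optional[int]:
    """Return the id of the new leader, or None if the partition stays offline."""
    # Prefer a live in-sync replica: promoting it loses no records.
    for broker in replicas:
        if broker in isr and broker not in offline:
            return broker
    # No in-sync replica is alive. Promoting an out-of-sync replica keeps the
    # partition available but may drop records the old leader had accepted.
    if allow_unclean:
        for broker in replicas:
            if broker not in offline:
                return broker
    # Otherwise wait for an in-sync replica to return (consistency first).
    return None


if __name__ == "__main__":
    # Broker 1 was the leader and the only in-sync replica, and it is now offline.
    print(elect_leader([1, 0, 2], isr={1}, offline={1}))
    print(elect_leader([1, 0, 2], isr={1}, offline={1}, allow_unclean=True))
```

With allow_unclean left at False the function returns None (wait for the leader); with it set to True the first surviving replica is promoted.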

Now that we've discussed a few failure scenarios, let's see how you could mitigate them.

Requirements to mitigate common failures

You probably noticed that a partition should have an extra in-sync replica (ISR) available to survive the loss of the partition leader.

So a naive cluster size could have two brokers with a minimum in-sync replica size of 2.

However, that's not enough.

If you only have two replicas and then lose a broker, the in-sync replica size decreases to 1, which is below the minimum of 2, so producers that require acknowledgement from all in-sync replicas can no longer write.

Therefore, the number of brokers should be greater than the minimum in-sync replica size (i.e. at least 3).

  • You could set up a Kafka cluster with only two brokers and a minimum in-sync replica size of 2.
  • However, when a broker is lost, the cluster becomes unavailable because only a single replica is in sync.
  • You should provision a Kafka cluster that has at least one broker more than the minimum in-sync replica size.
  • In this case, the Kafka cluster can still carry on if one broker is lost.
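The sizing rule can be checked with a few lines of Python. This is a back-of-the-envelope sketch, assuming the replication factor equals the number of brokers, every surviving replica is in sync, and producers wait for acknowledgement from all in-sync replicas; can_accept_writes is a made-up helper.

```python
def can_accept_writes(brokers: int, min_insync_replicas: int,
                      failed_brokers: int) -> bool:
    """True if a producer that waits for all in-sync replicas can still write.

    Assumes one replica per broker and that every surviving replica is in sync.
    """
    in_sync = brokers - failed_brokers
    return in_sync >= min_insync_replicas


if __name__ == "__main__":
    print(can_accept_writes(brokers=2, min_insync_replicas=2, failed_brokers=1))
    print(can_accept_writes(brokers=3, min_insync_replicas=2, failed_brokers=1))
```

Two brokers with a minimum of 2 in-sync replicas cannot absorb a single failure, while three brokers can.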

But where should you place those broker nodes?

Considering that you will have to host the Kafka cluster, it's good to spread brokers among failure-domains such as regions, zones, nodes, etc.

So, if you wish to design a Kafka cluster that can tolerate one planned and one unplanned failure, you should consider the following requirements:

  1. A minimum of 2 in-sync replicas.
  2. A replication factor of 3 for topics.
  3. At least 3 Kafka brokers, each running on different nodes.
  4. Nodes spread across three availability zones.

In the remaining part of the article, you will build and break a Kafka cluster on Kubernetes to validate those assumptions.

Deploying a 3-node Kafka cluster on Kubernetes

Let's create a three-node cluster that spans three availability zones with:

bash

k3d cluster create kube-cluster \
  --agents 3 \
  --k3s-node-label topology.kubernetes.io/zone=zone-a@agent:0 \
  --k3s-node-label topology.kubernetes.io/zone=zone-b@agent:1 \
  --k3s-node-label topology.kubernetes.io/zone=zone-c@agent:2
INFO[0000] Created network 'k3d-kube-cluster'
INFO[0000] Created image volume k3d-kube-cluster-images
INFO[0000] Starting new tools node...
INFO[0001] Creating node 'k3d-kube-cluster-server-0'
INFO[0003] Starting Node 'k3d-kube-cluster-tools'
INFO[0012] Creating node 'k3d-kube-cluster-agent-0'
INFO[0012] Creating node 'k3d-kube-cluster-agent-1'
INFO[0012] Creating node 'k3d-kube-cluster-agent-2'
INFO[0012] Creating LoadBalancer 'k3d-kube-cluster-serverlb'
INFO[0017] Starting new tools node...
INFO[0017] Starting Node 'k3d-kube-cluster-tools'
INFO[0018] Starting cluster 'kube-cluster'
INFO[0018] Starting servers...
INFO[0018] Starting Node 'k3d-kube-cluster-server-0'
INFO[0022] Starting agents...
INFO[0022] Starting Node 'k3d-kube-cluster-agent-1'
INFO[0022] Starting Node 'k3d-kube-cluster-agent-0'
INFO[0022] Starting Node 'k3d-kube-cluster-agent-2'
INFO[0032] Starting helpers...
INFO[0032] Starting Node 'k3d-kube-cluster-serverlb'
INFO[0041] Cluster 'kube-cluster' created successfully!

You can verify that the cluster is ready with:

bash

kubectl get nodes
NAME                        STATUS   ROLES                  VERSION
k3d-kube-cluster-server-0   Ready    control-plane,master   v1.22.7+k3s1
k3d-kube-cluster-agent-1    Ready    <none>                 v1.22.7+k3s1
k3d-kube-cluster-agent-0    Ready    <none>                 v1.22.7+k3s1
k3d-kube-cluster-agent-2    Ready    <none>                 v1.22.7+k3s1

Next, let's deploy a Kafka cluster as a Kubernetes StatefulSet.

Here's a YAML manifest, kafka.yaml, defining the resources required to create a simple Kafka cluster:

kafka.yaml

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "1Gi"

You can apply all the resources in this YAML file with:

bash

kubectl apply -f kafka.yaml
service/kafka-svc created
statefulset.apps/kafka created

Inspect the resources created with:

bash

kubectl get -f kafka.yaml
NAME                TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)
service/kafka-svc   ClusterIP   None         <none>        9092/TCP

NAME                     READY
statefulset.apps/kafka   3/3

There is a StatefulSet with three ready Kafka broker pods and a service.

There are also three independent PersistentVolumeClaims for storing Kafka data, one for each broker:

bash

kubectl get pvc,pv
NAME                                 STATUS   VOLUME         CAPACITY   ACCESS MODES
persistentvolumeclaim/data-kafka-0   Bound    pvc-eec953ae   1Gi        RWO
persistentvolumeclaim/data-kafka-1   Bound    pvc-5544a431   1Gi        RWO
persistentvolumeclaim/data-kafka-2   Bound    pvc-11a64b48   1Gi        RWO

What are all of those resources?

Let's examine some of the highlights of the configuration in the kafka.yaml manifest.

There are two resources defined:

  1. A StatefulSet.
  2. A Headless service.

The Kafka StatefulSet

A StatefulSet is an object designed to create pod replicas — just like a Deployment.

But unlike a Deployment, a StatefulSet provides guarantees about the ordering and uniqueness of these Pods.

Each Pod in a StatefulSet derives its hostname from the name of the StatefulSet and the ordinal of the Pod.

The pattern is $(statefulset name)-$(ordinal).

In your case, the name of the StatefulSet is kafka, so you should expect three pods named kafka-0, kafka-1 and kafka-2.

A Kafka cluster deployed as a StatefulSet

Let's verify that with:

bash

kubectl get pods
NAME      READY   STATUS    RESTARTS
kafka-0   1/1     Running   0
kafka-1   1/1     Running   0
kafka-2   1/1     Running   0

What happens when you delete kafka-0?

Does Kubernetes spawn kafka-3?

Let's test it with:

bash

kubectl delete pod kafka-0
pod "kafka-0" deleted

List the running pods with:

bash

kubectl get pods
NAME      READY   STATUS    RESTARTS
kafka-1   1/1     Running   0
kafka-2   1/1     Running   0
kafka-0   1/1     Running   0

Kubernetes recreated the Pod with the same name!

Let's inspect the rest of the StatefulSet YAML definition.

yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
# truncated output

The StatefulSet defines three replicas so that three pods will be created from the pod spec template.

When the container image starts, it:

  1. Configures the broker's server.properties with its unique broker id, internal and external listeners, and quorum voters list.
  2. Formats the log directory.
  3. Starts the Kafka Java process.

If you are interested in the details of those actions, you can find the script in this repository.

The container image exposes two ports: 9092 and 9093.

In the next part of the YAML, there is a long list of environment variables:

kafka.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: default
            - name: SHARE_DIR
              value: /mnt/kafka
            - name: CLUSTER_ID
              value: oh-sxaDRTcyAr6pFRbXyzA
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
# truncated output

Those are used in the entry point script to derive values for broker settings in server.properties.

In the rest of the YAML, there's the definition for a PersistentVolumeClaim template and the volumeMounts:

kafka.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          image: doughgle/kafka-kraft
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            # truncated output
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "1Gi"

For each pod, the StatefulSet creates a PersistentVolumeClaim using the details in the volumeClaimTemplates.

Each Pod in a StatefulSet has a Persistent Volume Claim and Persistent Volume

In this case, it creates a PersistentVolumeClaim with the ReadWriteOnce access mode and a storage request of 1Gi.

The PersistentVolumeClaim is then bound to the underlying storage via a PersistentVolume.

The claim is mounted as a volume in the container at /mnt/kafka.

This is where the Kafka broker stores data in files organised by topic and partition.

It's important to notice that the StatefulSet guarantees that a given Pod will always map to the same storage identity.

If the pod kafka-0 is deleted, Kubernetes will recreate one with the same name and mount the same PersistentVolumeClaim and PersistentVolume.

Keep this in mind as it will become useful later.
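The stable identity follows a simple naming scheme, which you can see in the kubectl output earlier: the Pod is named after the StatefulSet and its ordinal, and the claim is named after the volumeClaimTemplate, the StatefulSet and the same ordinal. Here is a small Python sketch of that mapping; stable_identities is a made-up helper for illustration.

```python
def stable_identities(statefulset: str, claim_template: str,
                      replicas: int) -> list[tuple[str, str]]:
    """Return (pod name, PersistentVolumeClaim name) for each ordinal."""
    identities = []
    for ordinal in range(replicas):
        pod = f"{statefulset}-{ordinal}"
        claim = f"{claim_template}-{statefulset}-{ordinal}"
        identities.append((pod, claim))
    return identities


if __name__ == "__main__":
    for pod, claim in stable_identities("kafka", "data", 3):
        print(pod, "->", claim)
```

Because the names are derived only from the ordinal, a recreated kafka-0 looks up data-kafka-0 again rather than receiving a fresh volume.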

Combining a StatefulSet with a Headless Service

At the beginning of the YAML definition for your Kafka cluster, there is a Service definition:

kafka.yaml

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app

A Service with clusterIP: None is usually called a Headless Service.

But Kubernetes has four types of services:

  1. ClusterIP.
  2. NodePort.
  3. LoadBalancer.
  4. ExternalName.

So, what's a Headless Service?

A Headless Service is a variation of the ClusterIP service with no IP address.

So, how do you use it?

A headless service is helpful in combination with CoreDNS.

When you issue a DNS query to a standard ClusterIP service, you receive a single IP address:

bash

dig standard-cluster-ip.default.svc.cluster.local

;; QUESTION SECTION:
;standard-cluster-ip.default.svc.cluster.local. IN    A

;; ANSWER SECTION:
standard-cluster-ip.default.svc.cluster.local. 30 IN A    10.100.0.1

However, when you query a Headless service, the DNS replies with all of the individual IP addresses of the Pods (in this case, the service has two pods):

bash

dig headless.default.svc.cluster.local

;; QUESTION SECTION:
;headless.default.svc.cluster.local. IN A

;; ANSWER SECTION:
headless.default.svc.cluster.local. 13 IN A 10.0.0.1
headless.default.svc.cluster.local. 13 IN A 10.0.0.2

How does this work with a StatefulSet?

  1. The StatefulSet sets the hostname of each Pod to the Pod's name (e.g. kafka-0, kafka-1, etc.).
  2. Each Pod has an optional subdomain field which can be used to specify its DNS subdomain.
  3. The StatefulSet assigns a subdomain when the Pod is created in the form of $(podname).$(governing service domain), where the serviceName field defines the governing service on the StatefulSet.
  4. The Pod is now addressable with a fully qualified name of <hostname>.<subdomain>.<namespace>.svc.cluster.local.

For example, a Pod with the hostname kafka-1 and the subdomain kafka-svc, in the namespace default, will have the fully qualified domain name (FQDN) kafka-1.kafka-svc.default.svc.cluster.local.

When the Headless Service is used in conjunction with a StatefulSet, individual Pod entries are added to the DNS
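As a quick illustration, the naming pattern can be expressed in a few lines of Python. The helper names pod_fqdn and bootstrap_servers are hypothetical, and the sketch assumes the default cluster domain cluster.local.

```python
def pod_fqdn(pod: str, service: str, namespace: str = "default",
             cluster_domain: str = "cluster.local") -> str:
    """Build <hostname>.<subdomain>.<namespace>.svc.<cluster domain>."""
    return f"{pod}.{service}.{namespace}.svc.{cluster_domain}"


def bootstrap_servers(statefulset: str, service: str, replicas: int,
                      port: int = 9092, namespace: str = "default") -> str:
    """Join the FQDN of every StatefulSet Pod into a comma-separated list."""
    servers = []
    for ordinal in range(replicas):
        pod = f"{statefulset}-{ordinal}"
        servers.append(f"{pod_fqdn(pod, service, namespace)}:{port}")
    return ",".join(servers)


if __name__ == "__main__":
    print(pod_fqdn("kafka-1", "kafka-svc"))
    print(bootstrap_servers("kafka", "kafka-svc", 3))
```

Unlike Pod IP addresses, these names stay the same when a Pod is recreated. Whether clients can rely on them end to end depends on how the brokers advertise their listeners, which is an assumption here.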

Now that we've covered the theory, let's test the Kafka cluster by sending messages.

Producing an event

In Kafka terminology, Producers can publish Events to Topics.

Consumers can subscribe to those Topics and consume those Events.

Let's publish a simple event to a topic and consume it.

Before you interact with the container, let's find the IP addresses of the brokers by describing the headless service:

bash

kubectl describe service kafka-svc
Name:              kafka-svc
Namespace:         default
Labels:            app=kafka-app
Selector:          app=kafka-app
Type:              ClusterIP
Port:              9092  9092/TCP
TargetPort:        9092/TCP
Endpoints:         10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092

Now, let's create a pod that you can use as a Kafka client:

bash

kubectl run kafka-client --rm -ti --image bitnami/kafka:3.1.0 -- bash
I have no name!@kafka-client:/$

Inside the Kafka client container, there is a collection of scripts that make it easier to produce events, consume events, describe topics, and more.

You can list them all with:

bash@kafka-client

ls /opt/bitnami/kafka/bin
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
# truncated output

Using a "test" topic, let's run the example console producer script kafka-console-producer.sh:

bash@kafka-client

kafka-console-producer.sh \
  --topic test \
  --request-required-acks all \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092

When the > prompt becomes visible, you can produce a "hello world" event:

prompt

>hello world

Notice how the script:

  1. Requires acknowledgements from all in-sync replicas to commit a batch of messages.
  2. Takes a comma-separated list of Kafka broker IP addresses and port numbers.

An event is stored in the Kafka cluster

The event is stored in Kafka, but how should a consumer retrieve it?

Consume the events on the "test" topic

In the same terminal session, terminate the script with Ctrl+C and run the consumer script:

bash@kafka-client

kafka-console-consumer.sh \
  --topic test \
  --from-beginning \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
hello world
^CProcessed a total of 1 messages

The consumer continues to poll the broker for more events on the test topic and process them as they happen.

Excellent!

You published a "hello world" event to the test topic, and another process consumed it.

Let's move on to something more interesting.

What happens when there's a maintenance activity on a worker node?

How does it affect our Kafka cluster?

Surviving a node down for maintenance: drain the node hosting the leader

Let's simulate replacing a Kubernetes node hosting the broker.

First, from a Kafka client, let's determine which broker is the leader for the test topic.

You can describe a topic using the kafka-topics.sh script:

bash@kafka-client

kafka-topics.sh --describe \
  --topic test \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
Topic: test
TopicId: P0SP1tEKTduolPh4apeV8Q
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2,segment.bytes=1073741824

Topic: test
Partition: 0
Leader: 1
Replicas: 1,0,2
Isr: 1,0,2

Leader: 1 means that the leader for the test topic is broker 1.

In this Kafka setup (and by typical convention), its pod name is kafka-1.

So now that you know that the test topic leader is on the kafka-1 pod, you should find out where that pod is deployed with:

bash

kubectl get pod kafka-1 -o wide
NAME      READY   STATUS    RESTARTS   IP           NODE
kafka-1   1/1     Running   0          10.42.0.12   k3d-kube-cluster-agent-0

Broker 1 is on the Kubernetes worker node k3d-kube-cluster-agent-0.

Let's drain it to evict the pods with:

bash

kubectl drain k3d-kube-cluster-agent-0 \
  --delete-emptydir-data \
  --force \
  --ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-1
pod/kafka-1 evicted
node/k3d-kube-cluster-agent-0 evicted

The leader, kafka-1, was evicted as intended.

A three broker Kafka cluster just lost a node

Since the brokers were spread equally across Kubernetes worker nodes, maintenance on one node will only bring down a fraction of the total brokers.

Do producers and consumers still work?

Does the Kafka cluster still work?

Can producers and consumers continue with business as usual?

Let's rerun the kafka console producer script with:

bash@kafka-client

kafka-console-producer.sh \
  --topic test \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092

At the > prompt, you can produce another event with:

prompt

WARN Bootstrap broker 10.42.0.10:9092 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello again, world

Notice the warning that one of the bootstrap brokers is disconnected.

Nonetheless, you managed to produce another message.

Producing an event with one unavailable Kafka broker

But can the consumer receive it?

Terminate the command with Ctrl+C and issue the following command:

bash@kafka-client

kafka-console-consumer.sh \
  --topic test \
  --from-beginning \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
hello world
hello again, world

What happened?

Both messages were retrieved from the Kafka cluster — it worked!

Now stop the interactive session and describe the test topic again with:

bash@kafka-client

kafka-topics.sh --describe \
  --topic test \
  --bootstrap-server 10.42.0.10:9092,10.42.0.12:9092,10.42.0.13:9092
Topic: test
TopicId: QqrcLtJSRoufzOZqNc9KcQ
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2,segment.bytes=1073741824

Topic: test
Partition: 0
Leader: 2
Replicas: 1,2,0
Isr: 2,0

There are a few interesting details:

  1. The topic Leader is now 2 (was 1).
  2. The list of in-sync replicas Isr contains 2,0 - broker 0 and broker 2.
  3. Broker 1, however, is not in-sync.

This makes sense since broker 1 isn't available anymore.
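If you want to spot out-of-sync brokers without reading the lists by eye, you can compare the Replicas and Isr fields. The following Python sketch takes the two comma-separated values as they appear in the output above; out_of_sync is a made-up helper and not part of the Kafka tooling.

```python
def out_of_sync(replicas: str, isr: str) -> list[int]:
    """Return the broker ids listed in Replicas but missing from Isr."""
    assigned = [int(broker) for broker in replicas.split(",")]
    in_sync = {int(broker) for broker in isr.split(",")}
    return [broker for broker in assigned if broker not in in_sync]


if __name__ == "__main__":
    # Values copied from the kafka-topics.sh output after the drain.
    print(out_of_sync("1,2,0", "2,0"))
```

For the values above, the only broker that is assigned but not in sync is broker 1.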

A Kafka pod is Pending

So a node is down for maintenance, and if you list all the running Pods, you will notice that kafka-1 is Pending.

bash

kubectl get pod -l app=kafka-app
NAME      READY   STATUS    RESTARTS
kafka-0   1/1     Running   0
kafka-2   1/1     Running   0
kafka-1   0/1     Pending   0

But isn't Kubernetes supposed to reschedule the Pod to another worker node?

Let's investigate by describing the pod:

bash

kubectl describe pod kafka-1
# truncated
Events:
Type     Reason            From               Message
----     ------            ----               -------
Warning  FailedScheduling  default-scheduler  0/3 nodes are available:
                                              1 node(s) were unschedulable,
                                              3 node(s) had volume node affinity conflict.

There are no nodes available for kafka-1.

Although only k3d-kube-cluster-agent-0 is offline for maintenance, the other nodes don't meet the persistent volume's node affinity constraint.

Let's verify that.

First, let's find the PersistentVolume bound to the (defunct) kafka-1:

bash

kubectl get persistentvolumes,persistentvolumeclaims
NAME                            CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS     CLAIM
persistentvolume/pvc-018e8d78   1Gi        RWO            Delete           Bound      default/data-kafka-1
persistentvolume/pvc-455a7f5b   1Gi        RWO            Delete           Bound      default/data-kafka-2
persistentvolume/pvc-abd6b6cf   1Gi        RWO            Delete           Bound      default/data-kafka-0

NAME                                 STATUS   VOLUME         CAPACITY   ACCESS MODES
persistentvolumeclaim/data-kafka-1   Bound    pvc-018e8d78   1Gi        RWO
persistentvolumeclaim/data-kafka-2   Bound    pvc-455a7f5b   1Gi        RWO
persistentvolumeclaim/data-kafka-0   Bound    pvc-abd6b6cf   1Gi        RWO

You can inspect the PersistentVolume with:

bash

kubectl get persistentvolume pvc-018e8d78 -o yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pvc-018e8d78
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  # truncated
  hostPath:
    path: /var/lib/rancher/k3s/storage/pvc-018e8d78_default_data-kafka-1
    type: DirectoryOrCreate
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - k3d-kube-cluster-agent-0
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-path
  volumeMode: Filesystem

Only k3d-kube-cluster-agent-0 has the volume that kafka-1 needs.

And the PersistentVolume cannot be moved elsewhere, so any pod that needs access to that volume should do so from k3d-kube-cluster-agent-0.

Since the node is not available, the scheduler cannot assign the Pod, which stays Pending.
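The scheduler's reasoning can be approximated with a simple filter: a node is feasible only if it is schedulable and it satisfies the volume's node affinity. This Python sketch is a simplified model of those two checks only (the real scheduler applies many more), and the Node class and feasible_nodes function are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    schedulable: bool = True  # becomes False once the node is cordoned


def feasible_nodes(nodes: list[Node], pv_allowed_hostnames: set[str]) -> list[str]:
    """Keep nodes that are schedulable AND allowed by the volume's node affinity."""
    return [
        node.name
        for node in nodes
        if node.schedulable and node.name in pv_allowed_hostnames
    ]


if __name__ == "__main__":
    nodes = [
        Node("k3d-kube-cluster-agent-0", schedulable=False),  # drained
        Node("k3d-kube-cluster-agent-1"),
        Node("k3d-kube-cluster-agent-2"),
    ]
    # The PersistentVolume for kafka-1 only allows agent-0.
    print(feasible_nodes(nodes, {"k3d-kube-cluster-agent-0"}))
```

The result is an empty list: the only node that holds the volume is cordoned, and the schedulable nodes fail the affinity check.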

Please note that this volume scheduling constraint is imposed by the local-path-provisioner and is not common to all provisioners.

In other words, you might find that another provisioner can attach the PersistentVolume to a different node, and the Pod can be rescheduled on the same node as another broker.

But that's not great — losing a single node to a failure could compromise the availability of the Kafka cluster.

Let's fix this by introducing a constraint on where Pods can be placed: a topology constraint.

Pod Topology Constraints help you spread the pods across failure domains

In any public cloud, a zone groups together resources that may fail together, for example, because of a power outage.

However, resources in different zones are unlikely to fail together.

This is useful for ensuring resilience since a power outage in one zone won't affect another.

Although the exact definition of a zone is left to infrastructure implementations, you can imagine two or three computer rooms, each with separate aircon, power supplies, network switches, racks, etc.

A zone is one example of a failure domain.

Another might be a region.

It's improbable that the UK South and East US regions will fail simultaneously.

In Kubernetes, you can use this information to set constraints on where the Pod should be placed.

For example, you might constrain your Kafka brokers to be in different zones.

Here's an example of how to do that:

kafka.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: kafka-app
      containers:
        - name: kafka-container
# truncated output

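The topologySpreadConstraints reads as follows: Pods labelled app: kafka-app are spread across the failure domains identified by the topologyKey node label, the number of matching Pods in any one domain may exceed the least-populated domain by at most one (maxSkew: 1), and a Pod that cannot satisfy the constraint is left unscheduled (whenUnsatisfiable: DoNotSchedule).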

With the topology in place, the Pods will always spread among all availability zones — regardless of the node affinity in the PersistentVolume.
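To see how maxSkew plays out, here is a simplified Python model of the check: placing a Pod in a zone is allowed only if that zone's new Pod count exceeds the current minimum across zones by no more than maxSkew. It ignores everything else the scheduler considers, and allowed_zones is a made-up name.

```python
def allowed_zones(pods_per_zone: dict[str, int], max_skew: int = 1) -> list[str]:
    """Return the zones where one more matching Pod keeps the skew within bounds."""
    global_minimum = min(pods_per_zone.values())
    allowed = []
    for zone, count in pods_per_zone.items():
        skew_if_placed = (count + 1) - global_minimum
        if skew_if_placed <= max_skew:
            allowed.append(zone)
    return allowed


if __name__ == "__main__":
    # kafka-1 was evicted from zone-a; the other two brokers are still running.
    print(allowed_zones({"zone-a": 0, "zone-b": 1, "zone-c": 1}))
```

In this scenario only zone-a qualifies, so the constraint keeps the evicted broker from doubling up in a zone that already hosts one.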

Should you use Pod Topology Constraints or Node Affinity?

Is the idea of assigning Volumes (and Pods) to the Kubernetes node legit?

Isn't the idea behind Kubernetes that pods can be rescheduled anywhere in the cluster?

While this might be true for stateless applications, it's more complex when it comes to stateful apps such as Kafka.

Stateful applications have unique requirements such as:

  1. You want to control what applications share the compute resources with the stateful app (if at all) to avoid "noisy neighbours".
  2. Ideally, the data doesn't move from one node to another, as copying terabytes across nodes is slow and error-prone.
  3. You want to provision Kubernetes nodes that are I/O optimized.

So it's usually an excellent idea to assign nodes to StatefulSets and ensure that they have a dedicated node pool with the right instance type.

Depending on your setup, you might want to assign NodeAffinity to your volumes and your Pods.

Return to full strength

Let's assume you completed the maintenance on the node, and you are ready to uncordon it.

bash

kubectl uncordon k3d-kube-cluster-agent-0
node/k3d-kube-cluster-agent-0 uncordoned

Now Kubernetes can schedule the kafka-1 Pod onto k3d-kube-cluster-agent-0 to be reunited with its Persistent Volume.

The Kafka broker rejoined the cluster and is in sync with its peers

After a short while, you should notice that the pod is Running:

bash

kubectl get pod kafka-1 -o wide
NAME      READY   STATUS    IP           NODE
kafka-1   1/1     Running   10.42.0.14   k3d-kube-cluster-agent-0

What about the Kafka cluster?

Does the broker resume being part of the cluster?

Since the pod was recreated and assigned a different IP address, you should retrieve the new list of endpoints with:

bash

kubectl describe service kafka-svc
Name:              kafka-svc
Namespace:         default
Labels:            app=kafka-app
Selector:          app=kafka-app
Type:              ClusterIP
Port:              9092  9092/TCP
TargetPort:        9092/TCP
Endpoints:         10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
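Copying the endpoints by hand after every reschedule is error-prone, so here is a small sketch that builds the bootstrap list for you. The bootstrap_servers function is a hypothetical helper, not a Kafka or kubectl command: it reads whitespace-separated IP addresses on standard input and joins them into a comma-separated host:port list (the port defaults to 9092, as in this tutorial).

```bash
#!/usr/bin/env bash
# bootstrap_servers (hypothetical helper)
#   stdin:    whitespace-separated IP addresses
#   argument: broker port (optional, defaults to 9092)
#   output:   comma-separated host:port list for --bootstrap-server
bootstrap_servers() {
  local port=${1:-9092} ip out=""
  for ip in $(cat); do
    # Prepend a comma only when the list already has an entry.
    out+="${out:+,}${ip}:${port}"
  done
  echo "$out"
}

echo "10.42.0.12 10.42.0.14 10.42.0.13" | bootstrap_servers
# prints 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
```

Assuming the Service is named kafka-svc as above, you could feed it with kubectl get endpoints kafka-svc -o jsonpath='{.subsets[*].addresses[*].ip}' | bootstrap_servers.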

Update the bootstrap broker endpoints list and query the test topic with:

bash@kafka-client

kafka-topics.sh --describe \
 --topic test \
 --bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
Topic: test
TopicId: QqrcLtJSRoufzOZqNc9KcQ
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2,segment.bytes=1073741824

Topic: test
Partition: 0
Leader: 2
Replicas: 1,2,0
Isr: 2,0,1

Notice how the list of in-sync replicas is 2,0,1.

So the kafka-1 pod (broker 1) could re-join the Kafka cluster and catch up on the additional message hello again, world!

Excellent!
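If you'd rather not read the Isr line by eye every time, the check can be scripted. The following is a minimal sketch, assuming a single-partition topic like test; isr_ok is a hypothetical helper that parses the text printed by kafka-topics.sh --describe and succeeds only when the number of in-sync replicas is at least min.insync.replicas.

```bash
#!/usr/bin/env bash
# isr_ok (hypothetical helper): reads `kafka-topics.sh --describe` output for
# a single-partition topic on stdin. Prints a one-line summary and returns 0
# only if the in-sync replica count is at least min.insync.replicas.
isr_ok() {
  local desc min isr count
  desc=$(cat)
  # Topic-level override such as "min.insync.replicas=2"; Kafka's default is 1.
  min=$(grep -o 'min\.insync\.replicas=[0-9]*' <<<"$desc" | head -n 1 | cut -d= -f2)
  min=${min:-1}
  # First "Isr: ..." entry, for example "2,0,1".
  isr=$(grep -o 'Isr: *[0-9,]*' <<<"$desc" | head -n 1 | sed 's/^Isr: *//')
  # Count the comma-separated broker IDs.
  count=$(tr ',' '\n' <<<"$isr" | grep -c '[0-9]' || true)
  echo "isr=${isr} count=${count} min=${min}"
  [ "$count" -ge "$min" ]
}

# Example with the values shown above: three brokers in sync, two required.
isr_ok <<'EOF'
Configs: min.insync.replicas=2,segment.bytes=1073741824
Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
EOF
# prints isr=2,0,1 count=3 min=2
```

From the client pod you could pipe the real command into it, for example kafka-topics.sh --describe --topic test --bootstrap-server ... | isr_ok.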

However, in this example, you removed only one node.

What happens if multiple nodes are cordoned at the same time?

Surviving multiple nodes down for maintenance

Imagine a cluster administrator or automated maintenance operation trying to drain another node while one is already offline.

What happens?

Let's test it:

bash

kubectl drain k3d-kube-cluster-agent-0 \
  --delete-emptydir-data \
  --force \
  --ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-1
pod/kafka-1 evicted
node/k3d-kube-cluster-agent-0 evicted

kubectl drain k3d-kube-cluster-agent-1 \
  --delete-emptydir-data \
  --force \
  --ignore-daemonsets
node/k3d-kube-cluster-agent-1 cordoned
evicting pod default/kafka-2
pod/kafka-2 evicted
node/k3d-kube-cluster-agent-1 evicted

The nodes k3d-kube-cluster-agent-0 and k3d-kube-cluster-agent-1 were drained and kafka-1 and kafka-2 were evicted.

Two brokers are made unavailable in the Kafka cluster

Now, if you query the pods:

bash

kubectl get pod -l app=kafka-app
NAME      READY   STATUS    RESTARTS
kafka-1   0/1     Pending   1
kafka-0   1/1     Running   0
kafka-2   0/1     Pending   0

With only one broker running, can producers and consumers continue with business as usual?

Let's run the kafka console producer script and produce another message:

bash@kafka-client

kafka-console-producer.sh \
  --topic test \
  --bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092

At the > prompt, you can try to produce another message, hello? world?!, this time requiring an acknowledgement from all in-sync replicas:

prompt

kafka-console-producer.sh \
  --topic test \
  --request-required-acks all \
  --bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
>hello? world?!
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
ERROR Messages are rejected since there are fewer in-sync replicas than required.

The producer is blocked because there are fewer in-sync replicas than required.

You're unable to produce and consume messages when you have a single broker in your Kafka cluster

How about the consumer?

Let's test it:

bash@kafka-client

kafka-console-consumer.sh \
  --topic test \
  --from-beginning \
  --bootstrap-server 10.42.0.12:9092,10.42.0.14:9092,10.42.0.13:9092
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
WARN [Producer clientId=console-producer] NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
ERROR Messages are rejected since there are fewer in-sync replicas than required.

It also seems to be blocked on reading.

Kafka is officially unavailable.

Let's fix it so that failures of this kind can't happen again.

Pod Disruption Budget

You can limit disruptions due to maintenance by using a Pod Disruption Budget (PDB).

A PodDisruptionBudget defines the minimum number of replicas that must be available for the application to operate.

If you say that you need at least two pods and a voluntary disruption, such as a node drain, would take that number to one, Kubernetes will refuse the eviction to prevent the number of replicas from dropping.

A PodDisruptionBudget looks like this:

kafka-pdb.yaml

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: kafka-app

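There are (only) three fields: the name of the resource (metadata.name), the minimum number of Pods that must stay available (minAvailable), and the label selector that identifies the Pods covered by the budget (selector).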

Since the Kafka cluster needs at least two of its three brokers to keep a quorum, you will set minAvailable to 2.
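The arithmetic behind the budget is simple, and a short sketch may help. The allowed_disruptions function below is a hypothetical helper that mirrors how the number of permitted evictions follows from the healthy Pod count and minAvailable; it is an illustration, not how Kubernetes implements it.

```bash
#!/usr/bin/env bash
# allowed_disruptions (hypothetical helper)
#   $1: number of healthy Pods matched by the budget
#   $2: minAvailable
#   output: how many Pods may be evicted without violating the budget
allowed_disruptions() {
  local healthy=$1 min_available=$2
  local allowed=$(( healthy - min_available ))
  # The budget can never allow a negative number of evictions.
  if (( allowed < 0 )); then allowed=0; fi
  echo "$allowed"
}

allowed_disruptions 3 2   # prints 1: one broker can be drained
allowed_disruptions 2 2   # prints 0: any further eviction is refused
allowed_disruptions 1 2   # prints 0
```

Once the budget exists in the cluster, Kubernetes reports the same number in the object's status, which you could read with kubectl get pdb kafka-pdb -o jsonpath='{.status.disruptionsAllowed}'.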

Let's create the object with:

bash

kubectl apply -f kafka-pdb.yaml
poddisruptionbudget.policy/kafka-pdb created

To test the PodDisruptionBudget, you should have at least two pods running and try to reduce the number by at least one.

Since you have a single Pod running, let's uncordon one of the nodes with:

bash

kubectl uncordon k3d-kube-cluster-agent-0
node/k3d-kube-cluster-agent-0 uncordoned

You should have two Pods Running and one Pending:

bash

kubectl get pod -l app=kafka-app
NAME      READY   STATUS    RESTARTS
kafka-1   1/1     Running   1
kafka-0   1/1     Running   0
kafka-2   0/1     Pending   0

If an eviction event exceeds the pod disruption budget, the disruption will be prevented.

Let's test that by draining the node again:

bash

kubectl drain k3d-kube-cluster-agent-0 \
  --delete-emptydir-data \
  --force \
  --ignore-daemonsets
node/k3d-kube-cluster-agent-0 cordoned
evicting pod default/kafka-1
error when evicting pods/"kafka-1" -n "default" (will retry after 5s):
Cannot evict pod as it would violate the pod's disruption budget.

Despite the error, notice that the node is still cordoned to prevent Kubernetes from scheduling new pods on it:

bash

kubectl get nodes
NAME                        STATUS                      ROLES                  VERSION
k3d-kube-cluster-server-0   Ready                       control-plane,master   v1.22.7+k3s1
k3d-kube-cluster-agent-1    Ready,SchedulingDisabled    <none>                 v1.22.7+k3s1
k3d-kube-cluster-agent-0    Ready,SchedulingDisabled    <none>                 v1.22.7+k3s1
k3d-kube-cluster-agent-2    Ready                       <none>                 v1.22.7+k3s1

But the Kafka pod is still running:

bash

kubectl get pods -o wide
NAME      READY   STATUS    IP           NODE
kafka-1   1/1     Running   10.42.0.15   k3d-kube-cluster-agent-0
kafka-0   1/1     Running   10.42.0.13   k3d-kube-cluster-agent-2
kafka-2   0/1     Pending   <none>       <none>

Also, pods not constrained by the PodDisruptionBudget will still be evicted and rescheduled.

The pod disruption budget prevented the Kafka cluster from becoming unavailable

The node k3d-kube-cluster-agent-1 is still unavailable; what if it doesn't come back?

Breaking badly: the node ain't coming back!

If all partitions are replicated with two in-sync replicas, no data should be lost if a Kubernetes node is removed permanently.

However, the broker pod will never be rescheduled due to the node affinity constraint on its persistent volume.

Let's explore what happens.

You can remove a node completely with:

bash

kubectl delete node k3d-kube-cluster-agent-1
node "k3d-kube-cluster-agent-1" deleted

In this case, kafka-2 is pending because k3d-kube-cluster-agent-1 has gone, and along with it, kafka-2's local data volume.

bash

kubectl get pods kafka-2 -o wide
NAME      READY   STATUS    RESTARTS
kafka-2   0/1     Pending   0

It can't be rescheduled onto another node because no other node can satisfy the nodeAffinity constraint on the Volume.

Do producers and consumers still work?

Can the cluster live with this?

With two brokers running, you should expect Kafka to be available for producers and consumers.

Producing an event with one unavailable Kafka broker

Let's make a quick sanity check.

Since most of the broker IP addresses have been rotated, let's retrieve them with:

bash

kubectl describe service kafka-svc
Name:              kafka-svc
Namespace:         default
Labels:            app=kafka-app
Selector:          app=kafka-app
Type:              ClusterIP
Port:              9092  9092/TCP
TargetPort:        9092/TCP
Endpoints:         10.42.0.15:9092,10.42.0.13:9092

Produce another message, "Hello World. Do you copy?":

bash@kafka-client

kafka-console-producer.sh \
  --topic test \
  --request-required-acks all \
  --bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
>Hello World. Do you copy?

The message seems to be committed.

How about consuming the messages, both new and historical ones?

bash@kafka-client

kafka-console-consumer.sh \
  --topic test \
  --from-beginning \
  --bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
hello world
hello again, world
Hello World. Do you copy?

Excellent!

All messages were retrieved!

Let's also examine the "test" topic with:

bash@kafka-client

kafka-topics.sh --describe \
  --topic test \
  --bootstrap-server 10.42.0.15:9092,10.42.0.13:9092
Topic: test
TopicId: QqrcLtJSRoufzOZqNc9KcQ
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2,segment.bytes=1073741824

Topic: test
Partition: 0
Leader: 1
Replicas: 1,2,0
Isr: 1,0

So, producers and consumers are still available, but can you live with only two brokers in your Kafka cluster?

No, not really.

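The current state prohibits any voluntary maintenance: with minAvailable set to 2 and only two brokers running, the disruption budget allows no further evictions.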

Like this drain operation, for example:

bash

kubectl drain k3d-kube-cluster-agent-2 --ignore-daemonsets
node/k3d-kube-cluster-agent-2 cordoned
evicting pod default/kafka-0
error when evicting pods/"kafka-0" -n "default" (will retry after 5s):
Cannot evict pod as it would violate the pod's disruption budget.

So, how might we recover from this scenario?

Kafka-2 is dead. Long live its successor, kafka-2.

You can add a new Kubernetes worker in the same zone (zone-b) from which k3d-kube-cluster-agent-1 departed.

In this tutorial, using k3d, adding a new node looks like this:

bash

k3d node create kube-cluster-new-agent \
  --cluster kube-cluster \
  --k3s-node-label topology.kubernetes.io/zone=zone-b
INFO[0000] Adding 1 node(s) to the runtime local cluster 'kube-cluster'...
INFO[0000] Starting Node 'k3d-kube-cluster-new-agent-4'
INFO[0008] Successfully created 1 node(s)!

You can use kubectl get nodes to see it joined the cluster:

bash

kubectl get nodes
NAME                           STATUS     VERSION
k3d-kube-cluster-new-agent-4   Ready      v1.21.5+k3s2
# truncated output

Yup, there it is — joined and Ready.

You can clean up the old broker by deleting the PVC:

bash

kubectl delete pvc data-kafka-2
persistentvolumeclaim "data-kafka-2" deleted

When you delete the pending kafka-2 pod, the StatefulSet recreates it with a fresh PersistentVolumeClaim, and Kubernetes can schedule it onto the new node.

bash

kubectl delete po kafka-2
pod "kafka-2" deleted

You can observe the new Kafka broker pod bootstrap as it happens:

bash

kubectl get pods --watch
NAME              READY   STATUS              RESTARTS   AGE
kafka-0           1/1     Running             1          4d23h
kafka-1           1/1     Running             8          14d
kafka-2           0/1     ContainerCreating   0          14s
kafka-2           1/1     Running             0          51s

If you examine the status, you will notice that all Kafka brokers are running, and that a new PersistentVolumeClaim and PersistentVolume have been created:

bash

kubectl get pods,pvc,pv
NAME                  READY   STATUS
pod/kafka-2           1/1     Running
pod/kafka-1           1/1     Running
pod/kafka-0           1/1     Running

NAME                                 STATUS   VOLUME         CAPACITY   ACCESS MODES
persistentvolumeclaim/data-kafka-1   Bound    pvc-018e8d78   1Gi        RWO
persistentvolumeclaim/data-kafka-2   Bound    pvc-455a7f5b   1Gi        RWO
persistentvolumeclaim/data-kafka-0   Bound    pvc-abd6b6cf   1Gi        RWO

NAME                            CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS     CLAIM
persistentvolume/pvc-018e8d78   1Gi        RWO            Delete           Bound      default/data-kafka-1
persistentvolume/pvc-455a7f5b   1Gi        RWO            Delete           Bound      default/data-kafka-2
persistentvolume/pvc-fe291ef2   1Gi        RWO            Delete           Released   default/data-kafka-2
persistentvolume/pvc-abd6b6cf   1Gi        RWO            Delete           Bound      default/data-kafka-0

Is the replacement broker in sync?

With the test topic partitions replicated three times, you should expect kafka-2 to sync with the other brokers eventually.

Retrieve the new endpoints with:

bash

kubectl describe service kafka-svc
Name:              kafka-svc
Namespace:         default
Labels:            app=kafka-app
Selector:          app=kafka-app
Type:              ClusterIP
Port:              9092  9092/TCP
TargetPort:        9092/TCP
Endpoints:         10.42.0.15:9092,10.42.0.13:9092,10.42.1.16:9092

Let's query the test topic and inspect the status:

bash@kafka-client

kafka-topics.sh --describe \
  --topic test \
  --bootstrap-server 10.42.0.15:9092,10.42.0.13:9092,10.42.1.16:9092
Topic: test
TopicId: QqrcLtJSRoufzOZqNc9KcQ
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2,segment.bytes=1073741824

Topic: test
Partition: 0
Leader: 2
Replicas: 1,2,0
Isr: 2,0,1

Yep, the test topic has "2,0,1" as in-sync replicas, meaning all brokers are in sync for the test topic.

Let's consume the messages from the new broker to validate that.

First, let's get the pod IP address of kafka-2, our newest broker.

bash

kubectl get pod kafka-2 -o jsonpath='{.status.podIP}'
10.42.1.16

Second, let's run the console consumer from the Kafka client pod, specifying only kafka-2's pod IP address:

bash@kafka-client

kafka-console-consumer.sh \
  --topic test \
  --from-beginning \
  --bootstrap-server 10.42.1.16:9092
hello world
hello again, world
Hello World. Do you copy?

Great!

It's in sync!

Let's recap.

Summary

In this article, you designed and tested a Kafka cluster for high availability so that messages can continue to flow between producers and consumers during a failure.

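You also learned how you could leverage Kubernetes features to make your Kafka cluster highly available: Pod topology constraints to spread the brokers across failure domains, node affinity to keep brokers next to their volumes, and Pod Disruption Budgets to limit voluntary disruptions.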

Please note that this article used Kafka in KRaft mode (aka ZooKeeper-less) for simplicity so that we could focus on the availability of a single stateful service in Kubernetes.

However, KRaft is not production-ready yet.

In particular, partition re-assignments, unclean leader election, dynamically changing broker endpoints, and any kind of upgrade are unsupported in KRaft mode.
