How To Set Up a Multi-Node Kafka Cluster using KRaft

Introduction

Apache Kafka is an open-source distributed event and stream-processing platform written in Java, built to process demanding real-time data feeds. It is inherently scalable, with high throughput and availability. It is designed to be fault-tolerant with support for hundreds of nodes per cluster.

In this tutorial, you’ll create a Kafka cluster that uses the KRaft consensus protocol. You’ll learn how to configure nodes to be a part of a cluster and observe how topic partitions are assigned to different nodes. You’ll also learn how to assign topics to specific brokers in the cluster.

Prerequisites

To complete this tutorial, you’ll need:

  • Three Droplets available with at least 4GB RAM and 2 CPUs. In the case of an Ubuntu server, follow the Ubuntu Initial Server Setup for setup instructions.
  • A fully registered domain name with three subdomains pointed towards the three droplets. This tutorial will refer to them individually as kafkaX.your_domain throughout. You can purchase a domain name on Namecheap, get one for free on Freenom, or use the domain registrar of your choice.
  • Apache Kafka installed and configured on your Droplets. For setup instructions, you can follow the Introduction to Kafka tutorial. You only need to complete Step 1 and Step 2.

Step 1 - Configuring Kafka Nodes

In this step, you’ll configure the three Kafka servers you’ve created as part of the prerequisites to be a part of the same KRaft cluster. With KRaft, the nodes themselves can organize and perform administrative tasks without the overhead of depending on Apache ZooKeeper.

Configuring the First Node

You’ll start by configuring the first node. First, stop the service on the first Droplet by running:

sudo systemctl stop kafka

As user kafka, navigate to the directory where Kafka resides and open its configuration file for editing by running:

vi config/kraft/server.properties

Find the following lines:

config/kraft/server.properties
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
...

The process.roles parameter configures the Kafka node to act as both broker and controller, meaning that it will receive and serve data (broker) and perform administrative tasks (controller). Kafka keeps the two roles separate because, in big deployments, controllers can run on dedicated nodes for increased efficiency and redundancy.

node.id specifies the node’s ID in the cluster. This is the first node, so it should be left at 1. All nodes must have unique node IDs, so the second and third nodes will have IDs of 2 and 3, respectively.

controller.quorum.voters maps node IDs to their respective addresses and ports for communication. This is where you’ll specify the addresses of all cluster nodes so that each node is aware of all others. Modify the line to look like this:

config/kraft/server.properties
...
controller.quorum.voters=1@kafka1.your_domain:9093,2@kafka2.your_domain:9093,3@kafka3.your_domain:9093
...

Here, you list all three nodes in the cluster with their respective IDs. Remember to replace your_domain with the address of your domain, which you’ve set up during the prerequisites.
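The format of this value (id@host:port, comma-separated) is easy to get wrong by hand. The following Python sketch builds it from a list of hostnames; the helper name build_quorum_voters is hypothetical, and it assumes node IDs start at 1 and follow the order of the list, as they do in this tutorial.

```python
def build_quorum_voters(hosts, controller_port=9093):
    """Return a controller.quorum.voters value for hosts given in node-ID order."""
    # Assumption: node IDs start at 1 and follow the order of the host list.
    return ",".join(
        f"{node_id}@{host}:{controller_port}"
        for node_id, host in enumerate(hosts, start=1)
    )


# Placeholder hostnames from this tutorial; replace your_domain with your own.
hosts = [f"kafka{n}.your_domain" for n in (1, 2, 3)]
voters = build_quorum_voters(hosts)
print(f"controller.quorum.voters={voters}")
```

The printed line can be pasted into server.properties on every node, since the value is identical across the cluster.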

Next, find the following lines in the file:

config/kraft/server.properties
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners defines the addresses the Kafka node listens on, while advertised.listeners specifies the addresses that will be passed on to clients to connect to the node. This allows you to specify a subset of actual addresses clients should use.

Modify the lines to look like the following, replacing your_domain with your actual domain name:

config/kraft/server.properties
...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

Since this node will be part of a cluster, you’ve explicitly pointed the addresses at the Droplet on which it’s running.

Then, find the num.partitions setting:

config/kraft/server.properties
...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

As the comment states, this configures each new topic’s default number of partitions. Since you have three nodes, set it to a multiple of three:

config/kraft/server.properties
...
num.partitions=6
...

A value of 6 here lets Kafka spread a new topic evenly across the cluster, with each node leading two of its partitions by default.
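To see why a partition count that divides evenly by the number of nodes is convenient, consider the following Python sketch. It uses a deliberately simplified round-robin model (partition i goes to broker i mod N); Kafka’s real placement logic is more involved, so treat this as an illustration of the arithmetic only. The function name spread_partitions is hypothetical.

```python
from collections import Counter


def spread_partitions(num_partitions, broker_ids):
    """Simplified model: assign partition i to broker i mod N (not Kafka's exact algorithm)."""
    return {p: broker_ids[p % len(broker_ids)] for p in range(num_partitions)}


for count in (6, 4):
    per_broker = Counter(spread_partitions(count, [1, 2, 3]).values())
    # Print how many partitions each broker would lead under this model.
    print(count, "partitions:", dict(sorted(per_broker.items())))
```

Under this model, six partitions split two per broker, while four partitions leave one broker with twice the load of the others.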

Next, you’ll configure the replication factor for internal topics, which keep the consumer offsets and transaction states. Find the following lines:

config/kraft/server.properties
...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

Set them to the following values:

config/kraft/server.properties
...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

Here, you specify that each partition of these internal topics is stored on two nodes, so the consumer offsets and transaction states survive the loss of a single node. When you’re done, save and close the file.

After setting the default partition number, you must reinitialize the log storage. First, delete the existing log files by running:

rm -rf /home/kafka/kafka-logs/*

Then, generate a new cluster ID and store it in an environment variable:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Show it in the terminal:

echo $KAFKA_CLUSTER_ID

The output will be the ID:

Output
Mjj4bch9Q3-B0TEXv8_zPg

Note that value; you’ll need it to configure the second and third nodes.
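The 22-character ID is a UUID encoded as URL-safe base64 without padding, which is why it can contain - and _ characters. As a rough way to check that an ID was copied between nodes without truncation, the following Python sketch decodes it back into a standard UUID. The helper names are hypothetical, and the sample ID is the one from the output above.

```python
import base64
import uuid


def decode_cluster_id(cluster_id):
    """Decode a 22-character cluster ID into a standard UUID object."""
    # Restore the base64 padding that the ID omits.
    padded = cluster_id + "=" * (-len(cluster_id) % 4)
    return uuid.UUID(bytes=base64.urlsafe_b64decode(padded))


def encode_cluster_id(value):
    """Encode a UUID back into the unpadded URL-safe base64 form."""
    return base64.urlsafe_b64encode(value.bytes).rstrip(b"=").decode("ascii")


decoded = decode_cluster_id("Mjj4bch9Q3-B0TEXv8_zPg")
print(decoded)
print(encode_cluster_id(decoded))
```

A truncated or mistyped ID will either fail to decode into 16 bytes or fail to round-trip to the same string.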

Finally, run the following command to generate the log storage:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

The output will be similar to this:

Output
... Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.

Configuring the Second and Third Nodes

Configuring other nodes is very similar to what you’ve just done for the first node. Take note to also update the node.id:

config/kraft/server.properties
...
node.id=node_number
...

The appropriate values are 2 and 3 for the second and third nodes, respectively. Also set the appropriate addresses for listeners and advertised.listeners on each node.
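The settings that differ from node to node can be summarized with a short Python sketch. It assumes the kafkaX.your_domain naming scheme and the 9092/9093 ports used in this tutorial; the function name node_overrides is hypothetical. All other settings, including controller.quorum.voters, stay identical on every node.

```python
def node_overrides(node_id, domain="your_domain"):
    """Return the server.properties entries that are specific to one node."""
    # Assumption: hosts are named kafka<node_id>.<domain>, as in this tutorial.
    host = f"kafka{node_id}.{domain}"
    return {
        "node.id": str(node_id),
        "listeners": f"PLAINTEXT://{host}:9092,CONTROLLER://{host}:9093",
        "advertised.listeners": f"PLAINTEXT://{host}:9092",
    }


for node_id in (2, 3):
    print(f"# node {node_id}")
    for key, value in node_overrides(node_id).items():
        print(f"{key}={value}")
```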

When regenerating the log storage, reuse the cluster ID from the first node instead of generating a new one:

KAFKA_CLUSTER_ID="your_cluster_id"

When you’re done, start the Kafka service on all three nodes by running:

sudo systemctl start kafka

In this step, you’ve configured the three Kafka nodes to be a part of a KRaft cluster. In the next step, you’ll create a topic and produce and consume messages on your cluster.

Step 2 - Connecting to the Cluster

In this step, you’ll connect to the Kafka cluster using the shell scripts bundled with Kafka. You’ll also create a topic and try producing and consuming data from the cluster. Then, you’ll bring one of the nodes down and observe how Kafka mitigates the loss.

Kafka provides the kafka-metadata-quorum.sh script, which shows information about the cluster and its members. Run the following command to execute it:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

You connect to one of the nodes at port 9093, which is the endpoint for the controller (but not for the broker). Remember to replace kafka1.your_domain with a domain pointing to one of your Kafka nodes.

The output will be similar to this:

Output
ClusterId:              G3TeIZoeTSCvG2YOWvPE2w
LeaderId:               3
LeaderEpoch:            2
HighWatermark:          383
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   55
CurrentVoters:          [1,2,3]
CurrentObservers:       []

The script lists basic information about the state of the cluster. In the shown output, you see that node 3 is elected as the leader, and all three nodes ([1,2,3]) are in the voting pool and agree on that decision.
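If you want to check this status from a script, the key/value pairs are simple to parse. The following Python sketch works on the sample output shown above (copied into a string) and also derives how many voters form a majority, which is what the controller quorum needs to elect a leader. The parsing approach and the name parse_status are assumptions for illustration, not part of Kafka’s tooling, and other Kafka versions may print the voters in a different shape.

```python
import re

# Sample output from this tutorial, joined into one string.
STATUS = (
    "ClusterId: G3TeIZoeTSCvG2YOWvPE2w LeaderId: 3 LeaderEpoch: 2 "
    "HighWatermark: 383 MaxFollowerLag: 0 MaxFollowerLagTimeMs: 55 "
    "CurrentVoters: [1,2,3] CurrentObservers: []"
)


def parse_status(text):
    """Split 'Key: value' pairs from the status output into a dict of strings."""
    return dict(re.findall(r"(\w+):\s*(\[[^\]]*\]|\S+)", text))


status = parse_status(STATUS)
voters = [int(v) for v in status["CurrentVoters"].strip("[]").split(",") if v]
majority = len(voters) // 2 + 1

print(f"leader={status['LeaderId']} voters={voters} majority={majority}")
```

With three voters the majority is two, so the quorum keeps working when a single controller is down.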

Create a topic called first-topic by running:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

Setting the replication factor to 2 means that every partition of the topic is stored on two nodes.

The output will be:

Output
Created topic first-topic.

Then, run the kafka-topics.sh script to see how the partitions are arranged on the nodes:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will be similar to this:

Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
    Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
    Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
    Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
    Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

You can see that each partition has a leader, two replicas, and an in-sync replica (ISR) set. The partition leader is the broker node that serves the partition data to clients, while the other replicas only keep copies. A replica is considered in sync if it has caught up with the leader within the past thirty seconds by default. This interval is controlled by the replica.lag.time.max.ms broker setting.
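To confirm that the load is balanced, you can tally how many partitions each broker leads and how many replicas it holds. The Python sketch below does this for the partition rows of the sample output above; the regular expression is an assumption about the output format shown in this tutorial.

```python
import re
from collections import Counter

# Partition rows copied from the sample describe output.
DESCRIBE_OUTPUT = """
Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
"""

ROW = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)")

leaders = Counter()
replicas = Counter()
for _partition, leader, replica_list in ROW.findall(DESCRIBE_OUTPUT):
    leaders[int(leader)] += 1
    replicas.update(int(broker) for broker in replica_list.split(","))

print("leaders per broker: ", dict(sorted(leaders.items())))
print("replicas per broker:", dict(sorted(replicas.items())))
```

For this sample, every broker leads two partitions and holds four replicas in total.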

Now that you’ve created a topic, you’ll produce messages to it using the kafka-console-producer.sh script. Run the following command to start the producer:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

You’ll see an empty prompt:

>

The producer is waiting for you to enter a textual message. Input Hello World! and press ENTER. The prompt will look like this:

>Hello World!
>

The producer is now waiting for the next message, meaning the previous one was successfully communicated to Kafka. You can input as many messages as you want for testing. To exit the producer, press CTRL+C.

You’ll need a consumer to read back the messages from the topic. Kafka provides a simple consumer as kafka-console-consumer.sh. Execute it by running:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

You’ll see the messages being read from the topic:

Output
Hello World!
...

Simulating Node Failure

On the third Kafka node, stop the service by running:

sudo systemctl stop kafka

Then, describe the topic by running:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will be similar to this:

Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
    Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
    Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
    Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
    Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Even though node 3 is listed as a replica, it’s missing from ISR sets because it’s unavailable. Once it rejoins the cluster, it will sync with other nodes and try to regain its previous place.
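Partitions whose ISR set is smaller than their replica list are called under-replicated. As an illustration, the following Python sketch compares the two lists for each partition row of the sample output above; the function name under_replicated and the regular expression are assumptions made for this example.

```python
import re

# Partition rows copied from the sample output with node 3 stopped.
DESCRIBE_OUTPUT = """
Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
"""

ROW = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def under_replicated(text):
    """Map each affected partition to the brokers that are assigned but not in sync."""
    result = {}
    for partition, _leader, replicas, isr in ROW.findall(text):
        missing = set(replicas.split(",")) - set(isr.split(","))
        if missing:
            result[int(partition)] = sorted(int(broker) for broker in missing)
    return result


print(under_replicated(DESCRIBE_OUTPUT))
```

For this sample, partitions 0, 2, 3, and 4 are each missing broker 3. The kafka-topics.sh script can also report this directly when you pass --under-replicated-partitions together with --describe.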

Try reading the messages from first-topic again:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

You’ll see that they are accessible as usual:

Output
Hello World!
...

Thanks to the presence of replicas, the first two nodes take over and serve the consumer. You can now start Kafka on the third server:

sudo systemctl start kafka

In this step, you’ve seen how Kafka mitigates the unavailability of a node in the cluster. You’ll now learn how to exclude a node from the cluster gracefully.

Step 3 - Migrating Data Between Nodes

In this step, you’ll learn how to migrate topics between nodes in a Kafka cluster. When adding nodes to an existing cluster with topics, Kafka won’t automatically transfer any partitions to it, which you may want to do. This is also useful for removing nodes, as existing partitions won’t automatically move to the remaining nodes.

Kafka provides a script called kafka-reassign-partitions.sh, which can generate, execute and verify transition plans. You’ll use it to create a plan for moving partitions of first-topic to the first two nodes.

First, you’ll need to define which topics should be moved. The script accepts a JSON file with the definition for topics, so create and open it for editing:

vi topics-to-move.json

Add the following lines:

topics-to-move.json
{
    "topics": [
        {
            "topic": "first-topic"
        }
    ],
    "version": 1
}

Under topics, you define an object referencing first-topic. When you’re done, save and close the file.
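If you need to move many topics, writing this file by hand becomes tedious. A small Python sketch such as the following can generate the same structure from a list of topic names; the function name topics_to_move is hypothetical.

```python
import json


def topics_to_move(topic_names):
    """Build the JSON structure expected by --topics-to-move-json-file."""
    return {"topics": [{"topic": name} for name in topic_names], "version": 1}


document = topics_to_move(["first-topic"])
# Print the document; redirect the output to topics-to-move.json to save it.
print(json.dumps(document, indent=4))
```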

Run the following command to generate the migration plan, replacing kafka1.your_domain with a domain pointing to one of your Kafka nodes:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

You pass in "1,2" to --broker-list, signifying the IDs of the target brokers.

The output will be similar to this:

Output
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

The script generated two plans in total, describing the current and proposed partition layouts, respectively. The first plan is provided if you need to revert the changes later. Note the second plan, which you’ll store in a separate file called migration-plan.json. Create and open it for editing:

vi migration-plan.json

Add the second execution plan:

migration-plan.json
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}
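Before executing a plan, it can be worth a quick sanity check, especially if you edited it by hand. The following Python sketch rebuilds the proposed plan from its replica lists and verifies that every partition keeps two distinct replicas and only uses the target brokers. The check_plan helper is hypothetical and not part of Kafka’s tooling.

```python
import json

# Replica lists copied from the proposed plan, for partitions 0 through 5.
PROPOSED = [[2, 1], [1, 2], [2, 1], [1, 2], [2, 1], [1, 2]]

plan = {
    "version": 1,
    "partitions": [
        {
            "topic": "first-topic",
            "partition": number,
            "replicas": replicas,
            "log_dirs": ["any"] * len(replicas),
        }
        for number, replicas in enumerate(PROPOSED)
    ],
}


def check_plan(plan, allowed_brokers, replication_factor):
    """Return a list of problems found in the plan; an empty list means none were found."""
    problems = []
    for entry in plan["partitions"]:
        name = f'{entry["topic"]}-{entry["partition"]}'
        replicas = entry["replicas"]
        if len(replicas) != replication_factor:
            problems.append(f"{name}: expected {replication_factor} replicas")
        if len(set(replicas)) != len(replicas):
            problems.append(f"{name}: duplicate broker in replica list")
        unexpected = set(replicas) - set(allowed_brokers)
        if unexpected:
            problems.append(f"{name}: unexpected brokers {sorted(unexpected)}")
    return problems


print(check_plan(plan, allowed_brokers={1, 2}, replication_factor=2))
print(json.dumps(plan, separators=(",", ":")))
```

The first line printed is an empty list for this plan, and the second is the plan serialized in the same compact form the script produces.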

Save and close the file. Then, run the following command to execute it:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

The output will be:

Output
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

The script noted that the migration had started. To see the progress of the migration, pass in --verify instead:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

After some time, the output will look similar to this:

Output
Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic
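When a migration takes a while, you may want to poll the verification output from a script. The following Python sketch extracts the per-partition status lines from the sample output above and lists any partition that is not yet reported as completed. The function name pending_partitions is hypothetical, and the non-completed status text in the usage line is a made-up placeholder, since this tutorial only shows the completed form.

```python
import re

# Status lines copied from the sample --verify output.
VERIFY_OUTPUT = """
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
"""


def pending_partitions(text):
    """Return the partitions whose reported status is anything other than 'completed'."""
    statuses = re.findall(r"Reassignment of partition (\S+) is ([^.]+)\.", text)
    return [name for name, state in statuses if state.strip() != "completed"]


print(pending_partitions(VERIFY_OUTPUT))

# Hypothetical status text, used only to show the non-empty case.
print(pending_partitions("Reassignment of partition demo-0 is some other state."))
```

An empty list means every partition listed in the output has finished moving.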

You can now describe first-topic to verify that no partitions are on broker 3:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will look like this:

Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
    Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
    Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
    Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

Only brokers 1 and 2 are present as replicas and ISRs, meaning the migration succeeded.

In this step, you’ve created a migration plan for moving first-topic from broker 3 to the remaining brokers and learned how to verify that the migration went smoothly.

Conclusion

You now have a Kafka cluster, consisting of three nodes that communicate using the KRaft protocol. You’ve also learned how to inspect the cluster and the layout of partitions. You’ve tested cluster redundancy by bringing down a node and reading from a topic. Finally, you’ve learned how to reassign topics to nodes in the cluster.


The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.

About the authors

Savic (author)
