Apache Kafka is an open-source distributed event and stream-processing platform written in Java, built to process demanding real-time data feeds. It is inherently scalable, with high throughput and availability. It is designed to be fault-tolerant with support for hundreds of nodes per cluster.
In this tutorial, you’ll create a Kafka cluster that uses the KRaft consensus protocol. You’ll learn how to configure nodes to be a part of a cluster and observe how topic partitions are assigned to different nodes. You’ll also learn how to assign topics to specific brokers in the cluster.
To complete this tutorial, you’ll need:
Three servers (Droplets) with Kafka installed, which this tutorial refers to as the first, second, and third node.
A domain name with a subdomain pointing to each of the three servers. This tutorial refers to them as kafkaX.your_domain throughout. You can purchase a domain name on Namecheap, get one for free on Freenom, or use the domain registrar of your choice.
In this step, you’ll configure the three Kafka servers you’ve created as part of the prerequisites to be a part of the same KRaft cluster. With KRaft, the nodes themselves can organize and perform administrative tasks without the overhead of depending on Apache ZooKeeper.
You’ll start by configuring the first node. First, stop the service on the first Droplet by running:
sudo systemctl stop kafka
As user kafka, navigate to the directory where Kafka resides and open its configuration file for editing by running:
vi config/kraft/server.properties
Find the following lines:
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
...
The process.roles parameter configures the Kafka node to act as both broker and controller, meaning that it will receive and serve data (broker) and perform administrative tasks (controller). The two roles can also be assigned to separate nodes, which is useful in big deployments where controllers can be kept apart for increased efficiency and redundancy.
node.id specifies the node’s ID in the cluster. This is the first node, so it should be left at 1. All nodes must have unique node IDs, so the second and third nodes will have an ID of 2 and 3, respectively.
controller.quorum.voters maps node IDs to their respective addresses and ports for communication. This is where you’ll specify the addresses of all cluster nodes so that each node is aware of all others. Modify the line to look like this:
...
controller.quorum.voters=1@kafka1.your_domain:9093,2@kafka2.your_domain:9093,3@kafka3.your_domain:9093
...
Here, you list all three nodes in the cluster with their respective IDs. Remember to replace your_domain with your domain name, which you set up during the prerequisites.
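The value of controller.quorum.voters is a comma-separated list of id@host:port entries. As a minimal sketch, the following Python snippet builds the value shown above and parses it back, which can help catch typos before you copy the line to every node. The helper names build_voters and parse_voters are hypothetical and not part of Kafka, and the kafka<id>.<domain> host naming is this tutorial's convention.

```python
def build_voters(domain, node_ids, port=9093):
    """Build a controller.quorum.voters value for hosts named kafka<id>.<domain>."""
    return ",".join(f"{i}@kafka{i}.{domain}:{port}" for i in node_ids)


def parse_voters(value):
    """Parse 'id@host:port,...' into a dict of node ID -> (host, port)."""
    voters = {}
    for entry in value.split(","):
        node_id, address = entry.split("@", 1)
        host, port = address.rsplit(":", 1)
        if int(node_id) in voters:
            # Node IDs must be unique across the cluster.
            raise ValueError(f"duplicate node ID {node_id}")
        voters[int(node_id)] = (host, int(port))
    return voters


voters_line = build_voters("your_domain", [1, 2, 3])
print("controller.quorum.voters=" + voters_line)
print(parse_voters(voters_line))
```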
Next, find the following lines in the file:
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...
listeners defines the addresses the Kafka node listens on, while advertised.listeners specifies the addresses that will be passed on to clients to connect to the node. This allows you to specify a subset of actual addresses clients should use.
Modify the lines to look like the following, replacing your_domain with your actual domain name:
...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...
Since this node will be in a cluster, you’ve explicitly made the addresses point to the Droplet on which it’s running.
Then, find the num.partitions setting:
...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...
As the comment states, this configures each new topic’s default number of partitions. Since you have three nodes, set it to a multiple of three:
...
num.partitions=6
...
A value of 6 here means that, with partitions spread evenly, each of the three nodes will lead two partitions of every new topic by default.
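To see why a multiple of the node count gives an even spread, here is a simplified model in Python that deals partitions out to brokers in round-robin order. It is only an illustration under that assumption: Kafka's actual placement logic is more involved and also places replicas, and spread_partitions is a hypothetical helper.

```python
from collections import Counter


def spread_partitions(num_partitions, broker_ids):
    """Assign each partition to a broker in simple round-robin order."""
    return {p: broker_ids[p % len(broker_ids)] for p in range(num_partitions)}


brokers = [1, 2, 3]
for num_partitions in (6, 4):
    per_broker = Counter(spread_partitions(num_partitions, brokers).values())
    print(num_partitions, "partitions ->", dict(per_broker))
```

In this model, six partitions give every broker two, while four partitions leave one broker with an extra partition.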
Next, you’ll configure the replication factor for the internal topics, which keep the consumer offsets and transaction states. Find the following lines:
...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...
Set them to the following values:
...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...
Here, you specify that each partition of these internal topics is stored on two nodes, so the consumer offsets and transaction states remain available if a single node is lost. When you’re done, save and close the file.
After setting the default partition number, you must reinitialize the log storage. First, delete the existing log files by running:
rm -rf /home/kafka/kafka-logs/*
Then, generate a new cluster ID and store it in an environment variable:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Show it in the terminal:
echo $KAFKA_CLUSTER_ID
The output will be the ID:
Output
Mjj4bch9Q3-B0TEXv8_zPg
Note that value; you’ll need it to configure the second and third nodes.
Finally, run the following command to generate the log storage:
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
The output will be similar to this:
Output
...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
Configuring the other nodes is very similar to what you’ve just done for the first node. Repeat the same steps on each of them, but also update node.id:
...
node.id=node_number
...
The appropriate values are 2 and 3 for the second and third node, respectively. Also set the appropriate addresses for listeners and advertised.listeners.
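The settings that differ between nodes can be summarized programmatically. The following Python sketch prints the node-specific lines for the second and third node, assuming the kafkaX.your_domain naming and the ports 9092 and 9093 used in this tutorial. node_overrides is a hypothetical helper; it only prints values and does not edit server.properties.

```python
def node_overrides(node_id, domain):
    """Return the server.properties values that are specific to one node."""
    host = f"kafka{node_id}.{domain}"
    return {
        "node.id": str(node_id),
        "listeners": f"PLAINTEXT://{host}:9092,CONTROLLER://{host}:9093",
        "advertised.listeners": f"PLAINTEXT://{host}:9092",
    }


for node_id in (2, 3):
    print(f"# node {node_id}")
    for key, value in node_overrides(node_id, "your_domain").items():
        print(f"{key}={value}")
```

All other settings, including controller.quorum.voters, num.partitions and the replication factors, stay identical on every node.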
When regenerating the log storage, reuse the cluster ID from the first node:
KAFKA_CLUSTER_ID="your_cluster_id"
When you’re done, start the Kafka service on all three nodes by running:
sudo systemctl start kafka
In this step, you’ve configured the three Kafka nodes to be a part of a KRaft cluster. Next, you’ll create a topic and produce and consume messages on your cluster.
In this step, you’ll connect to the Kafka cluster using the shell scripts bundled with Kafka. You’ll also create a topic and try producing and consuming data from the cluster. Then, you’ll bring one of the nodes down and observe how Kafka mitigates the loss.
Kafka provides the kafka-metadata-quorum.sh script, which shows information about the cluster and its members. Run the following command to execute it:
./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status
You connect to one of the nodes at port 9093, which is the endpoint for the controller (but not for the broker). Remember to replace kafka1.your_domain with a domain pointing to one of your Kafka nodes.
The output will be similar to this:
Output
ClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []
The script lists basic information about the state of the cluster. In the shown output, you see that node 3 is elected as the leader, and all three nodes ([1,2,3]) are in the voting pool and agree on that decision.
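If you want to check these fields from a script, for example in a health check, the Key: value layout is easy to parse. The following is a minimal Python sketch that works on the sample output shown above, pasted in as a string. The helper names are hypothetical, and the field names are taken from that sample rather than from a documented, stable format.

```python
SAMPLE_STATUS = """\
ClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []
"""


def parse_quorum_status(text):
    """Turn 'Key: value' lines into a dict of strings."""
    status = {}
    for line in text.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            status[key.strip()] = value.strip()
    return status


def parse_id_list(value):
    """Turn '[1,2,3]' into [1, 2, 3] and '[]' into []."""
    inner = value.strip("[]")
    return [int(x) for x in inner.split(",")] if inner else []


status = parse_quorum_status(SAMPLE_STATUS)
leader = int(status["LeaderId"])
voters = parse_id_list(status["CurrentVoters"])
print(f"leader={leader}, voters={voters}, leader is a voter: {leader in voters}")
```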
Create a topic called first-topic by running:
./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2
The output will be:
Created topic first-topic.
Setting the replication factor to 2 means that each partition of the topic is stored on two nodes. Next, run the kafka-topics.sh script to see how the partitions are arranged on the nodes:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic
The output will be similar to this:
Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
You can see that each partition has a leader, two replicas, and an in-sync replica (ISR) set that contains both of them. The partition leader is the broker node that serves the partition data to clients, while the other replicas only keep copies. A replica is considered in sync if it has caught up with the leader within the last 30 seconds by default. This interval is controlled by the broker-level replica.lag.time.max.ms setting.
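To check how evenly the load is spread, you can count how many partitions each broker leads and how many replicas it holds. The following Python sketch does that for the partition lines of the sample output above, pasted in as a string. parse_partitions is a hypothetical helper, and the regular expression assumes the field order shown in that sample.

```python
import re
from collections import Counter

SAMPLE_DESCRIBE = """\
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
"""

PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def parse_partitions(text):
    """Extract partition, leader, replicas and ISR from each partition line."""
    rows = []
    for match in PARTITION_LINE.finditer(text):
        partition, leader, replicas, isr = match.groups()
        rows.append({
            "partition": int(partition),
            "leader": int(leader),
            "replicas": [int(b) for b in replicas.split(",")],
            "isr": [int(b) for b in isr.split(",")],
        })
    return rows


rows = parse_partitions(SAMPLE_DESCRIBE)
leaders = Counter(row["leader"] for row in rows)
copies = Counter(b for row in rows for b in row["replicas"])
print("partitions led per broker:", dict(leaders))
print("replicas held per broker:", dict(copies))
```

For this sample, every broker leads two partitions and holds four replicas in total.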
Now that you’ve created a topic, you’ll produce messages to it using the kafka-console-producer.sh script. Run the following command to start the producer:
./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092
You’ll see an empty prompt:
>
The producer is waiting for you to enter a textual message. Input Hello World! and press ENTER. The prompt will look like this:
>Hello World!
>
The producer is now waiting for the next message, meaning the previous one was successfully communicated to Kafka. You can input as many messages as you want for testing. To exit the producer, press CTRL+C.
You’ll need a consumer to read back the messages from the topic. Kafka provides a simple consumer as kafka-console-consumer.sh. Execute it by running:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092
You’ll see the messages being read from the topic:
Output
Hello World!
...
Press CTRL+C to exit the consumer. Next, you’ll simulate a node failure. On the third Kafka node, stop the service by running:
sudo systemctl stop kafka
Then, describe the topic by running:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic
The output will be similar to this:
Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
Even though node 3 is listed as a replica, it’s missing from the ISR sets because it’s unavailable, and the partitions it previously led (0 and 4) now have a different leader. Once it rejoins the cluster, it will sync with the other nodes and try to regain its previous place.
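The change between the two describe outputs can be reproduced with a small model. The Python sketch below is a deliberate simplification, not Kafka's election logic: it assumes that the failed broker simply drops out of every ISR and that the first surviving replica in the replica list becomes the leader. fail_broker is a hypothetical helper, and the replica lists are copied from the first describe output.

```python
def fail_broker(assignment, failed):
    """Return {partition: (leader, isr)} after one broker becomes unavailable.

    Simplified model: the ISR is every replica except the failed one, and the
    leader is the first surviving replica in the replica list.
    """
    result = {}
    for partition, replicas in assignment.items():
        isr = [b for b in replicas if b != failed]
        leader = isr[0] if isr else None  # None: no replica left to serve data
        result[partition] = (leader, isr)
    return result


# Replica lists taken from the describe output before node 3 was stopped.
assignment = {0: [3, 1], 1: [1, 2], 2: [2, 3], 3: [1, 3], 4: [3, 2], 5: [2, 1]}
for partition, (leader, isr) in fail_broker(assignment, failed=3).items():
    print(f"Partition {partition}: Leader {leader}, Isr {isr}")
```

For this sample, the model yields the same leaders and ISR sets as the describe output above. It also shows why a replication factor of 1 would be risky: a partition whose only replica is on the failed node would be left without a leader.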
Try reading the messages from first-topic again:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092
You’ll see that they are accessible as usual:
Output
Hello World!
...
Thanks to the presence of replicas, the first two nodes take over and serve the consumer. You can now start Kafka on the third server:
sudo systemctl start kafka
In this step, you’ve seen how Kafka mitigates the unavailability of a node in the cluster. You’ll now learn how to exclude a node from the cluster gracefully.
In this step, you’ll learn how to migrate topics between nodes in a Kafka cluster. When adding nodes to an existing cluster with topics, Kafka won’t automatically transfer any partitions to it, which you may want to do. This is also useful for removing nodes, as existing partitions won’t automatically move to the remaining nodes.
Kafka provides a script called kafka-reassign-partitions.sh, which can generate, execute, and verify transition plans. You’ll use it to create a plan for moving partitions of first-topic to the first two nodes.
First, you’ll need to define which topics should be moved. The script accepts a JSON file with the definition for topics, so create and open it for editing:
vi topics-to-move.json
Add the following lines:
{
  "topics": [
    {
      "topic": "first-topic"
    }
  ],
  "version": 1
}
Under topics, you define an object referencing first-topic. When you’re done, save and close the file.
Run the following command to generate the migration plan, replacing kafka1.your_domain with a domain pointing to one of your Kafka nodes:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
You pass in "1,2" to --broker-list, signifying the IDs of the target brokers.
The output will be similar to this:
Output
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}
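The two JSON documents in output like this can also be separated programmatically instead of being copied by hand. The following Python sketch assumes the layout shown above, with each heading followed by a single line of JSON, and uses an abbreviated two-partition stand-in for the real output. extract_plans is a hypothetical helper.

```python
import json

# Abbreviated stand-in for the tool's output (two partitions instead of six).
SAMPLE_OUTPUT = """\
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}
"""


def extract_plans(text):
    """Return (current, proposed): the JSON line that follows each heading."""
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    plans = {}
    for heading, body in zip(lines, lines[1:]):
        if heading.startswith("Current partition replica assignment"):
            plans["current"] = json.loads(body)
        elif heading.startswith("Proposed partition reassignment configuration"):
            plans["proposed"] = json.loads(body)
    return plans["current"], plans["proposed"]


current, proposed = extract_plans(SAMPLE_OUTPUT)
used = {b for p in proposed["partitions"] for b in p["replicas"]}
print("brokers in proposed plan:", sorted(used))
print(json.dumps(proposed))
```

Checking the set of brokers in the proposed plan is a quick way to confirm that it only targets the brokers you passed to --broker-list.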
The script generated two plans in total, describing the current and proposed partition layouts, respectively. The first plan is provided if you need to revert the changes later. Note the second plan, which you’ll store in a separate file called migration-plan.json. Create and open it for editing:
vi migration-plan.json
Add the second execution plan:
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}
Save and close the file. Then, run the following command to execute it:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute
The output will be:
Output
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5
The script noted that the migration had started. To see the progress of the migration, pass in --verify instead:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify
After some time, the output will look similar to this:
Output
Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic
You can now describe first-topic to verify that no partitions are on broker 3:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic
The output will look like this:
Output
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1
Only brokers 1 and 2 are present as replicas and ISRs, meaning the migration succeeded.
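With many partitions, checking this by eye becomes error-prone. As a small sketch, the following Python snippet collects every broker ID that appears in the Leader, Replicas, or Isr fields of the partition lines above, pasted in as a string, and confirms that broker 3 is no longer among them. brokers_in_use is a hypothetical helper that assumes the field labels shown in this output.

```python
import re

SAMPLE_DESCRIBE = """\
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1
"""


def brokers_in_use(text):
    """Collect every broker ID that appears as a leader, replica, or ISR member."""
    found = set()
    for field in re.findall(r"(?:Leader|Replicas|Isr):\s*([\d,]+)", text):
        found.update(int(b) for b in field.split(","))
    return found


in_use = brokers_in_use(SAMPLE_DESCRIBE)
print("brokers still hosting first-topic:", sorted(in_use))
print("broker 3 is free of partitions:", 3 not in in_use)
```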
In this step, you’ve created a migration plan for moving first-topic from broker 3 to the remaining ones and learned how to verify that the migration went smoothly.
You now have a Kafka cluster, consisting of three nodes that communicate using the KRaft protocol. You’ve also learned how to inspect the cluster and the layout of partitions. You’ve tested cluster redundancy by bringing down a node and reading from a topic. Finally, you’ve learned how to reassign topics to nodes in the cluster.
The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.