Apache Kafka is an open-source distributed event streaming and stream-processing platform written in Java. It is built to process demanding real-time data feeds and is inherently scalable, with high throughput and availability. The project provides shell scripts for producing and consuming messages to and from a Kafka cluster, as well as for administrative tasks such as managing topics and partitions. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers client libraries for many widely used programming languages and environments.
In this tutorial, you’ll learn how to manage resources in a Kafka cluster using the KafkaAdminClient
API. You’ll retrieve information about the cluster programmatically and create, list, and delete topics. You’ll also learn about the kcat CLI utility, which allows you to access your Kafka cluster without depending on Java.
Finally, you’ll learn how to set up Kafka Cruise Control, a daemon for automatically optimizing the inner processes of a Kafka cluster, yielding higher efficiency and reliability.
To complete this tutorial, you’ll need:
A machine with at least 4GB RAM and 2 CPUs. In case of an Ubuntu server, follow the Initial Server Setup for setup instructions.
Java Development Kit (JDK) 11 installed on your Droplet or local machine. For instructions on installing Java on Ubuntu, see the How To Install Java with Apt on Ubuntu tutorial.
An Apache Kafka cluster with three brokers. You can follow the How To Set Up a Multi-Node Kafka Cluster using KRaft tutorial for setup instructions.
A Java project with a Kafka producer set up according to the How To Set Up a Kafka Producer to Source Data Through CLI tutorial.
Familiarity with standard directory layout of Java projects. For more information, see the Introduction to the Standard Directory Layout topic in the official Maven documentation.
Python 3 installed on your Droplet or local machine, with a new virtual environment set up. In case of an Ubuntu server, visit the How To Install Python 3 and Set Up a Programming Environment on an Ubuntu Server tutorial. You only need to complete Step 1 and Step 2. In this tutorial, the virtual environment is assumed to be at ~/venv.
As part of the prerequisites, you have created a Java project with the necessary dependencies for programmatically accessing Kafka and producing messages into the java_demo
topic. In this step, you’ll create a class that utilizes Kafka’s AdminClient
class to manage the cluster programmatically.
Navigate to the directory where the dokafka
project is stored. The project structure specifies that the source code is stored under src/main/java/com/dokafka
.
You’ll store the class in a file named AdminClientDemo.java
. Create and open it for editing by running:
nano src/main/java/com/dokafka/AdminClientDemo.java
Add the following lines:
package com.dokafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class AdminClientDemo {
  private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);

  public static void main(String[] args) {
    // Address of a broker in the Kafka cluster
    String bootstrapServers = "kafka1.your_domain:9092";

    // Admin client configuration
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);

    final AdminClient client = AdminClient.create(properties);

    try {
      // Retrieve the nodes (brokers) that make up the cluster
      Collection<Node> nodes = client.describeCluster().nodes().get();
      if (nodes == null)
        log.info("There seem to be no nodes in the cluster!");
      else
        log.info(String.format("Count of nodes: %s\n", nodes.size()));
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      client.close();
    }
  }
}
First, you define the AdminClientDemo
class and import the classes you’ll use. You also instantiate a Logger
as a member of the class. Similarly to ProducerDemo
, in the main()
method you first declare the Kafka cluster address (bootstrapServers
). Remember to replace kafka1.your_domain
with your actual domain name.
Then, you instantiate a Properties object, which holds pairs of keys and values representing the configuration of your Kafka admin client. You set the bootstrap.servers
property to the address of the Kafka cluster.
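If you prefer to avoid raw string keys, the client library also exposes the same setting as a constant on AdminClientConfig. A minimal, equivalent line (an optional variation, not required for this tutorial) would be:
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Since you already import org.apache.kafka.clients.admin.*, no additional import is needed for this.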
After that, you declare and instantiate an AdminClient
:
final AdminClient client = AdminClient.create(properties);
The client
can perform administrative actions in the cluster, such as listing, creating, and deleting topics, partitions, and offsets. Here, you use it to retrieve the number of nodes present in the cluster:
...
try {
  Collection<Node> nodes = client.describeCluster().nodes().get();
  if (nodes == null)
    log.info("There seem to be no nodes in the cluster!");
  else
    log.info(String.format("Count of nodes: %s\n", nodes.size()));
} catch (Exception e) {
  log.error("An error occurred", e);
}
...
If no Node
objects are returned, you log an appropriate message. Otherwise, you output the received number of Kafka nodes. When you’re done, save and close the file.
Next, you’ll create a script to compile and run AdminClientDemo
. You’ll store it in a file called run-adminclient.sh
. Create and open it for editing:
nano run-adminclient.sh
Add the following lines:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.AdminClientDemo
Save and close the file, then mark it as executable:
chmod +x run-adminclient.sh
Finally, try running it:
./run-adminclient.sh
The output will be long, and its end should look similar to this:
Output...
[main] INFO com.dokafka.AdminClientDemo - Count of nodes: 3
AdminClientDemo
has successfully connected to your Kafka installation and retrieved the count of nodes.
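If you want more than just the count, each Node also exposes its ID, host, and port. As a small optional extension (a sketch, not required for the rest of this tutorial), you could replace the logging in the else branch with a loop over the brokers:
else {
  for (Node node : nodes) {
    // Log the identity and address of every broker in the cluster
    log.info(String.format("Node %d: %s:%d", node.id(), node.host(), node.port()));
  }
}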
KafkaAdminClient
provides the createTopics()
and listTopics()
methods, which you’ll now use to create a topic and subsequently list all that are present in the cluster.
First, open AdminClientDemo
for editing by running:
nano src/main/java/com/dokafka/AdminClientDemo.java
Modify the code to look like this:
...
try {
  NewTopic newTopic = new NewTopic("newTopic", 1, (short) 1);
  CreateTopicsResult result = client.createTopics(
    Collections.singleton(newTopic)
  );
  result.all().get();

  ListTopicsOptions options = new ListTopicsOptions();
  options.listInternal(true);
  Collection<TopicListing> topics = client.listTopics(options).listings().get();
  for (TopicListing topic: topics) {
    log.info("Topic: " + topic.name());
  }
} catch (Exception e) {
  log.error("An error occurred", e);
}
...
You first create an instance of NewTopic, which describes a topic that should be created in the cluster. You pass in its name ("newTopic"), the number of partitions, and the replication factor, which you must cast to short. The createTopics() method accepts a Collection of NewTopic instances, so you pass in a collection with newTopic as its single element.
Since createTopics() is asynchronous, the flow of execution returns to main() immediately after the call while the operation continues in the background. Because you’ll list the topics next, you need to wait for it to complete with result.all().get(). result holds a future for each requested topic, and all() combines them into a single future whose get() blocks until every topic has been created.
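A NewTopic can also carry per-topic configuration. As a hypothetical variation (not needed for this tutorial), you could set a one-hour retention period on the topic before creating it:
// Optional: override topic-level settings such as retention
newTopic.configs(Map.of("retention.ms", "3600000"));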
Next, you construct a ListTopicsOptions instance, which holds the configuration for retrieving topics. You call its listInternal() method with true to also fetch topics that Kafka considers internal. Then, you pass the options to client.listTopics(), retrieve the listings(), and iterate through them in a loop, logging the topic names.
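Beyond listing topic names, the admin client can also describe topics in detail. As an optional sketch (allTopicNames() is available on recent client versions; older ones expose all() instead), you could fetch the partition layout of java_demo like this:
Map<String, TopicDescription> descriptions =
    client.describeTopics(Collections.singleton("java_demo")).allTopicNames().get();
for (TopicDescription description : descriptions.values()) {
  // Log the number of partitions of each described topic
  log.info("Topic " + description.name() + " has " + description.partitions().size() + " partitions");
}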
Save and close the file when you’re done, then run the script:
./run-adminclient.sh
All topics in the cluster will be listed at the end of the output:
Output...
[main] INFO com.dokafka.AdminClientDemo - Topic: newTopic
[main] INFO com.dokafka.AdminClientDemo - Topic: java_demo
To delete a topic (or multiple), you can use the deleteTopics()
method of KafkaAdminClient
. You’ll now use it to delete the newTopic
you’ve just created. Open AdminClientDemo
for editing:
nano src/main/java/com/dokafka/AdminClientDemo.java
Replace the code for topic creation with the highlighted lines:
...
DeleteTopicsResult deleted = client.deleteTopics(Collections.singleton("newTopic"));
deleted.all().get();
log.info("Topic newTopic deleted!");

ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
Collection<TopicListing> topics = client.listTopics(options).listings().get();
for (TopicListing topic: topics) {
  log.info("Topic: " + topic.name());
}
...
Here, you pass a Collection of strings representing topic names to client.deleteTopics(). Then, you wait for the operation to finish by calling deleted.all().get(), which returns only after all the topics have been processed. Save and close the file when you’re done, then execute the run-adminclient.sh script:
./run-adminclient.sh
You’ll see that the new topic is not present in the list of topics:
Output[main] INFO com.dokafka.AdminClientDemo - Topic newTopic deleted!
[main] INFO com.dokafka.AdminClientDemo - Topic: java_demo
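Deletion, like creation, happens asynchronously on the brokers. If you want to cap how long the client waits, deleteTopics() also accepts an options object; a brief sketch (the 30-second limit here is an arbitrary example):
DeleteTopicsResult deleted = client.deleteTopics(
    Collections.singleton("newTopic"),
    new DeleteTopicsOptions().timeoutMs(30000));
deleted.all().get();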
In this step, you’ve used the KafkaAdminClient
to access your cluster and retrieve information about it programmatically. You’ve also seen how to list, create, and delete topics in the cluster. You’ll now learn how to use the kcat
CLI tool to access your cluster.
In this step, you’ll learn how to download and install kcat, a command line tool for accessing and configuring Kafka clusters without the dependency on Java.
First, you’ll need to install the appropriate package for your OS. If you’re on macOS, you can install kcat with Homebrew:
brew install kcat
On Debian and Ubuntu systems, you can install it as usual through apt
:
sudo apt install kafkacat
kafkacat
is an old name for kcat
, but is kept in the package manager for compatibility reasons. For installation instructions on other distributions, visit the official docs.
One of the basic operations that the scripts included with Kafka allow is to stream data from a topic. With kcat
, you can stream from multiple topics at once:
kcat -b your_broker_address:9092 first_topic second_topic ...
Running the following command will stream messages from the java_demo
topic into the console:
kcat -b kafka1.your_domain:9092 java_demo
The kafka-console-consumer.sh script also allows you to read all the messages on a topic, starting from the beginning. To achieve the same with kcat, pass in the topic name with -t:
kcat -b kafka1.your_domain:9092 -t java_demo
The output will contain the messages you’ve produced as part of the prerequisites:
Output% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [1] at offset 0
% Reached end of topic java_demo [2] at offset 0
Hello World!
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [4] at offset 1
% Reached end of topic java_demo [5] at offset 0
You can also output the consumed messages as JSON by passing in -J
:
kcat -b kafka1.your_domain:9092 -t java_demo -J
The output will look similar to this:
Output% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [2] at offset 0
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [1] at offset 0
{"topic":"java_demo","partition":4,"offset":0,"tstype":"create","ts":1714922509999,"broker":1,"key":null,"payload":"Hello World!"}
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [5] at offset 0
% Reached end of topic java_demo [4] at offset 1
To produce messages to a topic, pass in -P
to switch to producer mode:
kcat -b kafka1.your_domain:9092 -t java_demo -P
Similarly to kafka-console-producer.sh
, kcat
will accept messages separated by ENTER
. To exit the prompt, you can press CTRL + C
and follow with ENTER
.
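kcat can also produce keyed messages. The -K flag sets a key delimiter, so in the following example (using the same broker and topic as above) everything before the colon becomes the message key:
echo "id-1:Hello again!" | kcat -b kafka1.your_domain:9092 -t java_demo -P -K :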
You can use the templating ability of kcat
to output more information about a message along with it. To pass in a format of your choosing, use -f
as shown in the following command:
kcat -b kafka1.your_domain:9092 -t java_demo -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'
Here, you instruct it to read all messages from the beginning of the java_demo
topic and to output the topic name (%t
), partition number (%p
), offset (%o
), and message key (%k
), as well as the length of the value (%S
) and the message itself (%s
).
The output will look similar to this:
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [2] at offset 0
% Reached end of topic java_demo [1] at offset 0
Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [4] at offset 1
% Reached end of topic java_demo [5] at offset 0
kcat applied the string template and output additional information about each message record.
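When scripting, it is often useful for kcat to stop instead of waiting for new messages. Passing -e exits once the end of the topic is reached, and -c limits the number of messages consumed, for example:
kcat -b kafka1.your_domain:9092 -t java_demo -e -c 1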
You can list cluster metadata in an organized fashion by passing in -L
:
kcat -b kafka1.your_domain:9092 -L
The output will list brokers in the cluster, as well as all topics and their partitions:
Output 3 brokers:
broker 1 at kafka1.your_domain:9092
broker 2 at kafka2.your_domain:9092 (controller)
broker 3 at kafka3.your_domain:9092
1 topics:
topic "java_demo" with 6 partitions:
partition 0, leader 3, replicas: 3,1 isrs: 3,1
partition 1, leader 1, replicas: 1,2 isrs: 1,2
partition 2, leader 2, replicas: 2,3 isrs: 2,3
partition 3, leader 2, replicas: 2,1 isrs: 2,1
partition 4, leader 1, replicas: 1,3 isrs: 1,3
partition 5, leader 3, replicas: 3,2 isrs: 3,2
Similarly to the kafka-topics.sh
script, kcat
lists the partition leaders, replicas, and in-sync replica sets.
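If you are only interested in a single topic, you can combine -L with -t to restrict the metadata listing:
kcat -b kafka1.your_domain:9092 -L -t java_demo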
You can also pass in -J
to get the output as JSON:
kcat -b kafka1.your_domain:9092 -L -J
The output will look like this:
Output{
"originating_broker": {
"id": 2,
"name": "kafka2.your_domain:9092/2"
},
"query": {
"topic": "*"
},
"controllerid": 3,
"brokers": [
{
"id": 1,
"name": "kafka1.your_domain:9092"
},
{
"id": 2,
"name": "kafka2.your_domain:9092"
},
{
"id": 3,
"name": "kafka3.your_domain:9092"
}
],
"topics": [
{
"topic": "java_demo",
"partitions": [
{
"partition": 0,
"leader": 3,
"replicas": [
{
"id": 3
}
],
"isrs": [
{
"id": 3
}
]
},
...
]
}
]
}
In this step, you’ve installed kcat, a tool for easily accessing and managing Kafka clusters without relying on Java. You’ve seen how to retrieve information about the cluster and its topics, as well as how to produce and consume messages. You’ll now learn how to automate appropriate in-cluster rebalances with Cruise Control.
Cruise Control is an open-source project developed by LinkedIn that continuously monitors the activity of Kafka brokers in a cluster and rebalances the workloads to optimize resource usage and throughput. In this step, you’ll learn how to enable it for your cluster and monitor its operations.
By default, Cruise Control ships with sensible targets, called goals, for keeping the cluster healthy. The main ones include ensuring that each topic partition has enough replicas and keeping CPU, network, and disk usage balanced across brokers and under specified limits. Implementing custom goals is also supported.
You’ll need to compile Cruise Control from source. First, clone the official Git repository by running:
git clone https://github.com/linkedin/cruise-control.git
Navigate to it:
cd cruise-control
Then, use Gradle to compile the project:
./gradlew jar
The build process will take some time. The end of the output will look similar to this:
Output...
BUILD SUCCESSFUL in 2m 41s
17 actionable tasks: 17 executed
Cruise Control is now compiled along with its metrics reporter, which is located under cruise-control-metrics-reporter/build/libs/
in a JAR file. The reporter sends metrics about a broker into a topic on the cluster that Cruise Control can monitor.
Then, run the following command to copy all dependencies to the target directory:
./gradlew jar copyDependantLibs
The output will be:
OutputBUILD SUCCESSFUL in 15s
17 actionable tasks: 1 executed, 16 up-to-date
Next, you’ll need to configure every Kafka broker in the cluster to use the special metrics reporter. Copy it into the libs/
directory where Kafka is installed by running:
cp cruise-control-metrics-reporter/build/libs/* /home/kafka/kafka/libs/
Then, you’ll need to modify the Kafka broker configuration to utilize the new reporter. Open the server.properties
file of the broker for editing:
nano /home/kafka/kafka/config/kraft/server.properties
Add the following line to the end of the file:
...
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
Save and close the file when you’re done. Restart the broker by running:
sudo systemctl restart kafka
All brokers in the cluster must have the Cruise Control metrics reporter activated, so repeat the copy, configuration, and restart steps on the other nodes. After a few minutes, you can verify that the metrics reporter has started working.
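One way to check, since you installed kcat earlier, is to list the cluster metadata again and look for the new __CruiseControlMetrics topic (replace kafka1.your_domain with one of your brokers):
kcat -b kafka1.your_domain:9092 -L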
Output...
topic "__CruiseControlMetrics" with 6 partitions:
partition 0, leader 3, replicas: 3,2 isrs: 3,2
partition 1, leader 2, replicas: 2,3 isrs: 2,3
partition 2, leader 3, replicas: 3,2 isrs: 3,2
partition 3, leader 2, replicas: 2,3 isrs: 2,3
partition 4, leader 2, replicas: 2,3 isrs: 2,3
partition 5, leader 3, replicas: 3,2 isrs: 3,2
In order to optimize brokers correctly, Cruise Control needs to be aware of the hardware specs of the individual brokers. This configuration is stored in a file called capacity.json
, located under config
. Open it for editing:
nano config/capacity.json
In its default state, the file looks like this:
{
"brokerCapacities":[
{
"brokerId": "-1",
"capacity": {
"DISK": "100000",
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
},
{
"brokerId": "0",
"capacity": {
"DISK": "500000",
"CPU": "100",
"NW_IN": "50000",
"NW_OUT": "50000"
},
"doc": "This overrides the capacity for broker 0."
}
]
}
The brokerCapacities
array contains elements tied to broker IDs. Each contains fields for disk space, CPU allowance, and network throughput to and from the broker.
You deployed three Kafka brokers as part of the prerequisites. Modify the file by adding the correct specifications for each of them:
{
"brokerCapacities":[
{
"brokerId": "-1",
"capacity": {
"DISK": "100000",
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
},
{
"brokerId": "1",
"capacity": {
"DISK": "100000",
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": ""
},
{
"brokerId": "2",
"capacity": {
"DISK": "100000",
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": ""
},
{
"brokerId": "3",
"capacity": {
"DISK": "100000",
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": ""
}
]
}
Set the disk capacities according to your Droplet sizes, then save and close the file.
Next, you’ll configure Cruise Control to connect to your cluster in KRaft mode. You’ll modify the cruisecontrol.properties
file under config
. Open it for editing by running the following:
nano config/cruisecontrol.properties
Find the following lines:
...
# The Kafka cluster to control.
bootstrap.servers=localhost:9092
...
The bootstrap.servers
parameter specifies which broker, and by extension which cluster, to connect to. Modify it to look like this:
...
# The Kafka cluster to control.
bootstrap.servers=kafka1.your_domain:9092
# Switch to KRaft mode
kafka.broker.failure.detection.enable=true
...
The kafka.broker.failure.detection.enable
parameter instructs Cruise Control to disregard ZooKeeper settings and use KRaft mode natively. Replace kafka1.your_domain
with your Kafka broker domain name.
Then, find the following lines:
...
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
...
Uncomment the first capacity.config.file line and comment out the second, so that your capacity.json file is used:
...
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json
...
Here, you set the capacity.json
file (residing under config/
) as the specification for broker capacities.
Save and close the file when you’re done. Now that Cruise Control is configured, run it in a separate terminal using the following command:
./kafka-cruise-control-start.sh config/cruisecontrol.properties
There will be a lot of streaming output as Cruise Control monitors and optimizes your cluster continually.
Cruise Control exposes a REST API at port 9090
for configuration and administrative tasks. The project also maintains cccli
, a Python tool that wraps the API in an intuitive command-line interface. In this section, you’ll install it and learn how to fetch information about your cluster.
Navigate to the Python 3 virtual environment you’ve created as part of the prerequisites:
cd ~/venv
Activate it by running:
source bin/activate
Then, use pip
to install the tool:
pip install cruise-control-client
The end of the output will be similar to this:
Output...
Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, cruise-control-client
Successfully installed certifi-2024.2.2 charset-normalizer-3.3.2 cruise-control-client-1.1.3 idna-3.7 requests-2.31.0 urllib3-2.2.1
pip has installed the cruise-control-client package and made the cccli command available in your virtual environment.
To fetch statistics about the current cluster load, run the following command, passing in the address of your Cruise Control instance to -a
:
cccli -a localhost:9090 load
The output will detail the parameters it is watching over, supplied by the metrics reporter:
OutputStarting long-running poll of http://localhost:9090/kafkacruisecontrol/load?allow_capacity_estimation=False
HOST BROKER RACK DISK_CAP(MB) DISK(MB)/_(%)_ CORE_NUM CPU(%) NW_IN_CAP(KB/s) LEADER_NW_IN(KB/s) FOLLOWER_NW_IN(KB/s) NW_OUT_CAP(KB/s) NW_OUT(KB/s) PNW_OUT(KB/s) LEADERS/REPLICAS
kafka1.your_domain, 1,kafka1.your_domain, 100000.000, 0.023/00.00, 1, 0.999, 10000.000, 0.012, 0.002, 10000.000, 0.028, 0.080, 23/45
kafka2.your_domain, 2,kafka2.your_domain, 100000.000, 0.238/00.00, 1, 0.819, 10000.000, 0.028, 0.009, 10000.000, 0.066, 0.105, 27/48
kafka3.your_domain, 3,kafka3.your_domain, 100000.000, 0.383/00.00, 1, 1.292, 10000.000, 0.041, 0.006, 10000.000, 0.081, 0.108, 28/49
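cccli is a thin wrapper around Cruise Control’s REST API, so you can also query the endpoints directly. For example, the state endpoint (under the same /kafkacruisecontrol path that the load command printed above) reports what Cruise Control is currently doing:
curl 'http://localhost:9090/kafkacruisecontrol/state'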
Cruise Control can be configured to auto-heal the cluster in case of broker failure, goal violation, or metric anomaly. Run the following command to enable self-healing in case of a broker failure:
cccli -a localhost:9090 admin --enable-self-healing-for broker_failure
The output will show the old and new state of the setting:
Output{
selfHealingEnabledBefore: {BROKER_FAILURE=false}, selfHealingEnabledAfter: {BROKER_FAILURE=true}
}
In this step, you’ve downloaded and set up the Cruise Control metrics reporter on each Kafka broker node in your cluster. Then, you’ve configured and started Cruise Control, which continually monitors and optimizes your cluster’s operations. You’ve also seen how to install and use cccli
, a tool for monitoring and configuring Cruise Control on the fly from the command line. For more information about what it offers, visit the official docs.
You can now manage a Kafka cluster from your code using the KafkaAdminClient
API. You’ve also learned how to use kcat
and set up Cruise Control to increase efficiency and reliability in your Kafka cluster.