Apache Kafka is an open-source distributed event streaming and stream-processing platform written in Java. It is built to process demanding real-time data feeds and is inherently scalable, with high throughput and availability. The project provides shell scripts for producing and consuming messages to and from a Kafka cluster, as well as for administrative tasks such as managing topics and partitions. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers client libraries for many widely used programming languages and environments.
In this tutorial, you'll learn how to manage resources in a Kafka cluster using the KafkaAdminClient API. You'll retrieve information about the cluster programmatically, and you'll create, list, and delete topics. You'll also learn about the kcat CLI utility, which allows you to access your Kafka cluster without depending on Java.
Finally, you'll learn how to set up Kafka Cruise Control, a daemon that continuously monitors and rebalances a Kafka cluster, yielding higher efficiency and reliability.
To complete this tutorial, you’ll need:
A machine with at least 4GB RAM and 2 CPUs. If you are using an Ubuntu server, follow the Initial Server Setup tutorial for setup instructions.
A Java Development Kit (JDK) 11 installed on your Droplet or local machine. For instructions on installing Java on Ubuntu, see the How To Install Java with Apt on Ubuntu tutorial.
An Apache Kafka cluster with three brokers. You can follow the How To Set Up a Multi-Node Kafka Cluster using KRaft tutorial for setup instructions.
A Java project with a Kafka producer set up according to the How To Set Up a Kafka Producer to Source Data Through CLI tutorial.
Familiarity with the standard directory layout of Java projects. For more information, see the Introduction to the Standard Directory Layout topic in the official Maven documentation.
Python 3 installed on your Droplet or local machine, with a new virtual environment set up. If you are using an Ubuntu server, visit the How To Install Python 3 and Set Up a Programming Environment on an Ubuntu Server tutorial. You only need to complete Step 1 and Step 2. In this tutorial, the virtual environment is assumed to be at ~/venv.
As part of the prerequisites, you have created a Java project with the necessary dependencies for programmatically accessing Kafka and producing messages into the java_demo topic. In this step, you'll create a class that utilizes Kafka's AdminClient class to manage the cluster programmatically.
Navigate to the directory where the dokafka project is stored. The project structure specifies that the source code is stored under src/main/java/com/dokafka.
You'll store the class in a file named AdminClientDemo.java. Create and open it for editing by running:
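For example, with nano:

```
nano src/main/java/com/dokafka/AdminClientDemo.java
```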
Add the following lines:
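A minimal sketch of the class could look like this, assuming SLF4J is used for logging (as in ProducerDemo); the address kafka1.your_domain:9092 is a placeholder:

```java
package com.dokafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Properties;

public class AdminClientDemo {
    private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);

    public static void main(String[] args) throws Exception {
        // Address of the Kafka cluster; replace with your actual domain name
        String bootstrapServers = "kafka1.your_domain:9092";

        // Configuration for the admin client
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (AdminClient client = AdminClient.create(properties)) {
            // Ask the cluster to describe itself and wait for the node list
            Collection<Node> nodes = client.describeCluster().nodes().get();
            if (nodes == null || nodes.isEmpty()) {
                log.info("No nodes found in the cluster!");
            } else {
                log.info("Found {} node(s) in the cluster", nodes.size());
            }
        }
    }
}
```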
First, you define the AdminClientDemo class and import the classes you'll use. You also instantiate a Logger as a member of the class. Similarly to ProducerDemo, in the main() method you first declare the Kafka cluster address (bootstrapServers). Remember to replace kafka1.your_domain with your actual domain name.
Then, you instantiate a Properties object, which holds pairs of keys and values representing the configuration for your admin client. You set the bootstrap.servers property to the address of the Kafka cluster.
After that, you declare and instantiate an AdminClient. The client can perform administrative actions in the cluster, such as listing, creating, and deleting topics, partitions, and offsets. Here, you use it to retrieve the number of nodes present in the cluster.
If no Node objects are returned, you log an appropriate message. Otherwise, you output the received number of Kafka nodes. When you're done, save and close the file.
Next, you'll create a script to compile and run AdminClientDemo. You'll store it in a file called run-adminclient.sh. Create and open it for editing:
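```
nano run-adminclient.sh
```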
Add the following lines:
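One way to write it, assuming Maven manages the project's dependencies (as in the prerequisite tutorial); cp.txt is just a scratch file holding the dependency classpath:

```bash
#!/bin/bash
# Compile the project and collect the dependency classpath.
mvn -q compile
mvn -q dependency:build-classpath -Dmdep.outputFile=cp.txt
# Run the demo class with compiled classes and dependencies on the classpath.
java -cp target/classes:$(cat cp.txt) com.dokafka.AdminClientDemo
```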
Save and close the file, then mark it as executable:
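```
chmod +x run-adminclient.sh
```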
Finally, try running it:
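```
./run-adminclient.sh
```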
The output will be long, and its end should look similar to this:
AdminClientDemo has successfully connected to your Kafka installation and retrieved the count of nodes.
KafkaAdminClient provides the createTopics() and listTopics() methods, which you'll now use to create a topic and subsequently list all that are present in the cluster.
First, open AdminClientDemo for editing by running:
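```
nano src/main/java/com/dokafka/AdminClientDemo.java
```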
Modify the code to look like this:
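A sketch of the additions inside the try block, after the node count is logged; the partition count and replication factor (1 and 1) are placeholder values you should adjust for your three-broker cluster:

```java
// Additional imports needed at the top of the file:
// import org.apache.kafka.clients.admin.NewTopic;
// import org.apache.kafka.clients.admin.CreateTopicsResult;
// import org.apache.kafka.clients.admin.ListTopicsOptions;
// import org.apache.kafka.clients.admin.TopicListing;
// import java.util.Collections;

// Describe the topic to create: name, partition count, replication factor
NewTopic newTopic = new NewTopic("newTopic", 1, (short) 1);

// createTopics() accepts a Collection of NewTopic instances
CreateTopicsResult result = client.createTopics(Collections.singletonList(newTopic));
// The call is asynchronous, so block until the topic has actually been created
result.all().get();

// Allow internal topics to be included in the listing
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);

// Retrieve and log the names of all topics in the cluster
for (TopicListing listing : client.listTopics(options).listings().get()) {
    log.info("Topic: {}", listing.name());
}
```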
You first create an instance of NewTopic, which describes a topic that should be created in the cluster. You pass in its name ("newTopic"), the number of partitions, and the replication factor, which you must cast to short. The createTopics() method accepts a Collection of NewTopic instances, so you construct a list with newTopic as its single element.
This method is asynchronous, so the flow of execution returns to main() immediately after it is called while the operation continues in the background. Because you'll list the topics next, you need to wait for it to complete with result.all().get(). result holds the set of created topics, and retrieving all() of them requires the operation to finish before proceeding.
Next, you construct a ListTopicsOptions instance, which holds the configuration for the process of retrieving topics. You call its listInternal() method, passing in true to allow fetching topics that Kafka considers for internal use only. Then, you pass the options to client.listTopics() and retrieve the listings(), through which you iterate in a loop, logging the topic names.
Save and close the file when you’re done, then run the script:
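```
./run-adminclient.sh
```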
All topics in the cluster will be listed at the end of the output:
To delete a topic (or multiple), you can use the deleteTopics() method of KafkaAdminClient. You'll now use it to delete the newTopic you've just created. Open AdminClientDemo for editing:
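```
nano src/main/java/com/dokafka/AdminClientDemo.java
```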
Replace the code for topic creation with the highlighted lines:
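A sketch of the replacement, reusing the client from before (DeleteTopicsResult also needs to be imported from org.apache.kafka.clients.admin):

```java
// Request deletion of the topic; deleteTopics() accepts a Collection of names
DeleteTopicsResult deleted = client.deleteTopics(Collections.singletonList("newTopic"));
// Block until all requested topics have been processed
deleted.all().get();
```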
Here, you pass in a Collection of strings representing topic names to client.deleteTopics(). Then, you wait for it to finish by getting deleted.all(), which returns only when all topics have been processed. Save and close the file when you're done, then execute the run-adminclient.sh script:
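```
./run-adminclient.sh
```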
You’ll see that the new topic is not present in the list of topics:
In this step, you've used the KafkaAdminClient to access your cluster and retrieve information about it programmatically. You've also seen how to list, create, and delete topics in the cluster. You'll now learn how to use the kcat CLI tool to access your cluster.
In this step, you'll learn how to download and install kcat, a command-line tool for accessing and configuring Kafka clusters without depending on Java.
First, you'll need to install the appropriate package for your OS. If you're on macOS, you can download kcat with Homebrew:
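```
brew install kcat
```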
On Debian and Ubuntu systems, you can install it as usual through apt:
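```
sudo apt install kafkacat
```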
kafkacat is an old name for kcat and is kept in the package repositories for compatibility reasons. For installation instructions on other distributions, visit the official docs.
One of the basic operations that the scripts included with Kafka allow is to stream data from a topic. With kcat, you can even stream from multiple topics at once. Running the following command will stream messages from the java_demo topic into the console:
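```
kcat -C -b kafka1.your_domain:9092 -t java_demo
```

To consume several topics at once, you can use kcat's consumer group mode (-G), which accepts multiple topic names, for example kcat -b kafka1.your_domain:9092 -G demo_group topic1 topic2 (demo_group is an arbitrary group name).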
The kafka-console-consumer.sh script also allows you to read all messages on the topic, starting from the beginning. kcat does this by default when consuming; you can make it explicit by passing in -o beginning:
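```
kcat -C -b kafka1.your_domain:9092 -t java_demo -o beginning
```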
The output will contain the messages you’ve produced as part of the prerequisites:
You can also output the consumed messages as JSON by passing in -J:
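```
kcat -C -b kafka1.your_domain:9092 -t java_demo -o beginning -J
```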
The output will look similar to this:
To produce messages to a topic, pass in -P to switch to producer mode:
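```
kcat -P -b kafka1.your_domain:9092 -t java_demo
```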
Similarly to kafka-console-producer.sh, kcat will accept messages separated by ENTER. To exit the prompt, press CTRL+C, followed by ENTER.
You can use the templating ability of kcat to output more information along with each message. To pass in a format of your choosing, use -f as shown in the following command:
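```
kcat -C -b kafka1.your_domain:9092 -t java_demo -o beginning -f 'Topic %t [%p] at offset %o: key %k, %S bytes: %s\n'
```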
Here, you instruct it to read all messages from the beginning of the java_demo topic and to output the topic name (%t), partition number (%p), offset (%o), and message key (%k), as well as the length of the value (%S) and the message itself (%s).
The output will look similar to this:
kcat used the string template and output additional information about each message record.
You can list cluster metadata in an organized fashion by passing in -L:
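```
kcat -b kafka1.your_domain:9092 -L
```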
The output will list brokers in the cluster, as well as all topics and their partitions:
Similarly to the kafka-topics.sh script, kcat lists the partition leaders, replicas, and in-sync replica sets.
You can also pass in -J to get the output as JSON:
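```
kcat -b kafka1.your_domain:9092 -L -J
```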
The output will look like this:
In this step, you've installed kcat, a tool for easily accessing and managing Kafka clusters without relying on Java. You've seen how to retrieve information about the cluster and its topics, as well as how to produce and consume messages. You'll now learn how to automate cluster rebalancing with Cruise Control.
Cruise Control is an open-source project developed by LinkedIn that continuously monitors the activity of Kafka brokers in a cluster and rebalances the workloads to optimize resource usage and throughput. In this step, you’ll learn how to enable it for your cluster and monitor its operations.
By default, Cruise Control has sensible targets, called goals, for ensuring the cluster behaves optimally. Some main goals include ensuring that each topic partition has a sufficient number of replicas and keeping CPU, network, and disk usage balanced and under specified limits. Implementing custom goals is also supported.
You'll need to compile Cruise Control from source. First, clone the official Git repository by running:
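```
git clone https://github.com/linkedin/cruise-control.git
```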
Navigate to it:
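```
cd cruise-control
```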
Then, use Gradle to compile the project:
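```
./gradlew jar
```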
The build process will take some time. The end of the output will look similar to this:
Cruise Control is now compiled along with its metrics reporter, a JAR file located under cruise-control-metrics-reporter/build/libs/. The reporter sends metrics about a broker into a topic on the cluster, which Cruise Control can then monitor.
Then, run the following command to copy all dependencies to the target directory:
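```
./gradlew jar copyDependantLibs
```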
The output will be:
Next, you'll need to configure every Kafka broker in the cluster to use the special metrics reporter. Copy it into the libs/ directory where Kafka is installed by running:
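Assuming the repository was cloned into your home directory and Kafka lives under ~/kafka (adjust both paths to your installation):

```
cp ~/cruise-control/cruise-control-metrics-reporter/build/libs/cruise-control-metrics-reporter-*.jar ~/kafka/libs/
```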
Then, you'll need to modify the Kafka broker configuration to utilize the new reporter. Open the server.properties file of the broker for editing:
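```
# adjust the path to match your Kafka installation
nano ~/kafka/config/kraft/server.properties
```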
Add the following line to the end of the file:
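```
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
```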
Save and close the file when you’re done. Restart the broker by running:
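Assuming the broker runs as a systemd service named kafka:

```
sudo systemctl restart kafka
```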
All brokers in the cluster must have the Cruise Control metrics reporter activated, so repeat the above steps on the other nodes. After a few minutes, you can list the topics in the cluster using kcat to verify that the metrics reporter has started working:
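A topic named __CruiseControlMetrics (the reporter's default) should appear in the listing:

```
kcat -b kafka1.your_domain:9092 -L
```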
In order to optimize brokers correctly, Cruise Control needs to be aware of the hardware specs of the individual brokers. This configuration is stored in a file called capacity.json, located under config. Open it for editing:
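```
nano config/capacity.json
```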
In its default state, the file looks similar to this (the exact values may differ between Cruise Control versions):
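```json
{
  "brokerCapacities":[
    {
      "brokerId": "-1",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
    }
  ]
}
```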
The brokerCapacities array contains elements tied to broker IDs. Each element contains fields for disk space, CPU allowance, and network throughput to and from the broker.
You deployed three Kafka brokers as part of the prerequisites. Modify the file by adding the correct specifications for each of them:
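For example, with three brokers whose IDs are 1, 2, and 3; the disk, CPU, and network values below are placeholders to replace with your own hardware specs:

```json
{
  "brokerCapacities":[
    {
      "brokerId": "1",
      "capacity": {
        "DISK": "25000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "Capacity of broker 1. Disk is in MB, CPU in percentage, network throughput in KB."
    },
    {
      "brokerId": "2",
      "capacity": {
        "DISK": "25000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "Capacity of broker 2."
    },
    {
      "brokerId": "3",
      "capacity": {
        "DISK": "25000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "Capacity of broker 3."
    }
  ]
}
```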
Set the disk capacities according to your Droplet sizes, then save and close the file.
Next, you'll configure Cruise Control to connect to your cluster in KRaft mode by modifying the cruisecontrol.properties file under config. Open it for editing by running the following:
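```
nano config/cruisecontrol.properties
```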
Find the following lines:
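```
# The Kafka cluster to control.
bootstrap.servers=localhost:9092
```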
The bootstrap.servers parameter specifies which broker (and by extension, which cluster) to connect to. Modify it to look like this:
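```
bootstrap.servers=kafka1.your_domain:9092
kafka.broker.failure.detection.enable=true
```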
The kafka.broker.failure.detection.enable parameter instructs Cruise Control to disregard ZooKeeper settings and use KRaft mode natively. Remember to replace kafka1.your_domain with your Kafka broker's domain name.
Then, find the following lines:
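They will look similar to the following (the exact defaults vary between versions):

```
# The path to the file containing the capacity of each broker
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
```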
Make the first parameter config active:
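```
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json
```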
Here, you set the capacity.json file (residing under config/) as the specification for broker capacities.
Save and close the file when you’re done. Now that Cruise Control is configured, run it in a separate terminal using the following command:
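```
./kafka-cruise-control-start.sh config/cruisecontrol.properties
```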
There will be a lot of streaming output as Cruise Control monitors and optimizes your cluster continually.
Cruise Control exposes a REST API on port 9090 for configuration and administrative tasks. However, the project also maintains cccli, a Python tool that wraps the API in an intuitive command-line interface. In this section, you'll install it and learn how to fetch information about your cluster.
Navigate to the Python 3 virtual environment you’ve created as part of the prerequisites:
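```
cd ~/venv
```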
Activate it by running:
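```
source bin/activate
```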
Then, use pip to install the tool:
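```
pip install cruise-control-client
```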
The end of the output will be similar to this:
The installer has bound the cccli command to the appropriate Python package.
To fetch statistics about the current cluster load, run the following command, passing in the address of your Cruise Control instance to -a:
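```
# 'load' mirrors the REST API's /load endpoint
cccli -a localhost:9090 load
```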
The output will detail the parameters it is watching over, supplied by the metrics reporter:
Cruise Control can be configured to auto-heal the cluster in the event of a broker failure, goal violation, or metric anomaly. Run the following command to enable self-healing in case of a broker failure:
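The flag name below is based on the REST API's enable_self_healing_for parameter and may differ between cruise-control-client versions:

```
cccli -a localhost:9090 admin --enable-self-healing-for broker_failure
```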
The output will show the old and new state of the modified setting:
In this step, you've downloaded and set up the Cruise Control metrics reporter on each Kafka broker node in your cluster. Then, you've configured and started Cruise Control, which continually monitors and optimizes your cluster's operations. You've also seen how to install and use cccli, a tool for monitoring and configuring Cruise Control on the fly from the command line. For more information about what it offers, visit the official docs.
You can now manage a Kafka cluster from your code using the KafkaAdminClient API. You've also learned how to use kcat and set up Cruise Control to increase efficiency and reliability in your Kafka cluster.