Tutorial

How To Manage Kafka Programmatically

Published on May 24, 2024
How To Manage Kafka Programmatically

Introduction

Apache Kafka is an open-source distributed event and stream-processing platform written in Java. It is built to process demanding real-time data feeds and is inherently scalable, with high throughput and availability. The project provides shell scripts for producing and consuming messages to and from a Kafka cluster and administrative tasks, such as managing topics and partitions. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.

In this tutorial, you’ll learn how to manage resources in a Kafka cluster using the KafkaAdminClient API. You’ll also learn how to retrieve information about the cluster programmatically and how to create, list, and delete topics. You’ll also learn about the kcat CLI utility, which allows you to access your Kafka cluster without depending on Java.

Finally, you’ll learn how to set up Kafka Cruise Control, a daemon for automatically optimizing the inner processes of a Kafka cluster, yielding higher efficiency and reliability.

Prerequisites

To complete this tutorial, you’ll need:

Step 1 - Utilizing Kafka AdminClient

As part of the prerequisites, you have created a Java project with the necessary dependencies for programmatically accessing Kafka and producing messages into the java_demo topic. In this step, you’ll create a class that utilizes Kafka’s AdminClient class to manage the cluster programmatically.

Navigate to the directory where the dokafka project is stored. The project structure specifies that the source code is stored under src/main/java/com/dokafka.

You’ll store the class in a file named AdminClientDemo.java. Create and open it for editing by running:

nano src/main/java/com/dokafka/AdminClientDemo.java

Add the following lines:

src/main/java/com/dokafka/AdminClientDemo.java
package com.dokafka;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class AdminClientDemo {
  private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);

  public static void main(String[] args) {
    String bootstrapServers = "kafka1.your_domain:9092";

    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    final KafkaAdminClient client = AdminClient.create(properties);

    try {
      Collection<Node> nodes = client.describeCluster().nodes().get();
      if (nodes == null)
        log.info("There seem to be no nodes in the cluster!");
      else
        log.info(String.format("Count of nodes: %s\n", nodes.size()));
    } catch (Exception e) {
      log.error("An error occurred", e);
    }
  }
}

First, you define the AdminClientDemo class and import the classes you’ll use. You also instantiate a Logger as a member of the class. Similarly to ProducerDemo, in the main() method you first declare the Kafka cluster address (bootstrapServers). Remember to replace kafka1.your_domain with your actual domain name.

Then, you instantiate a Properties object, which holds pairs of keys and values representing the configuration for operating your Kafka consumer. You set the bootstrap.servers property to the address of the Kafka cluster.

After that, you declare and instantiate an AdminClient:

src/main/java/com/dokafka/AdminClientDemo.java
final KafkaAdminClient client = AdminClient.create(properties);

The client can perform administrative actions in the cluster, such as listing, creating, and deleting topics, partitions, and offsets. Here, you use it to retrieve the number of nodes present in the cluster:

src/main/java/com/dokafka/AdminClientDemo.java
...
    try {
      Collection<Node> nodes = client.describeCluster().nodes().get();
      if (nodes == null)
        log.info("There seem to be no nodes in the cluster!");
      else
        log.info(String.format("Count of nodes: %s\n", nodes.size()));
    } catch (Exception e) {
      log.error("An error occurred", e);
    }
...

If no Node objects are returned, you log an appropriate message. Otherwise, you output the received number of Kafka nodes. When you’re done, save and close the file.

Next, you’ll create a script to compile and run AdminClientDemo. You’ll store it in a file called run-adminclient.sh. Create and open it for editing:

nano run-adminclient.sh

Add the following lines:

run-adminclient.sh
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.AdminClientDemo

Save and close the file, then mark it as executable:

chmod +x run-adminclient.sh

Finally, try running it:

./run-adminclient.sh

The output will be long, and its end should look similar to this:

Output
... [main] INFO com.dokafka.AdminClientDemo - Count of nodes: 3

AdminClientDemo has successfully connected to your Kafka installation and retrieved the count of nodes.

Creating and Listing Topics

KafkaAdminClient provides the createTopics() and listTopics() methods, which you’ll now use to create a topic and subsequently list all that are present in the cluster.

First, open AdminClientDemo for editing by running:

nano src/main/java/com/dokafka/AdminClientDemo.java

Modify the code to look like this:

nano src/main/java/com/dokafka/AdminClientDemo.java
...
    try {
      NewTopic newTopic = new NewTopic("newTopic", 1, (short) 1);
      CreateTopicsResult result = client.createTopics(
        Collections.singleton(newTopic)
      );
      result.all().get();

      ListTopicsOptions options = new ListTopicsOptions();
      options.listInternal(true);
      Collection<TopicListing> topics = client.listTopics(options).listings().get();
      for (TopicListing topic: topics) {
        log.info("Topic: " + topic.name());
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    }
...

You first create an instance of NewTopic, which describes a topic that should be created in the cluster. You pass in its name ("newTopic") and the number of partitions and replicas, which you must cast to short. The createTopics() method accepts a Collection of NewTopic instances, so you construct a list with a single element containing only newTopic.

Since this method is asynchronous, the flow of execution will return to main() immediately after it is called while it continues running in the background. Since you’ll list the topics next, you need to wait for it to complete with result.all().get(). result contains a list of created topics, and getting all() of them requires the operation to complete before proceeding.

Next, you construct a ListTopicsOptions instance, which holds the configuration for the process of retrieving topics. You call it the listInteral() method and pass it in as true to allow fetching topics that Kafka considers for internal use only. Then, you pass it in to client.listTopics() and retrieve the listings(), through which you iterate in a loop and log the topic names.

Save and close the file when you’re done, then run the script:

./run-adminclient.sh

All topics in the cluster will be listed at the end of the output:

Output
... [main] INFO com.dokafka.AdminClientDemo - Topic: newTopic [main] INFO com.dokafka.AdminClientDemo - Topic: java_demo

Deleting Topics

To delete a topic (or multiple), you can use the deleteTopics() method of KafkaAdminClient. You’ll now use it to delete the newTopic you’ve just created. Open AdminClientDemo for editing:

nano src/main/java/com/dokafka/AdminClientDemo.java

Replace the code for topic creation with the highlighted lines:

nano src/main/java/com/dokafka/AdminClientDemo.java
...
      DeleteTopicsResult deleted = client.deleteTopics(Collections.singleton("newTopic"));
      deleted.all().get();
      log.info("Topic newTopic deleted!");

      ListTopicsOptions options = new ListTopicsOptions();
      options.listInternal(true);
      Collection<TopicListing> topics = client.listTopics(options).listings().get();
      for (TopicListing topic: topics) {
        log.info("Topic: " + topic.name());
      }
...

Here, you pass in a Collection of strings representing topic names to client.deleteTopics(). Then, you wait for it to finish by getting deleted.all(), which returns only when all topics have been processed. Save and close the file when you’re done, then execute the run-adminclient.sh script:

./run-adminclient.sh

You’ll see that the new topic is not present in the list of topics:

Output
[main] INFO com.dokafka.AdminClientDemo - Topic newTopic deleted! [main] INFO com.dokafka.AdminClientDemo - Topic: java_demo

In this step, you’ve used the KafkaAdminClient to access your cluster and retrieve information about it programmatically. You’ve also seen how to list, create, and delete topics in the cluster. You’ll now learn how to use the kcat CLI tool to access your cluster.

Step 2 - Using kcat to Manage the Cluster

In this step, you’ll learn how to download and install kcat, a command line tool for accessing and configuring Kafka clusters without the dependency on Java.

First, you’ll need to install the appropriate package for your OS. If you’re on MacOS, you can download kcat with Brew:

brew install kcat

On Debian and Ubuntu systems, you can install it as usual through apt:

sudo apt install kafkacat

kafkacat is an old name for kcat, but is kept in the package manager for compatibility reasons. For installation instructions on other distributions, visit the official docs.

Producing and Consuming Messages

One of the basic operations that the scripts included with Kafka allow is to stream data from a topic. With kcat, you can stream from multiple topics at once:

kcat -b your_broker_address:9092 first_topic second_topic ...

Running the following command will stream messages from the java_demo topic into the console:

kcat -b kafka1.your_domain:9092 java_demo

The Kafka-console-consumer.sh script also allows you to read all messages on the topic, starting from the beginning. Pass in -t to achieve the same with kcat:

kcat -b kafka1.your_domain:9092 -t java_demo

The output will contain the messages you’ve produced as part of the prerequisites:

Output
% Auto-selecting Consumer mode (use -P or -C to override) % Reached end of topic java_demo [1] at offset 0 % Reached end of topic java_demo [2] at offset 0 Hello World! % Reached end of topic java_demo [0] at offset 0 % Reached end of topic java_demo [3] at offset 0 % Reached end of topic java_demo [4] at offset 1 % Reached end of topic java_demo [5] at offset 0

You can also output the consumed messages as JSON by passing in -J:

kcat -b kafka1.your_domain:9092 -t java_demo -J

The output will look similar to this:

Output
% Auto-selecting Consumer mode (use -P or -C to override) % Reached end of topic java_demo [2] at offset 0 % Reached end of topic java_demo [0] at offset 0 % Reached end of topic java_demo [1] at offset 0 {"topic":"java_demo","partition":4,"offset":0,"tstype":"create","ts":1714922509999,"broker":1,"key":null,"payload":"Hello World!"} % Reached end of topic java_demo [3] at offset 0 % Reached end of topic java_demo [5] at offset 0 % Reached end of topic java_demo [4] at offset 1

To produce messages to a topic, pass in -P to switch to producer mode:

kcat -b kafka1.your_domain:9092 -t java_demo -P

Similarly to kafka-console-producer.sh, kcat will accept messages separated by ENTER. To exit the prompt, you can press CTRL + C and follow with ENTER.

You can use the templating ability of kcat to output more information about a message along with it. To pass in a format of your choosing, use -f as shown in the following command:

kcat -b kafka1.your_domain:9092 -t java_demo -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'

Here, you instruct it to read all messages from the beginning of the java_demo topic and to output the topic name (%t), partition number (%p), offset (%o), and message key (%k), as well as the length of the value (%S) and the message itself (%s).

The output will look similar to this:

% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [2] at offset 0
% Reached end of topic java_demo [1] at offset 0
Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [4] at offset 1
% Reached end of topic java_demo [5] at offset 0

kcat used the string template and outputted additional information about the message record.

Listing Cluster Metadata

You can list cluster metadata in an organized fashion by passing in -L:

kcat -b kafka1.your_domain:9092 -L

The output will list brokers in the cluster, as well as all topics and their partitions:

Output
3 brokers: broker 1 at kafka1.your_domain:9092 broker 2 at kafka2.your_domain:9092 (controller) broker 3 at kafka3.your_domain:9092 1 topics: topic "java_demo" with 6 partitions: partition 0, leader 3, replicas: 3,1 isrs: 3,1 partition 1, leader 1, replicas: 1,2 isrs: 1,2 partition 2, leader 2, replicas: 2,3 isrs: 2,3 partition 3, leader 2, replicas: 2,1 isrs: 2,1 partition 4, leader 1, replicas: 1,3 isrs: 1,3 partition 5, leader 3, replicas: 3,2 isrs: 3,2

Similarly to the Kafka-topics.sh script, kcat lists the partition leaders, replicas, and in-sync replica sets.

You can also pass in -J to get the output as JSON:

kcat -b kafka1.your_domain:9092 -L -J

The output will look like this:

Output
{ "originating_broker": { "id": 2, "name": "kafka2.your_domain:9092/2" }, "query": { "topic": "*" }, "controllerid": 3, "brokers": [ { "id": 1, "name": "kafka1.your_domain:9092" }, { "id": 2, "name": "kafka2.your_domain:9092" }, { "id": 3, "name": "kafka3.your_domain:9092" } ], "topics": [ { "topic": "java_demo", "partitions": [ { "partition": 0, "leader": 3, "replicas": [ { "id": 3 } ], "isrs": [ { "id": 3 } ] }, ... ] } ] }

In this step, you’ve installed kcat, a tool for easily accessing and managing Kafka clusters without relying on Java. You’ve seen how to retrieve information about the cluster and its topics, as well as how to produce and consume messages. You’ll now learn how to automate appropriate in-cluster rebalances with Cruise Control.

Step 3 - Automating Rebalances with Kafka Cruise Control

Cruise Control is an open-source project developed by LinkedIn that continuously monitors the activity of Kafka brokers in a cluster and rebalances the workloads to optimize resource usage and throughput. In this step, you’ll learn how to enable it for your cluster and monitor its operations.

By default, Cruise Control has sensible targets for ensuring the cluster behaves optimally, called goals. Some main goals include ensuring that each topic has a proper number of replicas, keeping CPU, network, and disk usage balanced and under specified limits, and having enough replicas of topic partitions. Implementing custom goals is also supported.

You’ll need to compile Cruise Control from the source. First, clone the official Git repository by running:

git clone https://github.com/linkedin/cruise-control.git

Navigate to it:

cd cruise-control

Then, use Gradle to compile the project:

./gradlew jar

The build process will take some time. The end of the output will look similar to this:

Output
... BUILD SUCCESSFUL in 2m 41s 17 actionable tasks: 17 executed

Cruise Control is now compiled along with its metrics reporter, which is located under cruise-control-metrics-reporter/build/libs/ in a JAR file. The reporter sends metrics about a broker into a topic on the cluster that Cruise Control can monitor.

Then, run the following command to copy all dependencies to the target directory:

./gradlew jar copyDependantLibs

The output will be:

Output
BUILD SUCCESSFUL in 15s 17 actionable tasks: 1 executed, 16 up-to-date

Next, you’ll need to configure every Kafka broker in the cluster to use the special metrics reporter. Copy it into the libs/ directory where Kafka is installed by running:

cp cruise-control-metrics-reporter/build/libs/* /home/kafka/kafka/libs/

Then, you’ll need to modify the Kafka broker configuration to utilize the new reporter. Open the server.properties file of the broker for editing:

nano /home/kafka/kafka/config/kraft/server.properties

Add the following line to the end of the file:

...
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter

Save and close the file when you’re done. Restart the broker by running:

sudo systemctl restart kafka

All brokers in the cluster must have the Cruise Control metric reporter activated, so repeat the above steps on the other nodes to obtain the metric reporter JAR. After a few minutes, you can list the topics in the cluster using kcat to verify that the metrics reporter has started working:

Output
... topic "__CruiseControlMetrics" with 6 partitions: partition 0, leader 3, replicas: 3,2 isrs: 3,2 partition 1, leader 2, replicas: 2,3 isrs: 2,3 partition 2, leader 3, replicas: 3,2 isrs: 3,2 partition 3, leader 2, replicas: 2,3 isrs: 2,3 partition 4, leader 2, replicas: 2,3 isrs: 2,3 partition 5, leader 3, replicas: 3,2 isrs: 3,2

In order to optimize brokers correctly, Cruise Control needs to be aware of the hardware specs of the individual brokers. This configuration is stored in a file called capacity.json, located under config. Open it for editing:

nano config/capacity.json

In its default state, the file looks like this:

config/capacity.json
{
  "brokerCapacities":[
    {
      "brokerId": "-1",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
    },
    {
      "brokerId": "0",
      "capacity": {
        "DISK": "500000",
        "CPU": "100",
        "NW_IN": "50000",
        "NW_OUT": "50000"
      },
      "doc": "This overrides the capacity for broker 0."
    }
  ]
}

The brokerCapacities array contains elements tied to broker IDs. Each contains fields for disk space, CPU allowance, and network throughput to and from the broker.

You deployed three Kafka brokers as part of the prerequisites. Modify the file by adding the correct specifications for each of them:

config/capacity.json
{
  "brokerCapacities":[
    {
      "brokerId": "-1",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
    },
    {
      "brokerId": "1",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": ""
    },
    {
      "brokerId": "2",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": ""
    },
    {
      "brokerId": "3",
      "capacity": {
        "DISK": "100000",
        "CPU": "100",
        "NW_IN": "10000",
        "NW_OUT": "10000"
      },
      "doc": ""
    }
  ]
}

Set the disk capacities according to your Droplet sizes, then save and close the file.

Next, you’ll configure Cruise Control to connect to your cluster in KRaft mode. You’ll modify the cruisecontrol.properties file under config. Open it for editing by running the following:

nano config/cruisecontrol.properties

Find the following lines:

config/cruisecontrol.properties
...
# The Kafka cluster to control.
bootstrap.servers=localhost:9092
...

The bootstrap.servers parameter specifies which broker, and by the extent of its cluster, to connect. Modify it to look like this:

config/cruisecontrol.properties
...
# The Kafka cluster to control.
bootstrap.servers=kafka1.your_domain:9092

# Switch to KRaft mode
kafka.broker.failure.detection.enable=true
...

The kafka.broker.failure.detection.enable parameter instructs Cruise Control to disregard ZooKeeper settings and use KRaft mode natively. Replace kafka1.your_domain with your Kafka broker domain name.

Then, find the following lines:

config/cruisecontrol.properties
...
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
...

Make the first parameter config active:

config/cruisecontrol.properties
...
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json
...

Here, you set the capacity.json file (residing under config/) as the specification for broker capacities.

Save and close the file when you’re done. Now that Cruise Control is configured, run it in a separate terminal using the following command:

./kafka-cruise-control-start.sh config/cruisecontrol.properties

There will be a lot of streaming output as Cruise Control monitors and optimizes your cluster continually.

Using the Cruise Control CLI

Cruise Control exposes a REST API at port 9090 for configuration and administrative tasks. However, the project also maintains cccli, a Python tool that wraps the API in an intuitive command-line interface. In this section, you’ll install it and learn how to fetch information about your cluster.

Navigate to the Python 3 virtual environment you’ve created as part of the prerequisites:

cd ~/venv

Activate it by running:

./bin/activate

Then, use pip to install the tool:

pip install cruise-control-client

The end of the output will be similar to this:

Output
... Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, cruise-control-client Successfully installed certifi-2024.2.2 charset-normalizer-3.3.2 cruise-control-client-1.1.3 idna-3.7 requests-2.31.0 urllib3-2.2.1

The installer has bound the cccli command to the appropriate Python package.

To fetch statistics about the current cluster load, run the following command, passing in the address of your Cruise Control instance to -a:

cccli -a localhost:9090 load

The output will detail the parameters it is watching over, supplied by the metrics reporter:

Output
Starting long-running poll of http://localhost:9090/kafkacruisecontrol/load?allow_capacity_estimation=False HOST BROKER RACK DISK_CAP(MB) DISK(MB)/_(%)_ CORE_NUM CPU(%) NW_IN_CAP(KB/s) LEADER_NW_IN(KB/s) FOLLOWER_NW_IN(KB/s) NW_OUT_CAP(KB/s) NW_OUT(KB/s) PNW_OUT(KB/s) LEADERS/REPLICAS kafka1.your_domain, 1,kafka1.your_domain, 100000.000, 0.023/00.00, 1, 0.999, 10000.000, 0.012, 0.002, 10000.000, 0.028, 0.080, 23/45 kafka2.your_domain, 2,kafka2.your_domain, 100000.000, 0.238/00.00, 1, 0.819, 10000.000, 0.028, 0.009, 10000.000, 0.066, 0.105, 27/48 kafka3.your_domain, 3,kafka3.your_domain, 100000.000, 0.383/00.00, 1, 1.292, 10000.000, 0.041, 0.006, 10000.000, 0.081, 0.108, 28/49

Cruise Control can be configured to auto-heal the cluster in case of broker failure, goal violation, or metric anomaly. Run the following command to enable the reparation process in case of a broker failure:

cccli -a localhost:9090 admin --enable-self-healing-for broker_failure

The will show the old and new state of the touched setting:

Output
{ selfHealingEnabledBefore: {BROKER_FAILURE=false}, selfHealingEnabledAfter: {BROKER_FAILURE=true} }

In this step, you’ve downloaded and set up the Cruise Control metrics reporter on each Kafka broker node in your cluster. Then, you’ve configured and started Cruise Control, which continually monitors and optimizes your cluster’s operations. You’ve also seen how to install and use cccli, a tool for monitoring and configuring Cruise Control on the fly from the command line. For more information about what it offers, visit the official docs.

Conclusion

You can now manage a Kafka cluster from your code using the KafkaAdminClient API. You’ve also learned how to use kcat and set up Cruise Control to increase efficiency and reliability in your Kafka cluster.


The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.

Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.

Learn more about our products


About the authors
Default avatar
Savic

author


Default avatar

Sr Technical Writer


Still looking for an answer?

Ask a questionSearch for more help

Was this helpful?
 
Leave a comment


This textbox defaults to using Markdown to format your answer.

You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!

Try DigitalOcean for free

Click below to sign up and get $200 of credit to try our products over 60 days!

Sign up

Join the Tech Talk
Success! Thank you! Please check your email for further details.

Please complete your information!

Featured on Community

Get our biweekly newsletter

Sign up for Infrastructure as a Newsletter.

Hollie's Hub for Good

Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.

Become a contributor

Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.

Welcome to the developer cloud

DigitalOcean makes it simple to launch in the cloud and scale up as you grow — whether you're running one virtual machine or ten thousand.

Learn more
DigitalOcean Cloud Control Panel