The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.
Apache Kafka provides shell scripts for producing and consuming basic textual messages to and from a Kafka cluster. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.
In this tutorial, you’ll create a Java program that produces data into a Kafka topic. You’ll create a Java project using Apache Maven, a tool for building and packaging Java projects, and add the Kafka client library as a dependency. Then, you’ll implement a class that leverages the Kafka client by producing messages and retrieving in-cluster metadata about them.
To complete this tutorial, you’ll need:
In this step, you’ll install Apache Maven and use it to generate a project that you’ll use to interface with Kafka.
On Ubuntu, Maven is readily available in the official repositories. To install Maven on macOS, refer to the How to Install Maven on Mac OS tutorial.
First, update your existing list of packages by running:
sudo apt update
Run the following command to install it:
sudo apt install maven
Verify that it’s installed by reading out its version number:
mvn --version
The output will be similar to the following, depending on your Java version and platform:
Output
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"
Next, create a directory where you’ll store your Java projects for working with Kafka:
mkdir ~/kafka-projects
Navigate to the newly created directory:
cd ~/kafka-projects
Then, generate an empty Java project by running:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false
Here, you instruct Maven to generate a new project named dokafka, with a group ID of com.dokafka. The group ID uniquely identifies this project across the Maven ecosystem. The project will be generated according to the maven-archetype-quickstart archetype, which is what Maven calls a project template.
There will be a lot of output, especially if this is the first time Maven is being run. The end of the output will look like this:
Output
...
[INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------
Maven has downloaded the necessary Java packages from its central repository and created the dokafka project using the maven-archetype-quickstart template.
Navigate to the project directory by running:
cd dokafka
The structure of the project looks like this:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java
As part of the prerequisites, you learned about the standard Maven project structure that you see here. The src/main/java directory holds the project source code, src/test/java contains the test sources, and pom.xml in the root of the project is the main configuration file for Maven.
The project contains only one source file, App.java. Show its contents to see what Maven generated:
cat src/main/java/com/dokafka/App.java
The output will be:
package com.dokafka;

/**
 * Hello world!
 *
 */
public class App
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
    }
}
To run this code, you’ll first need to build the project by running:
mvn package
Maven will compile the code and package it into a JAR file for execution. The end of the output will look like this, signifying that it’s completed:
Output
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------
Maven placed the resulting JAR file under the target directory. To execute the App class you've just built, run the following command, passing in the full identifier of the class:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App
The output will be:
Output
Hello World!
You’ve installed Maven and created an empty Java project. You’ll add the necessary dependencies for Kafka in the next step.
You’ll now add the Java Kafka client to your project, as well as other dependencies for logging. You’ll also configure Maven to include those dependencies during packaging.
First, you'll add the kafka-clients dependency. Navigate to the Maven repository page for the Java client in your browser, select the latest available version, and copy the provided XML snippet for Maven. At the time of writing, the latest version of the Java client library was 3.7.0.
Dependencies are added to pom.xml in the root of your project. Open it for editing:
nano pom.xml
Find the <dependencies> section and add the dependency definition:
...
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
  </dependency>
  ...
</dependencies>
This will make the Kafka client library available to your project. However, the library itself requires two more dependencies that you need to add manually. They stem from SLF4J, the library it uses for logging: SLF4J supports many logging backends and gives the developer flexibility in how log messages are processed. The two dependencies that you'll also need to add are:

- slf4j-api, the core SLF4J API
- slf4j-simple, a simple logging backend that writes messages to the console
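You can add both of them to the <dependencies> section the same way as the Kafka client, by copying the XML snippets from their Maven repository pages. As a reference, a sketch of the two definitions follows; the 2.0.12 version number is inferred from the JAR files that appear in the build output later in this step, so pick the latest versions available when you do this:

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>2.0.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>2.0.12</version>
</dependency>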
Once you've defined the dependencies, you'll need to make them available alongside the final, built JAR. Find the <build> section of pom.xml and add the new, second <plugins> section shown here:
...
<build>
  <pluginManagement>
    <plugins>
      ...
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
...
Here, you configure the maven-dependency-plugin to copy all dependencies at packaging time. With this project configuration, the dependencies' JAR files will be placed under target/lib. Note that you shouldn't modify the existing <plugins> section under <pluginManagement>.
When you’re done, save and close the file.
Build the project to verify that everything is configured correctly:
mvn package
The end of the output should be similar to this:
Output
...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------
You can list the files under target/lib to verify that the dependencies have indeed been copied:

ls target/lib

Output
hamcrest-core-1.3.jar  kafka-clients-3.7.0.jar  slf4j-api-2.0.12.jar     snappy-java-1.1.10.5.jar
junit-4.11.jar         lz4-java-1.8.0.jar       slf4j-simple-2.0.12.jar  zstd-jni-1.5.5-6.jar
You’ve added the necessary dependencies to your Maven project. You’ll now dive into connecting to Kafka and producing messages programmatically.
In this step, you’ll set up a Kafka producer in Java and write messages to a topic.
As per the project structure, the source code is stored under src/main/java/com/dokafka. Since you won't be needing App.java for the rest of the tutorial, delete it by running:
rm src/main/java/com/dokafka/App.java
You'll store the producer code in a class called ProducerDemo. Create and open the accompanying file for editing:
nano src/main/java/com/dokafka/ProducerDemo.java
Add the following lines:
package com.dokafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {
  private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

  public static void main(String[] args) {
    String bootstrapServers = "localhost:9092";
    String topicName = "java_demo";

    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>(topicName, "Hello World!");

    producer.send(producerRecord);
    producer.flush();
    producer.close();
  }
}
First, you define the ProducerDemo class, import the classes you'll use, and create a Logger. In the main method, you declare the Kafka cluster address (bootstrapServers) and the name of the topic for producing messages (topicName).
Then, you instantiate a Properties object, which is similar to a key-value dictionary and will hold the configuration for operating your Kafka producer. You populate the BOOTSTRAP_SERVERS_CONFIG property with the address of the Kafka cluster. You also set the KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG entries to StringSerializer.class.getName().
These properties specify which serializers should be used to process the keys and values of produced messages. Serializers are classes that accept an input and return an array of bytes, ready for transport over the network. Deserializers do the opposite and reconstruct the original object from a stream of bytes. Here, both the key and the value will be serialized as strings using the built-in StringSerializer.
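To make this concrete, here is a minimal sketch of what a custom serializer could look like; the class name SimpleStringSerializer is hypothetical, and its behavior mirrors what the built-in StringSerializer does, encoding strings as UTF-8 bytes:

package com.dokafka;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: a serializer that turns a String into UTF-8 bytes.
// The other Serializer methods (configure, close) have default implementations,
// so serialize() is the only one that must be provided.
public class SimpleStringSerializer implements Serializer<String> {
  @Override
  public byte[] serialize(String topic, String data) {
    return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
  }
}

You could then pass SimpleStringSerializer.class.getName() as the serializer configuration value, just as the code above passes StringSerializer.class.getName().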
Next, you declare and instantiate a KafkaProducer:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
The producer will accept keys and values of type String and is configured with the properties you've just populated.
To send a message to a topic, KafkaProducer accepts a ProducerRecord, which you instantiate with the name of the topic and the message itself, which is Hello World!. Notice that the producer itself is not attached to a particular topic.
After sending the message, you flush and close the producer. The producer.send() call is asynchronous, meaning that control flow returns to the main method while the message is being sent on another thread. Since this example program exits right after, you force the producer to send out everything it has left by flushing. Then, you close() it, signaling to Kafka that the producer is being destroyed.
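As a side note, send() returns a Future<RecordMetadata>, so if you ever need to block until the cluster acknowledges a record, you can wait on it directly. A minimal sketch, assuming java.util.concurrent.ExecutionException is added to the imports:

...
try {
  // get() blocks the current thread until the cluster acknowledges the record
  RecordMetadata metadata = producer.send(producerRecord).get();
  log.info("Record stored at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
  log.error("Sending failed!", e);
}
...

This makes the send effectively synchronous, trading throughput for simplicity, so the asynchronous approach used in this tutorial is usually preferable.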
Next, you'll create a script that will handle building and running ProducerDemo. You'll store it in a file called run-producer.sh. Create and open it for editing:
nano run-producer.sh
Add the following lines:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo
When you're done, save and close the file. The target/lib/* part of the classpath specifies where the dependencies are located.
Then, mark it as executable:
chmod +x run-producer.sh
Finally, try producing a Hello World!
message by running it:
./run-producer.sh
The output will be long, and its end should look like this:
Output
...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
KafkaProducer logged that it was successfully created and later on unregistered. The message has now been written to the java_demo topic, and you can retrieve it using the kafka-console-consumer.sh script.
In a separate shell, navigate to the directory of your Kafka installation and run the following command to read the topic:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092
The output will be:
Output
Hello World!
You can press CTRL+C to exit.
In this step, you've programmatically produced a message into the java_demo topic and read it back using the Kafka-provided console consumer script. You'll now learn how to utilize the information Kafka returns when a message has been successfully sent.
The send() method of KafkaProducer accepts callbacks, which allow you to act on events as they occur, such as when the record is received. This is useful for retrieving information about how the cluster handled the record.
To extend the send() call with a callback, first open ProducerDemo for editing:
nano src/main/java/com/dokafka/ProducerDemo.java
Modify the code to look like this:
...
    producer.send(producerRecord, new Callback() {
      public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
          log.error("An error occurred!", e);
          return;
        }

        log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
          recordMetadata.timestamp(),
          recordMetadata.partition(),
          recordMetadata.offset()));
      }
    });
...
You now pass in an implementation of the Callback interface to the send() method and implement onCompletion(), which receives a RecordMetadata and, optionally, an Exception. If an error occurs, you log it. Otherwise, you log the timestamp, partition number, and offset of the record, which is now in the cluster. Since sending the message this way is asynchronous, your code will be called when the cluster accepts the record, without you having to explicitly wait for that to happen.
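Since Callback declares only a single method, you could equivalently pass a lambda instead of an anonymous class. A minimal, equivalent sketch:

...
    producer.send(producerRecord, (recordMetadata, e) -> {
      if (e != null) {
        log.error("An error occurred!", e);
        return;
      }

      // Log where the cluster stored the record
      log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
        recordMetadata.timestamp(),
        recordMetadata.partition(),
        recordMetadata.offset()));
    });
...

Both versions behave identically; the anonymous class form simply makes the Callback interface explicit.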
When you’re done, save and close the file, then run the producer:
./run-producer.sh
Notice a new message at the end of the output:
Output
...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...
The message that was just produced was accepted by the cluster and stored in partition 0.
If you run it again, you’ll notice that the offset is bigger by one, denoting the place of the message in the partition:
Output
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3
In this article, you've created a Java project using Maven and equipped it with dependencies for interfacing with Kafka. Then, you developed a class that produces messages to your Kafka cluster and extended it to retrieve information about the sent records.