The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.
Apache Kafka provides shell scripts for producing and consuming basic textual messages to and from a Kafka cluster. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.
In this tutorial, you’ll create a Java program that produces data into a Kafka topic. You’ll create a Java project using Apache Maven, a tool for building and packaging Java projects, and add the Kafka client library as a dependency. Then, you’ll implement a class that leverages the Kafka client by producing messages and retrieving in-cluster metadata about them.
To complete this tutorial, you’ll need:
In this step, you’ll install Apache Maven and use it to generate a project that you’ll use to interface with Kafka.
On Ubuntu, Maven is readily available in the official repositories. To install Maven on macOS, refer to the How to Install Maven on Mac OS tutorial.
First, update your existing list of packages by running:
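On Ubuntu, that would typically be:

```bash
sudo apt update
```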
Run the following command to install it:
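Assuming the Ubuntu package manager, a typical installation command is:

```bash
sudo apt install maven
```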
Verify that it’s installed by reading out its version number:
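For example:

```bash
mvn -version
```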
The output will be similar to the following, depending on your Java version and platform:
Next, create a directory where you’ll store your Java projects for working with Kafka:
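For example, using a hypothetical ~/kafka-projects directory:

```bash
mkdir ~/kafka-projects
```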
Navigate to the newly created directory:
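Continuing with the same example directory:

```bash
cd ~/kafka-projects
```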
Then, generate an empty Java project by running:
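A command along these lines, using the identifiers described below, should work (the archetype version shown is only an example):

```bash
mvn archetype:generate -DgroupId=com.dokafka -DartifactId=dokafka \
  -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 \
  -DinteractiveMode=false
```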
Here, you instruct Maven to generate a new project named dokafka, with a group ID of com.dokafka. The group ID uniquely identifies this project across the Maven ecosystem. The project will be generated according to the maven-archetype-quickstart archetype, which is Maven's term for a project template.
There will be a lot of output, especially if this is the first time Maven is being run. The end of the output will look like this:
Maven has downloaded the necessary Java packages from its central repository and created the dokafka project using the maven-archetype-quickstart template.
Navigate to the project directory by running:
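That is:

```bash
cd dokafka
```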
The structure of the project looks like this:
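For a quickstart-style project, the layout typically resembles the following:

```
dokafka
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── dokafka
    │               └── App.java
    └── test
        └── java
            └── com
                └── dokafka
                    └── AppTest.java
```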
As part of the prerequisites, you learned about the standard Maven project structure that you see here. The src/main/java directory holds the project source code, src/test/java contains the test sources, and pom.xml in the root of the project is the main configuration file for Maven.
The project contains only one source file, App.java. Show its contents to see what Maven generated:
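For example, with cat:

```bash
cat src/main/java/com/dokafka/App.java
```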
The output will be:
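The quickstart archetype typically generates a minimal class similar to this:

```java
package com.dokafka;

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");
    }
}
```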
To run this code, you’ll first need to build the project by running:
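For example:

```bash
mvn package
```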
Maven will compile the code and package it into a JAR file for execution. The end of the output will look like this, signifying that it’s completed:
Maven placed the resulting JAR file under the target directory. To execute the App class you've just built, run the following command, passing in the full identifier of the class:
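Assuming the default artifact name and version (dokafka-1.0-SNAPSHOT.jar), the command would look like this:

```bash
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App
```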
The output will be:
You’ve installed Maven and created an empty Java project. You’ll add the necessary dependencies for Kafka in the next step.
You’ll now add the Java Kafka client to your project, as well as other dependencies for logging. You’ll also configure Maven to include those dependencies during packaging.
First, you'll add the kafka-clients dependency. Navigate to the Maven repository page for the Java client in your browser and select the latest available version, then copy the provided XML snippet for Maven. At the time of writing, the latest version of the Java client library was 3.7.0.
Dependencies are added to pom.xml in the root of your project. Open it for editing:
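For example, with nano or your preferred text editor:

```bash
nano pom.xml
```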
Find the <dependencies> section and add the dependency definition:
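With version 3.7.0, the snippet looks like this:

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.7.0</version>
</dependency>
```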
This will make the Kafka client library available to your project. However, the library itself requires two more dependencies that you need to add manually. They stem from the SLF4J library it uses for logging, which supports many logging backends and gives the developer flexibility in how log messages are processed. The two dependencies that you'll also need to add are:
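Which SLF4J artifacts to use is a choice; a minimal setup is the slf4j-api interface plus the slf4j-simple binding (the version below is only an example):

```xml
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>2.0.13</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>2.0.13</version>
</dependency>
```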
Once you've defined the dependencies, you'll need to make them available alongside the final, built JAR. Find the <build> section of pom.xml and add the plugin configuration:
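A sketch of that configuration, assuming the copy-dependencies goal of the maven-dependency-plugin bound to the package phase, could look like this:

```xml
<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <executions>
      <execution>
        <id>copy-dependencies</id>
        <phase>package</phase>
        <goals>
          <goal>copy-dependencies</goal>
        </goals>
        <configuration>
          <!-- Copy dependency JARs next to the built artifact, under target/lib -->
          <outputDirectory>${project.build.directory}/lib</outputDirectory>
        </configuration>
      </execution>
    </executions>
  </plugin>
</plugins>
```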
Here, you configure the maven-dependency-plugin to copy all dependencies at packaging time. With this project configuration, the JAR files of the dependencies will end up under target/lib. Note that you shouldn't modify the existing <plugins> section under <pluginManagement>.
When you’re done, save and close the file.
Build the project to verify that everything is configured correctly:
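As before:

```bash
mvn package
```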
The end of the output should be similar to this:
You can list the files under target/lib to verify that the dependencies have indeed been copied:
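For example:

```bash
ls target/lib
```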
You’ve added the necessary dependencies to your Maven project. You’ll now dive into connecting to Kafka and producing messages programmatically.
In this step, you’ll set up a Kafka producer in Java and write messages to a topic.
As per the project structure, the source code is stored under src/main/java/com/dokafka. Since you won't be needing App.java for the rest of the tutorial, delete it by running:
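```bash
rm src/main/java/com/dokafka/App.java
```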
You'll store the producer code in a class called ProducerDemo. Create and open the accompanying file for editing:
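For example:

```bash
nano src/main/java/com/dokafka/ProducerDemo.java
```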
Add the following lines:
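A minimal sketch of the class described below, assuming a local Kafka cluster at localhost:9092 and the java_demo topic, could look like this:

```java
package com.dokafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {
  private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

  public static void main(String[] args) {
    // Assumed address of the local Kafka cluster and the topic to produce to
    String bootstrapServers = "localhost:9092";
    String topicName = "java_demo";

    // Producer configuration: cluster address and key/value serializers
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // The producer accepts String keys and values
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // A record holds the target topic and the message value
    ProducerRecord<String, String> producerRecord =
        new ProducerRecord<>(topicName, "Hello World!");

    // send() is asynchronous; flush() forces delivery before the program exits
    producer.send(producerRecord);
    producer.flush();
    producer.close();
  }
}
```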
First, you define the ProducerDemo class, import the classes it uses, and create a Logger. In the main method, you first declare the Kafka cluster address (bootstrapServers) and the name of the topic for producing messages (topicName).
Then, you instantiate a Properties object, which is similar to a key-value dictionary and will hold the configuration for operating your Kafka producer. You populate the BOOTSTRAP_SERVERS_CONFIG property with the address of the Kafka cluster. You also set the KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG entries to StringSerializer.class.getName().
These properties specify which serializers should be used to process the keys and values of produced messages. Serializers are classes that accept an input and return an array of bytes, ready for transportation over the network. Deserializers do the opposite, reconstructing the original object from the stream of bytes. Here, both the key and the value will be serialized as strings using the built-in StringSerializer.
Next, you declare and instantiate a KafkaProducer:
The producer will accept keys and values of type String, with the accompanying properties for configuration.
To send a message to a topic, KafkaProducer accepts a ProducerRecord, which you instantiate with the name of the topic and the message itself, Hello World!. Notice that the producer itself is not attached to a particular topic.
After sending the message, you flush and close the producer. The producer.send() call is asynchronous, meaning that control flow returns to the main method while the message is being sent on another thread. Since this example program exits right after, you force the producer to send out everything it has left by flushing. Then, you close() it, signaling to Kafka that the producer is being destroyed.
Next, you'll create a script that handles building and running ProducerDemo. You'll store it in a file called run-producer.sh. Create and open it for editing:
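For example:

```bash
nano run-producer.sh
```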
Add the following lines:
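A sketch of the script, assuming the default artifact name (dokafka-1.0-SNAPSHOT.jar) and the target/lib dependency directory configured earlier, could look like this:

```bash
#!/bin/bash
# Build the project, then run ProducerDemo with the dependency JARs on the classpath
mvn package
java -cp "target/dokafka-1.0-SNAPSHOT.jar:target/lib/*" com.dokafka.ProducerDemo
```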
When you're done, save and close the file. The target/lib entry on the classpath specifies where the dependencies are located.
Then, mark it as executable:
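For example:

```bash
chmod +x run-producer.sh
```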
Finally, try producing a Hello World! message by running it:
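```bash
./run-producer.sh
```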
The output will be long, and its end should look like this:
KafkaProducer logged that it was successfully created and later unregistered. The message has now been written to the java_demo topic, and you can retrieve it using the kafka-console-consumer.sh script.
In a separate shell, navigate to the directory of your Kafka installation and run the following command to read the topic:
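Assuming the cluster is listening on localhost:9092, the command would be along these lines:

```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic java_demo --from-beginning
```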
The output will be:
You can press CTRL+C to exit.
In this step, you've programmatically produced a message into the java_demo topic and read it back using the Kafka-provided bash script. You'll now learn how to utilize the information Kafka returns when a message has been successfully sent.
The send() method of KafkaProducer accepts a callback, which allows you to act upon events that occur, such as when the record is received. This is useful for retrieving information about how the cluster handled the record.
To extend the send() call with a callback, first open ProducerDemo for editing:
Modify the code to look like this:
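A sketch of the modified send() call, matching the description below, could look like this (it also assumes imports for org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata):

```java
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // The send failed or the cluster rejected the record
            log.error("An error occurred while producing", e);
            return;
        }
        // Log where and when the cluster stored the record
        log.info(String.format("Timestamp: %s, partition: %s, offset: %s",
                recordMetadata.timestamp(),
                recordMetadata.partition(),
                recordMetadata.offset()));
    }
});
```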
You now pass in an implementation of the Callback interface to the send() method and implement onCompletion(), which receives RecordMetadata and, optionally, an Exception. Then, if an error occurred, you log it. Otherwise, you log the timestamp, partition number, and offset of the record, which is now in the cluster. Since sending the message this way is asynchronous, your code will be called when the cluster accepts the record, without you having to explicitly wait for that to happen.
When you’re done, save and close the file, then run the producer:
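```bash
./run-producer.sh
```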
Notice a new message at the end of the output:
The message that was just produced was accepted by the cluster and stored in partition 0.
If you run it again, you'll notice that the offset is larger by one, denoting the message's place in the partition:
In this article, you've created a Java project using Maven and equipped it with dependencies for interfacing with Kafka. Then, you developed a class that produces messages to your Kafka cluster and extended it to retrieve information about the sent records.