Apache Kafka provides shell scripts for producing and consuming basic textual messages to and from a Kafka cluster. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.
In this tutorial, you’ll create a Java program that consumes data from a Kafka topic. You’ll reuse the Java project from the previous tutorial on How to Set Up a Kafka Producer to Source Data Through CLI.
You’ll implement a class that leverages the Kafka client by consuming messages from a topic. Then, you’ll learn how Kafka manages multiple consumers reading the same topic at once and how it tracks their progress. You’ll also learn how to manually report consumer progress back to the cluster.
To complete this tutorial, you’ll need:
As part of the prerequisites, you have created a Java project with the necessary dependencies for programmatically accessing Kafka and producing messages into the java_demo topic. In this step, you'll create a class that will consume messages from that topic.
Navigate to the directory where the dokafka project is stored. As per the project structure, the source code is stored under src/main/java/com/dokafka.
You'll store the class in a file named ConsumerDemo.java. Create and open it for editing by running:
nano src/main/java/com/dokafka/ConsumerDemo.java
Add the following lines:
package com.dokafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.time.*;
public class ConsumerDemo {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
String groupId = "group1";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()));
}
}
} catch (Exception e) {
log.error("An error occurred", e);
} finally {
consumer.close();
}
}
}
First, you define the ConsumerDemo class and import the classes you'll use. You also instantiate a Logger as a member of the class. Similarly to ProducerDemo, in the main() method, you first declare the Kafka cluster address (bootstrapServers) and the name of the topic from which you'll consume messages.
Every Kafka consumer belongs to a consumer group, identified by a unique string called the group ID. You define it as group1 and store it in groupId.
Then, you instantiate a Properties object, which holds pairs of keys and values representing the configuration for operating your Kafka consumer. You set the BOOTSTRAP_SERVERS_CONFIG property to the address of the Kafka cluster, and you set the key and value deserializer entries to StringDeserializer.class.getName().
In contrast to serializers (which are used in the ProducerDemo class), deserializers are classes that accept an input in bytes and reconstruct the original object. The consumer uses them to convert the keys and values from their network representation back into the forms your code understands. Here, both the key and the value will be deserialized back into strings using the built-in StringDeserializer.
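If you ever need to consume values that aren't plain strings, you can provide your own implementation of the Deserializer interface. The following is a minimal, hypothetical sketch (the class name and the uppercasing behaviour are invented for illustration; depending on your client version you may also choose to override configure() and close()):
package com.dokafka;

import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;

// Hypothetical example: reconstruct a String from the raw bytes and uppercase it,
// purely to illustrate the shape of a custom deserializer.
public class UppercaseStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return new String(data, StandardCharsets.UTF_8).toUpperCase();
    }
}
You would then pass its class name as the key or value deserializer instead of StringDeserializer.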
You then set the consumer's group ID (GROUP_ID_CONFIG). You also set the AUTO_OFFSET_RESET_CONFIG parameter, which defines from which place in the topic the consumer should start reading if it has no previously saved position. Setting it to earliest instructs it to start from the beginning (offset 0).
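The other accepted values are latest, which makes the consumer read only records produced after it starts, and none, which makes it throw an exception when no committed offset exists. For example, if you only cared about new messages, you could instead set:
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");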
After that, you declare and instantiate a KafkaConsumer:
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
The consumer will parse keys and values of type String, using the accompanying properties for configuration.
To express interest in receiving messages from the topic, you subscribe() to it:
consumer.subscribe(Collections.singletonList(topicName));
A consumer can subscribe to multiple topics at once by passing in the name of each one in the form of a List. Because you'll subscribe to only one topic, you use the Collections.singletonList() helper method to create a List with topicName as its only element.
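If you later wanted this consumer to read from several topics, you could pass a larger list instead (the second topic name here is just a placeholder):
consumer.subscribe(Arrays.asList("java_demo", "another_topic"));
Arrays is already available through the java.util.* import at the top of the class.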
Next, you start receiving the topic records by polling in an infinite loop:
...
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()));
}
}
} catch (Exception e) {
log.error("An error occurred", e);
} finally {
consumer.close();
}
...
The poll() method of KafkaConsumer accepts a Duration, denoting how long the consumer should wait for new records to be streamed to it before returning. You then log the metadata, key, and value of each received record. Notice that the infinite loop is located in a try block; without it, compilation would fail because consumer.close() would be unreachable.
When you’re done, save and close the file.
Next, you'll create a script that will handle compiling and running ConsumerDemo. You'll store it in a file called run-consumer.sh. Create and open it for editing:
nano run-consumer.sh
Add the following lines:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ConsumerDemo
Save and close the file, then mark it as executable:
chmod +x run-consumer.sh
Finally, try running it:
./run-consumer.sh
The output will be long, and its end should look similar to this:
Output...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition java_demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 0, key = null, value = Hello World!
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 1, key = null, value = Hello World!
KafkaConsumer logged that it was assigned partition 0 of the java_demo topic and that it didn't find a committed offset for that partition. Because you set AUTO_OFFSET_RESET_CONFIG to earliest, the offset for the partition is reset to 0, as the next log message states. Consumers track where they stopped reading by periodically (or manually) committing the offset back to the cluster. You'll learn more about offsets and how they relate to partitions and consumer groups in the next section.
Since it started reading from the beginning, the consumer received the Hello World! messages you produced as part of the prerequisites.
Press CTRL+C to stop it, then run it again:
./run-consumer.sh
Output...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
...
The consumer will now start reading at offset 2, which is where it stopped the last time. This means that the Kafka client automatically committed its previous position back to the cluster.
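If you want to inspect this committed position yourself, you can describe the consumer group with the tool bundled with Kafka. From the directory of your Kafka installation, run:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1
The output lists each partition assigned to group1 together with its current (committed) offset and the log end offset.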
In this step, you've created a Kafka consumer in Java that streams records from the java_demo topic. You'll now extend it so that it disposes of the KafkaConsumer properly when shutting down.
The consumer.close() line in the finally block will run after the flow of execution exits the try block. Since that block contains an infinite loop, this will happen only if an exception occurs. You'll now extend ConsumerDemo to also close the consumer when the program is being shut down or killed.
Java allows you to register functions, called shutdown hooks, which are run when the program is closing. Open ConsumerDemo for editing:
nano src/main/java/com/dokafka/ConsumerDemo.java
Add the highlighted lines:
...
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topicName));
Thread currentThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
currentThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
Here you first retrieve a reference to the thread that's currently executing. Then, you add a shutdown hook by passing in a new Thread with your code. You override the run() method of that thread, in which you call wakeup() on the consumer. This will stop the consumer and raise a WakeupException, which should be caught. Afterwards, the finally block will run and close() the consumer. To make sure that happens before the program exits, the hook thread join()s the main thread and waits for it to finish.
You'll need to catch the WakeupException by adding the following lines after the try block:
...
} catch (WakeupException e) {
// Ignore
} catch (Exception e) {
log.error("An error occurred", e);
} finally {
consumer.close();
}
...
The WakeupException is now caught and ignored, since the consumer will be closed in finally.
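Note that WakeupException lives in the org.apache.kafka.common.errors package, so also add the corresponding import at the top of the file if your editor doesn't add it automatically:
import org.apache.kafka.common.errors.WakeupException;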
Save and close the file when you’re done, then run the consumer:
./run-consumer.sh
Wait for it to load, then press CTRL+C. You'll notice that shutdown now isn't instantaneous, as the consumer communicates with the cluster and announces its departure from the consumer group. The end of the output denoting that will be similar to this:
Output...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-7bddcea2-ee1a-4a15-9797-669c0302d19f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting generation and member id due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Request joining group due to: consumer pro-actively leaving the group
...
In this step, you've implemented a shutdown hook in ConsumerDemo that ensures the consumer is properly closed when the program is being shut down. You'll now learn more about the architecture of consumer groups and offsets.
The consumer you've created in the previous step has its GROUP_ID_CONFIG parameter set to group1, which denotes that it belongs to a consumer group with that ID. Consumers in a group are considered homogeneous, which allows Kafka to load balance the events between them.
Each Kafka topic consists of partitions, which are distributed among the nodes in the cluster for fault tolerance. Every partition has its own set of records, as well as its own offsets. Kafka guarantees the order of records within a partition, but not across partitions within a topic. By default, records without a key are spread across partitions (round-robin in older client versions, sticky batching in newer ones). While useful in the general case, this isn't acceptable when related records need to be kept in strict order, as reading back a single partition would not yield the whole logical chain of events.
To maintain strict ordering, Kafka allows you to pass in a key for the message. Records that share a key are always written to the same partition. Since Kafka guarantees ordering within a partition, you can be sure that all messages with the same key will be read back in the order they were produced.
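Conceptually, the mapping from key to partition is a deterministic hash modulo the partition count. The following sketch only illustrates that idea; the real client hashes the serialized key bytes with murmur2 rather than using hashCode():
public class PartitioningSketch {
    // Simplified, illustrative mapping from a key to a partition.
    // Equal keys always land on the same partition, preserving their relative order.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("key1", 2));
        System.out.println(partitionFor("key2", 2));
    }
}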
Partitions can be replicated across multiple brokers to ensure redundancy. The main copy, with which producers and consumers interact, is called the partition leader, while the secondary copies are the replicas. Replicas are stored on nodes other than the one where the partition leader resides.
As each partition is a separate stream of events with its own offsets, each partition gets assigned to exactly one consumer in the group. If there are more partitions than consumers, some consumers will read from multiple partitions. If there are more consumers than partitions, some consumers won't be assigned anything and will stay idle. This is intentional, as having multiple consumers read from the same partition in parallel would lead to processing the same events more than once. For this reason, it's recommended not to run more consumers in a group than there are partitions.
Consumers can enter or leave a consumer group at any time. When that happens, Kafka rebalances the partitions among the new set of consumers. Each consumer will then fetch the latest committed offset for each partition it is assigned and continue processing from there. There are no guarantees that a consumer will retain the set of partitions it was working on before, so it's very important to commit offsets back to Kafka only when the work has actually been completed.
You'll now create a new topic with two partitions. Then, you'll modify ProducerDemo to set a key for each message it sends and run multiple consumers in a group to see how Kafka distributes the incoming messages.
From the directory of your Kafka installation, run the following command to create a new topic:
bin/kafka-topics.sh --create --topic java_demo_partitions --bootstrap-server localhost:9092 --partitions 2
The topic is called java_demo_partitions and contains two partitions. The output will be:
Output...
Created topic java_demo_partitions.
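Optionally, you can verify the partition count by describing the topic:
bin/kafka-topics.sh --describe --topic java_demo_partitions --bootstrap-server localhost:9092
The output lists each partition along with its leader and replicas.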
Since you'll modify ProducerDemo, open it for editing:
nano src/main/java/com/dokafka/ProducerDemo.java
First, set topicName to java_demo_partitions:
...
String topicName = "java_demo_partitions";
...
Then, pass in the first CLI argument that your program receives as the key for the ProducerRecord:
...
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, args[0], "Hello World!");
...
The string array args is passed in to the main() method and contains the arguments given when executing the program. When you're done, save and close the file.
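Note that ProducerDemo now expects an argument every time it runs. If you wanted to guard against a missing one, a small check like this hypothetical variant could fall back to a keyless record:
// Fall back to a null key when no argument is passed, letting the default
// partitioner spread the record across partitions instead.
String key = (args.length > 0) ? args[0] : null;
ProducerRecord<String, String> producerRecord =
    new ProducerRecord<>(topicName, key, "Hello World!");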
Next, you'll update ConsumerDemo to fetch data from the new topic. Open it for editing:
nano src/main/java/com/dokafka/ConsumerDemo.java
Similarly, set topicName to the new topic:
...
String topicName = "java_demo_partitions";
...
Save and close the file.
Before running the producer, you'll need to update the run-producer.sh script to pass a given argument through to ProducerDemo. Open it for editing:
nano run-producer.sh
Pass in the first argument to the java command as shown:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1
When you’re done, save and close the file.
Next, open a separate terminal session and run the consumer:
./run-consumer.sh
Then, open a third terminal session and run the second consumer in it:
./run-consumer.sh
The end of the output of one of the consumers will be similar to this:
Output...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
The other consumer will have the java_demo_partitions-1 partition assigned to it.
In the main session, produce a message with key key1 by running:
./run-producer.sh key1
Notice that only one of the two consumers will receive the message:
Output
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 0, key = key1, value = Hello World!
Try producing a message with a different key:
./run-producer.sh key2
This time, the other consumer will receive this message because Kafka routed it to the remaining partition (from which that consumer streams records):
Output
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 0, key = key2, value = Hello World!
Press CTRL+C in the third session to terminate the second consumer. You'll see the two partitions being rebalanced to the remaining consumer:
Output...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0, java_demo_partitions-1])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0, java_demo_partitions-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-1 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
Then, try producing messages again:
./run-producer.sh key1
./run-producer.sh key2
You’ll see that the remaining consumer has received both:
Output...
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 1, key = key1, value = Hello World!
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 1, key = key2, value = Hello World!
In this step, you’ve learned about running multiple consumers in a group and how Kafka rebalances traffic between them by reassigning partitions in case of changes. You’ll now learn how to manually commit offsets back to the cluster.
The default behaviour of the Kafka client library is to automatically commit the latest offsets returned by poll() every 5 seconds. This is unsafe if the consumer processes events more slowly than it commits them, and it may lead to records being skipped without ever being processed. You'll now learn how to manually commit offsets to prevent this issue.
For example, if poll() returned messages from offsets 0 to 10, the consumer would report back that it has consumed up to offset 10, regardless of whether it actually processed them. If the app crashes before actually processing the message at offset 10, the next run will resume after that committed position, effectively leaving some records unprocessed.
You'll modify ConsumerDemo, so open it for editing:
nano src/main/java/com/dokafka/ConsumerDemo.java
First, disable auto commits by setting ENABLE_AUTO_COMMIT_CONFIG to false:
...
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
Then, add the highlighted line to the loop which goes over each received record:
...
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()));
consumer.commitSync();
}
}
}
...
The no-argument commitSync() method of KafkaConsumer commits the offsets returned by the most recent poll() in a blocking way, meaning that further processing of records will not happen until the commit finishes or an exception is thrown.
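Since the no-argument commitSync() commits the position of the whole batch returned by the last poll() anyway, a common variant is to process the entire batch first and commit once afterwards. A minimal sketch of that loop, under the same configuration, could look like this:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Process the record here before committing anything.
        log.info(String.format("offset = %d, value = %s", record.offset(), record.value()));
    }
    if (!records.isEmpty()) {
        // One blocking commit per batch instead of one per record.
        consumer.commitSync();
    }
}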
KafkaConsumer also provides the commitAsync() method, which tries to commit in the background while returning the flow of execution to the caller (in this case, the loop continues executing). The downside is that the state may still be inconsistent: Kafka may return an error after your code has already moved on to the next record.
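If you want the responsiveness of commitAsync() but still want to know when a commit fails, you can pass a callback. A minimal sketch:
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // The commit didn't reach the cluster; log it so failures aren't silent.
        log.error("Offset commit failed for " + offsets, exception);
    }
});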
When you're done, save and close the file. You can try running the consumer in a separate session and then producing a few messages. The overall flow will be the same, but the consumer now commits offsets explicitly instead of relying on the automatic background commit.
In this section, you’ve learned to manually commit offsets back to Kafka as you process the received records.
In this tutorial, you've extended the ProducerDemo Kafka producer and created ConsumerDemo, a Kafka consumer written in Java. You've learned about consumer groups and how Kafka assigns partitions to consumers in groups.
You've ensured that the KafkaConsumer instance is properly closed even when the process is terminated. You've also learned how to manually commit offsets back to the cluster as you process the records.
For more information about KafkaConsumer and its properties, visit the official Kafka docs.
The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.