As businesses grow, maintaining compatibility between data formats becomes paramount in event streaming. Data stored on Apache Kafka topics is immutable and cannot be retroactively modified to suit current demands, so working around a large number of schema changes can quickly become a challenge.
Apache Avro is a data serialization library designed with event streaming pipelines in mind. It allows you to define rich structures (called schemas) for your data, with serialization capabilities for efficient transport and storage. To track schemas and their versions, Confluent Schema Registry was created. It acts as a centralized repository for your schemas, handles their storage, and ensures compatibility between versions. This allows you to focus on the data instead of devising ways to manually convert one schema version to another.
In this tutorial, you’ll deploy Confluent Schema Registry using Docker and extend the Kafka producer and consumer you’ve created in previous tutorials of the series. You’ll rework them to create and consume objects that conform to a schema you’ll define. You’ll also modify the schema and learn how to evolve it without breaking data conforming to earlier versions.
To follow this tutorial, you will need:
jq installed on your machine. For an overview, visit the How To Transform JSON Data with jq article.

In this section, you’ll learn how to run the Confluent Schema Registry using Docker Compose. Unlike Kafka, which can run standalone using KRaft, the Schema Registry deployment in this tutorial relies on a ZooKeeper instance.
As part of the prerequisites, you have deployed Kafka on your local machine as a systemd service. In this step, you’ll deploy a Kafka node through Docker Compose. First, you need to stop the service by running:
sudo systemctl stop kafka
You’ll now define the Docker Compose configuration in a file named schema-registry-compose.yaml. Create and open it for editing by running:
nano schema-registry-compose.yaml
Add the following lines:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Here you define three services: zookeeper, kafka, and kafka-schema-registry. All three services use the latest Docker images provided by Confluent. The ZooKeeper service listens on port 2181 and kafka is exposed to the host at 9092. For kafka, under environment you configure the ZooKeeper address and specify an additional listener at 29092, which the Schema Registry will use to connect to Kafka directly. You also specify that the kafka service should wait for zookeeper to start first.

Then, you expose kafka-schema-registry at port 8081 and pass in the address for connecting to the kafka service under environment. You also specify that the Schema Registry should only be started once ZooKeeper and Kafka have finished initializing.
Save and close the file, then run the following command to bring up the services in the background:
docker-compose -f schema-registry-compose.yaml up -d
The end of the output will be similar to the following lines:
Output
...
Creating root_zookeeper_1 ... done
Creating root_kafka_1 ... done
Creating kafka-schema-registry ... done
You can list the running containers with:
docker ps
The output will look like the following:
Output
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6783568a74c8 confluentinc/cp-schema-registry "/etc/confluent/dock…" 19 seconds ago Up 17 seconds 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp kafka-schema-registry
6367df4b55f7 confluentinc/cp-kafka:latest "/etc/confluent/dock…" 19 seconds ago Up 18 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp root_kafka_1
a5f5b09984e0 confluentinc/cp-zookeeper:latest "/etc/confluent/dock…" 19 seconds ago Up 19 seconds 2181/tcp, 2888/tcp, 3888/tcp root_zookeeper_1
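Before moving on, you can optionally confirm that the Schema Registry is responding:
curl -X GET http://localhost:8081/subjects
If the registry is up and no schemas have been registered yet, this should return an empty array ([]). If the container is still starting, wait a few seconds and retry.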
In this step, you’ve deployed a Schema Registry instance, along with ZooKeeper and Kafka using Docker Compose. You’ll now learn how to create and use Avro schemas in your Java project.
In this section, you’ll add Avro to your project, as well as related dependencies. You’ll learn how to define Avro schemas and have Java classes autogenerated for the defined types. Then, you’ll add your Avro schema to Schema Registry.
First, you’ll add the org.apache.avro dependency. Navigate to the Maven repository page for Apache Avro in your browser and select the latest available version, then copy the provided XML snippet for Maven. At the time of writing, the latest version of the Avro library was 1.11.3.
Dependencies are added to pom.xml in the root of your project. Open it for editing by running:
nano pom.xml
Find the <dependencies> section and add the dependency definition:
...
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
...
This will make Avro available to your project. However, to be able to generate Java classes from Avro schemas, you’ll also need to add the avro-maven-plugin from the Maven repository in the same manner.

Once you’ve defined the dependencies, you’ll need to ensure the plugin generates sources. Find the <build> section of pom.xml and add the following plugin definition:
...
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
...
Here, you configure the avro-maven-plugin to generate Java sources based on schemas under /src/main/java/com/dokafka and put them under /src/main/java. Note that you shouldn’t modify the existing <plugins> section under <pluginManagement>.
When you’re done, save and close the file.
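The plugin binds code generation to Maven’s generate-sources phase, so the classes are regenerated automatically during a normal build; if you ever want to run only that step, invoking the phase directly should work as well:
mvn generate-sources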
Build the project to verify that everything is configured correctly:
mvn package
The end of the output will look similar to this:
Output
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ------------------------------------------------------------------------
You’ll now create a schema called TempMeasurement, which describes a temperature measurement at a point in time. You’ll store it next to the ProducerDemo and ConsumerDemo classes you’ve created as part of the prerequisites. Create and open it for editing by running:
nano src/main/java/com/dokafka/TempMeasurement.avsc
Add the following lines:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    }
  ]
}
Avro schema files are written in JSON and use the .avsc file extension. First, you specify the namespace of the schema, which will also be the namespace of the autogenerated Java classes. You set its name to TempMeasurement and specify record as the schema type, signifying that it’s an Avro object.

Then, you specify the fields of your schema, which you call measuredValue and measurerName, of types double and string, respectively. Avro also supports other types such as int, long, float, boolean, and bytes.
Save and close the file, then build the project:
mvn package
Now, list the files under src/main/java/com/dokafka:
ls src/main/java/com/dokafka
You’ll see that a TempMeasurement class has been created:
Output
ConsumerDemo.java ProducerDemo.java TempMeasurement.avsc TempMeasurement.java
This class holds the code for instantiating, serializing, and deserializing TempMeasurement objects.
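As a quick, illustrative sketch (not part of the tutorial code), classes generated by the Avro Maven plugin expose an all-arguments constructor as well as a builder, so creating an instance looks roughly like this:
// Hedged sketch: the setter names are derived from the schema fields defined above.
TempMeasurement measurement = TempMeasurement.newBuilder()
    .setMeasuredValue(21.5)
    .setMeasurerName("sammy")
    .build();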
Now that you’ve defined your schema, you can add it to Schema Registry by running:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
http://localhost:8081/subjects/TempMeasurement/versions
The Schema Registry is exposed at http://localhost:8081 and accessible through HTTP. In this command, you first set the HTTP method to POST with the appropriate Content-Type that the registry will accept.

You pass in the request body to -d and utilize jq to wrap the schema contents in a field called schema, since that’s the format the Schema Registry accepts. Finally, you point the request at subjects/TempMeasurement/versions, which specifies the subject under which the new schema will be registered.
The output will be:
Output{"id":1}
Schema Registry has accepted the request and assigned it an ID of 1
.
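If you want to double-check what was stored, the registry also lets you look a schema up by this ID (an optional sanity check):
curl -X GET http://localhost:8081/schemas/ids/1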
To list all available schemas, run:
curl -X GET http://localhost:8081/subjects
You’ll see only one available:
Output["TempMeasurement"]
In this step, you’ve added the necessary dependencies to your Maven project and set up code autogeneration for Avro schemas that you define. You’ll now dive into producing and consuming data based on schemas.
Confluent provides an Avro serializer library for Kafka called kafka-avro-serializer. In this step, you will add it to your project and configure your producer and consumer to transmit TempMeasurement objects.
To add the library to your project, navigate to the Maven repository and copy the XML dependency definition for the latest available version, which was 7.6.0 at the time of writing. Then, open pom.xml for editing:
nano pom.xml
Since the kafka-avro-serializer package is hosted on Confluent’s Maven repository, you’ll need to add that repository to pom.xml with the following lines:
...
<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>
...
Similarly, add the library definition to the <dependencies> section:
...
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
...
To allow your Kafka client to communicate with the Schema Registry, you’ll also need to add the kafka-schema-registry-client as a dependency, so navigate to the Maven repo and insert the definition for the latest available version to pom.xml.

Save and close the file when you’re done. You’ll now be able to use the KafkaAvroSerializer and KafkaAvroDeserializer classes in your Maven project.
Next, you’ll rework the ProducerDemo class to connect to Schema Registry and produce objects of type TempMeasurement on a topic. Open it for editing by running:
nano src/main/java/com/dokafka/ProducerDemo.java
Import KafkaAvroSerializer and KafkaAvroSerializerConfig to be able to use them:
...
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
...
Then, modify the first part of the main method to look like this:
...
String topicName = "java_demo_avro";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);
ProducerRecord<String, TempMeasurement> producerRecord =
new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
...
First, you set the topic name to java_demo_avro. Kafka will create the topic if it doesn’t already exist. Then, you switch the value serializer class from StringSerializer to KafkaAvroSerializer and set the SCHEMA_REGISTRY_URL_CONFIG parameter, which specifies the address of the Schema Registry.

You also replace the previous String value type with TempMeasurement. For the producerRecord, you pass in a new instance of TempMeasurement with its two fields (measuredValue and measurerName) sourced from the first two command-line arguments passed into the main method.
When you’re done, save and close the file. Then, to be able to pass in two arguments to the run-producer.sh script, you need to open it for editing:
nano run-producer.sh
Add the second argument ($2) to the command:
...
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2
Save and close the file, then produce a TempMeasurement by running:
./run-producer.sh 100 sammy
In this command, you pass in 100 as the measuredValue and sammy as the measurerName.
To be able to receive the Avro object you’ve just produced, you’ll need to modify ConsumerDemo in a similar manner. Open the file for editing:
nano src/main/java/com/dokafka/ConsumerDemo.java
Import KafkaAvroDeserializer and KafkaAvroDeserializerConfig to be able to reference them:
...
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
...
Then, modify the main method to look like this:
...
String topic = "java_demo_avro";
String groupId = "group1";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, TempMeasurement> record: records) {
log.info(String.format("measuredValue: %s, measurerName: %s\n",
record.value().getMeasuredValue(),
record.value().getMeasurerName()));
}
}
} catch (Exception e) {
log.error("An error occurred", e);
} finally {
consumer.close();
}
...
As with ProducerDemo, you update the topic and populate the SCHEMA_REGISTRY_URL_CONFIG parameter with the address of the Schema Registry, and you switch the value deserializer to KafkaAvroDeserializer. Then, you propagate TempMeasurement as the value type throughout the rest of the code, and in the for loop you modify the logging call to output the measuredValue and measurerName of each received record.

By setting the SPECIFIC_AVRO_READER_CONFIG parameter to true, you tell the deserializer to return real TempMeasurement objects. Otherwise, it would return an Avro GenericRecord, which would still contain all the fields but is not strongly typed.
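For illustration only (this is not part of the tutorial code and assumes a consumer whose values are typed as Object or GenericRecord), field access without the specific reader would look roughly like this:
// Illustrative sketch: without SPECIFIC_AVRO_READER_CONFIG, the deserializer returns
// an org.apache.avro.generic.GenericRecord, so fields are looked up by name.
GenericRecord generic = (GenericRecord) record.value();
double measuredValue = (Double) generic.get("measuredValue");
String measurerName = generic.get("measurerName").toString();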
Thanks to the Schema Registry integration, the producer doesn’t bundle the full schema with every object it sends to a topic. Instead, it sends the object along with an identifier of the schema. The consumer then fetches that schema in its entirety from the Schema Registry and uses it to deserialize the message.
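As a rough sketch of what this looks like on the wire (assuming the standard Confluent framing of one magic byte, a four-byte schema ID, and then the Avro-encoded payload), you could read the schema ID out of a raw record value like this; the class and helper names are hypothetical:
import java.nio.ByteBuffer;

// Illustrative only: reads the schema ID from a serialized value, assuming the
// Confluent wire format (1 magic byte, 4-byte big-endian schema ID, Avro payload).
public class WireFormatPeek {
  public static int schemaIdOf(byte[] rawValue) {
    ByteBuffer buffer = ByteBuffer.wrap(rawValue);
    if (buffer.get() != 0) {
      throw new IllegalArgumentException("Value is not in the expected wire format");
    }
    return buffer.getInt(); // the ID under which the schema is registered
  }
}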
Save and close the file when you’re done, then run the consumer:
./run-consumer.sh
The end of the output will be similar to this:
Output
...
[main] INFO com.dokafka.ConsumerDemo - measuredValue: 100.0, measurerName: sammy
Your Kafka consumer has successfully deserialized the Avro message, as evidenced by the log message.
In this step, you’ve updated your producer and consumer classes to use Avro objects. In the next step, you’ll learn how to update schemas and track their compatibility with Schema Registry.
In this step, you’ll learn how to update existing schemas and how those changes impact compatibility with existing versions and clients.
Aside from storing the schemas and versioning them through time, Schema Registry is crucial for enabling schema evolution. Schemas can be modified throughout the lifetime of a project, whereas already produced data cannot.
Schema Registry takes care of compatibility between schema versions and allows a consumer to parse as much of the data as it can, according to the schema version it knows about. This allows producers and consumers to be out of sync regarding their exact schema versions, since they can reside in different codebases.
The main compatibility strategies that Schema Registry offers are:

- BACKWARD, which ensures that consumers using the new schema can read data produced with the previous version
- FORWARD, meaning that consumers using the previous version can read data produced with the new schema (with no guarantees for older versions)
- FULL, which combines the previous two strategies
- NONE, meaning that compatibility checks are disabled

The first three strategies also have transitive counterparts (such as BACKWARD_TRANSITIVE), which mandate that the new schema must be compatible with all previous versions of the schema and not just its immediate predecessor. The default strategy is BACKWARD.
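If you want to check whether a changed schema would be accepted under the current strategy before registering it, the Schema Registry exposes a compatibility test endpoint. For example, the following (optional) request tests the local schema file against the latest registered version:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
  http://localhost:8081/compatibility/subjects/TempMeasurement/versions/latest
The response should contain an is_compatible field with the result.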
You’ll now modify the TempMeasurement schema and add a field for storing the date of the measurement. Open the schema for editing by running:
nano src/main/java/com/dokafka/TempMeasurement.avsc
Modify it to look like this:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    },
    {
      "name": "measurementDate",
      "type": "string"
    }
  ]
}
You’ve defined a new field called measurementDate, which will store the date of the measurement in a textual format. Save and close the file when you’re done.
Run the following command to create a new version of the schema in Schema Registry:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
http://localhost:8081/subjects/TempMeasurement/versions
You’ll get the following output, detailing an error:
Output
{
"error_code": 409,
"message": "Schema being registered is incompatible with an earlier schema for subject \"TempMeasurement\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'measurementDate' at path '/fields/2' in the new schema has no default value and is missing in the old schema', additionalInfo:'measurementDate'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]"
}
The error states that the new schema isn’t backward compatible with the previous version because the measurementDate field is new and does not have a default value. The BACKWARD check ensures that consumers using the new schema can still read data created with the old schema; without a default value for the missing field, deserializing that older data would fail.
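As an aside (not required for this tutorial), another common way to keep such an addition backward compatible is to declare the new field as a nullable union with a null default, for example:
{
  "name": "measurementDate",
  "type": ["null", "string"],
  "default": null
}
This tutorial instead assigns an empty-string default, as shown next.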
Open the file again and add the default value shown below:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    },
    {
      "name": "measurementDate",
      "type": "string",
      "default": ""
    }
  ]
}
measurementDate will now have a default value of an empty string. Save and close the file, then try submitting the schema again. The output will be:
Output
{"id":2}
Schema Registry has accepted the second version of the schema because it’s backward compatible with the first, and assigned it 2 as its ID. You can retrieve the first version by running:
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/1
The output will be:
Output
{
  "subject": "TempMeasurement",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}"
}
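You can also ask for the latest registered version directly (an optional convenience lookup):
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/latest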
You can also list all versions of a schema by omitting the version number:
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions
You’ll see that there are two:
Output
[1, 2]
To change the compatibility strategy, you can run the following command:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
http://localhost:8081/config/TempMeasurement
This command sets the compatibility strategy for the TempMeasurement subject to BACKWARD_TRANSITIVE. Notice that the endpoint is config and not subjects. The output will be:
Output{"compatibility":"BACKWARD_TRANSITIVE"}
To destroy the Docker Compose resources you’ve started, run the following command:
docker-compose -f schema-registry-compose.yaml down
In this step, you’ve modified the TempMeasurement schema in accordance with the BACKWARD compatibility strategy and published it to the Schema Registry.
In this article, you’ve extended your ProducerDemo and ConsumerDemo classes to produce and consume TempMeasurement objects, serialized by Apache Avro. You’ve learned how to utilize the Schema Registry for schema storage and evolution and connected your Kafka clients to it.
The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.