Tutorial

Streamline Data Serialization and Versioning with Confluent Schema Registry on Kafka

Published on April 15, 2024

Introduction

As businesses grow, keeping data formats compatible with each other becomes critical in event streaming. Data stored in Apache Kafka topics is immutable and cannot be retroactively modified to suit current demands. Working around this becomes a challenge when schemas change often.

Apache Avro is a data serialization framework that is widely used in event streaming pipelines. It allows you to define rich structures (called schemas) for your data with serialization capabilities for efficient transport and storage. To track the schemas and their versions, Confluent Schema Registry was created. It acts as a centralized repository for your schemas, handles their storage, and checks that new versions stay compatible with earlier ones. This allows you to focus on the data instead of coming up with ways of manually converting one schema version to another.

In this tutorial, you’ll deploy Confluent Schema Registry using Docker and extend the Kafka producer and consumer you’ve created in previous tutorials of the series. You’ll rework them to create and consume objects that conform to a schema you’ll define. You’ll also modify the schema and learn how to evolve it without breaking data conforming to earlier versions.

Prerequisites

To follow this tutorial, you will need:

Step 1 - Running Schema Registry Using Docker Compose

In this section, you’ll learn how to run the Confluent Schema Registry using Docker Compose. Schema Registry stores its schemas in Kafka, so it needs a running Kafka broker. In this setup, the broker runs in ZooKeeper mode rather than KRaft, so you’ll also deploy a ZooKeeper instance.

As part of the prerequisites, you have deployed Kafka on your local machine as a systemd service. In this step, you’ll deploy a Kafka node through Docker Compose. First, you need to stop the service by running:

sudo systemctl stop kafka

You’ll now define the Docker Compose configuration in a file named schema-registry-compose.yaml. Create and open it for editing by running:

nano schema-registry-compose.yaml

Add the following lines:

schema-registry-compose.yaml
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Here you define three services: zookeeper, kafka, and kafka-schema-registry. All three use the latest Docker images provided by Confluent (an image without an explicit tag, such as cp-schema-registry here, defaults to latest). ZooKeeper listens on port 2181 inside the Compose network, and kafka is published on port 9092 of the host. For kafka, under environment you configure the ZooKeeper address and specify an additional listener at 29092, which the Schema Registry will use to connect to Kafka directly. With depends_on, you specify that the kafka container should be started after zookeeper.

Then, you expose kafka-schema-registry at port 8081 and pass in the address for connecting to the kafka service under environment. You also specify that the Schema Registry container should be started only after the ZooKeeper and Kafka containers. Note that depends_on in this form controls start order only and does not wait for a service to finish initializing.

Save and close the file, then run the following command to bring up the services in the background:

docker-compose -f schema-registry-compose.yaml up -d

The end of the output will be similar to the following lines:

Output
...
Creating root_zookeeper_1 ... done
Creating root_kafka_1 ... done
Creating kafka-schema-registry ... done

You can list the running containers with:

docker ps

The output will look like the following:

Output
CONTAINER ID   IMAGE                              COMMAND                  CREATED          STATUS          PORTS                                       NAMES
6783568a74c8   confluentinc/cp-schema-registry    "/etc/confluent/dock…"   19 seconds ago   Up 17 seconds   0.0.0.0:8081->8081/tcp, :::8081->8081/tcp   kafka-schema-registry
6367df4b55f7   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   19 seconds ago   Up 18 seconds   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   root_kafka_1
a5f5b09984e0   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   19 seconds ago   Up 19 seconds   2181/tcp, 2888/tcp, 3888/tcp                root_zookeeper_1
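
A container that shows as Up is not necessarily ready to accept requests yet. As a minimal sketch, assuming Java 11 or newer and the registry address used in this tutorial, you could poll the /subjects endpoint until it answers. The RegistryReadiness class and its method names are hypothetical helpers and not part of the tutorial's project:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only.
final class RegistryReadiness {

    // Calls the check up to maxAttempts times, pausing between failed attempts.
    static boolean waitUntil(BooleanSupplier check, int maxAttempts, Duration pause) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    // Returns true if GET <baseUrl>/subjects answers with HTTP 200.
    static boolean registryResponds(HttpClient client, String baseUrl) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/subjects"))
            .timeout(Duration.ofSeconds(2))
            .GET()
            .build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode() == 200;
        } catch (IOException e) {
            return false; // connection refused or timed out: not ready yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        boolean ready = waitUntil(
            () -> registryResponds(client, "http://localhost:8081"), 10, Duration.ofSeconds(1));
        System.out.println(ready ? "Schema Registry is ready" : "Schema Registry did not respond in time");
    }
}
```

The retry loop is kept separate from the HTTP call so that the same loop could wrap any other readiness check.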

In this step, you’ve deployed a Schema Registry instance, along with ZooKeeper and Kafka using Docker Compose. You’ll now learn how to create and use Avro schemas in your Java project.

Step 2 - Using Avro Schemas

In this section, you’ll add Avro to your project, as well as related dependencies. You’ll learn how to define Avro schemas and have Java classes autogenerated for the defined types. Then, you’ll add your Avro schema to Schema Registry.

Adding Avro to Your Project

First, you’ll add the org.apache.avro dependency. Navigate to the Maven repository page for Avro in your browser and select the latest available version, then copy the provided XML snippet for Maven. At the time of writing, the latest version of the Avro library was 1.11.3.

Dependencies are added to pom.xml in the root of your project. Open it for editing by running:

nano pom.xml

Find the <dependencies> section and add the dependency definition:

pom.xml
...
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
...

This will make Avro available to your project. However, to be able to generate Java classes from Avro schemas, you’ll need to also add the avro-maven-plugin from the Maven repository in the same manner.

Once you’ve defined the dependencies, you’ll need to ensure the plugin generates sources. Find the <build> section of pom.xml and add the following plugin definition:

pom.xml
...
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
...

Here, you configure the avro-maven-plugin to generate Java sources based on schemas under /src/main/java/com/dokafka and put them under /src/main/java. Note that you shouldn’t modify the existing <plugins> section under <pluginManagement>.

When you’re done, save and close the file.

Build the project to verify that everything is configured correctly:

mvn package

The end of the output will look similar to this:

Output
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ------------------------------------------------------------------------

Defining Avro Schemas

You’ll now create a schema called TempMeasurement which describes a temperature measurement at a point in time. You’ll store it next to the ProducerDemo and ConsumerDemo classes you’ve created as part of the prerequisites. Create and open it for editing by running:

nano src/main/java/com/dokafka/TempMeasurement.avsc

Add the following lines:

src/main/java/com/dokafka/TempMeasurement.avsc
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      }
  ]
}

Avro schema files are written in JSON and their file extension is .avsc. First, you specify the namespace of the schema, which will also be the namespace of the autogenerated Java classes. You set its name to TempMeasurement and specify record as schema type, signifying that it’s an Avro object.

Then, you specify the fields of your schema, which you call measuredValue and measurerName of types double and string, respectively. Avro also supports other types such as int, long, float, boolean, and bytes.

Save and close the file, then build the project:

mvn package

Now, list the files under src/main/java/com/dokafka:

ls src/main/java/com/dokafka

You’ll see that a TempMeasurement class has been created:

Output
ConsumerDemo.java ProducerDemo.java TempMeasurement.avsc TempMeasurement.java

This class holds the code for instantiating, serializing, and deserializing TempMeasurement objects.
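
To get a feel for why Avro is compact, the following sketch hand-encodes a TempMeasurement according to the Avro binary encoding rules: a double is written as 8 little-endian bytes, a string as a zigzag varint length followed by its UTF-8 bytes, and a record as its fields in schema order, with no field names. The AvroEncodingSketch class is a hypothetical illustration. In the project, the Avro library and the generated class do this work for you:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical class for illustration; the real encoding is done by the Avro library.
final class AvroEncodingSketch {

    // Avro int/long: zigzag-encode, then emit 7 bits per byte, least significant group first.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Avro double: the 8 bytes of the IEEE 754 value in little-endian order.
    static void writeDouble(ByteArrayOutputStream out, double value) {
        byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(value).array();
        out.write(bytes, 0, bytes.length);
    }

    // Avro string: the byte length as a long, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is the concatenation of its fields in schema order.
    static byte[] encodeTempMeasurement(double measuredValue, String measurerName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeDouble(out, measuredValue);
        writeString(out, measurerName);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeTempMeasurement(100.0, "sammy")) {
            hex.append(String.format("%02x ", b & 0xFF));
        }
        System.out.println(hex.toString().trim());
        // prints: 00 00 00 00 00 00 59 40 0a 73 61 6d 6d 79
    }
}
```

The whole record takes 14 bytes, because the field names and types live in the schema rather than in each message.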

Storing Schema in Schema Registry

Now that you’ve defined your schema, you can add it to Schema Registry by running:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
    http://localhost:8081/subjects/TempMeasurement/versions

The Schema Registry is exposed at http://localhost:8081 and accessible through HTTP. In this command, you first set the HTTP method to be POST with the appropriate Content-Type that the registry will accept.

You pass in the request body to -d and utilize jq to wrap the schema contents in a field called schema, since that’s the format the Schema Registry accepts. Finally, you point the request to subjects/TempMeasurement/versions, where TempMeasurement is the subject name under which the schema will be registered.

The output will be:

Output
{"id":1}

Schema Registry has accepted the request and assigned it an ID of 1.
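
The jq step in the command above turns the whole schema file into a single JSON string and places it under the schema key. The following sketch shows the same wrapping in plain Java, in case you want to build the request body from code. The SchemaRequestBody class is hypothetical, and the escaper is a minimal one written for this example rather than a full JSON library:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper for illustration only.
final class SchemaRequestBody {

    // Encodes raw text as a JSON string literal, escaping quotes, backslashes, and control characters.
    static String jsonString(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds the body in the same shape that jq produces: {"schema": "<escaped schema text>"}
    static String registrationBody(String schemaText) {
        return "{\"schema\":" + jsonString(schemaText) + "}";
    }

    public static void main(String[] args) throws IOException {
        // Pass the path to an .avsc file as the first argument, or fall back to a tiny sample schema.
        String schemaText = args.length > 0
            ? Files.readString(Paths.get(args[0]))
            : "{\"type\": \"string\"}";
        System.out.println(registrationBody(schemaText));
    }
}
```

Running it with src/main/java/com/dokafka/TempMeasurement.avsc as the argument prints a body you could send with the same POST request.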

To list all available schemas, run:

curl -X GET http://localhost:8081/subjects

You’ll see only one available:

Output
["TempMeasurement"]

In this step, you’ve added the necessary dependencies to your Maven project and set up code autogeneration for Avro schemas that you define. You’ll now dive into producing and consuming data based on schemas.

Step 3 - Producing and Consuming According to Schemas

Confluent provides an Avro serializer library for Kafka called kafka-avro-serializer. In this step, you will add it to your project and configure your producer and consumer to transmit TempMeasurement objects.

Adding Necessary Dependencies

To add the library to your project, navigate to the Maven repository and copy the XML dependency definition for the latest available version, which was 7.6.0 at the time of writing. Then, open pom.xml for editing:

nano pom.xml

Since the kafka-avro-serializer package is hosted on Confluent’s Maven repository, you’ll need to define it by adding the following lines:

pom.xml
...
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
...

Similarly, add the library definition to the <dependencies> section:

pom.xml
...
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
...

To allow your Kafka client to communicate with the Schema Registry, you’ll also need to add the kafka-schema-registry-client as a dependency, so navigate to the Maven repo and insert the definition for the latest available version to pom.xml.

Save and close the file when you’re done. You’ll now be able to use the KafkaAvroSerializer and KafkaAvroDeserializer classes in your Maven project.

Updating ProducerDemo and ConsumerDemo

Next, you’ll rework the ProducerDemo class to connect to Schema Registry and produce objects of type TempMeasurement on a topic. Open it for editing by running:

nano src/main/java/com/dokafka/ProducerDemo.java

Import KafkaAvroSerializer and KafkaAvroSerializerConfig to be able to use them:

src/main/java/com/dokafka/ProducerDemo.java
...
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
...

Then, modify the first part of the main method to look like this:

src/main/java/com/dokafka/ProducerDemo.java
...
String topicName = "java_demo_avro";

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);

ProducerRecord<String, TempMeasurement> producerRecord =
  new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
...

First, you set the topic name to java_demo_avro. Kafka will create the topic if it doesn’t already exist. Then, you switch the value serializer class from StringSerializer to KafkaAvroSerializer and set the SCHEMA_REGISTRY_URL_CONFIG parameter, which specifies the address of the Schema Registry.

You also replace the previous String value definition with TempMeasurement. For the producerRecord, you pass in a new instance of TempMeasurement with the two parameters (measuredValue and measurerName) sourced from the first two command-line arguments passed into the main method.
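
The snippet above reads args[0] and args[1] directly, so a missing or non-numeric argument surfaces as an uncaught exception. As an optional sketch, you could validate the arguments first. The MeasurementArgs class and its usage message are hypothetical and not part of the tutorial's code:

```java
// Hypothetical helper for illustration only.
final class MeasurementArgs {
    final double measuredValue;
    final String measurerName;

    private MeasurementArgs(double measuredValue, String measurerName) {
        this.measuredValue = measuredValue;
        this.measurerName = measurerName;
    }

    // Expects the value first and the name second, matching the order used by ProducerDemo.
    static MeasurementArgs parse(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: ProducerDemo <measuredValue> <measurerName>");
        }
        double value;
        try {
            value = Double.parseDouble(args[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("measuredValue must be a number, got: " + args[0], e);
        }
        return new MeasurementArgs(value, args[1]);
    }

    public static void main(String[] args) {
        MeasurementArgs parsed = parse(new String[] {"100", "sammy"});
        System.out.println(parsed.measuredValue + " measured by " + parsed.measurerName);
        // prints: 100.0 measured by sammy
    }
}
```

The parsed fields can then be passed to the TempMeasurement constructor in place of the raw arguments.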

When you’re done, save and close the file. Then, to be able to pass in two arguments to the run-producer.sh script, you need to open it for editing:

nano run-producer.sh

Add the second parameter, $2, to the end of the command:

run-producer.sh
...
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2

Save and close the file, then produce a TempMeasurement by running:

./run-producer.sh 100 sammy

In this command, you pass in 100 as the measuredValue and sammy as the measurerName.

To be able to receive the Avro object you’ve just produced, you’ll need to modify ConsumerDemo in a similar manner. Open the file for editing:

nano src/main/java/com/dokafka/ConsumerDemo.java

Import KafkaAvroDeserializer and KafkaAvroDeserializerConfig to be able to reference them:

src/main/java/com/dokafka/ConsumerDemo.java
...
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
...

Then, modify the main method to look like this:

src/main/java/com/dokafka/ConsumerDemo.java
...
String topic = "java_demo_avro";
String groupId = "group1";

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));

try {
  while (true) {
    ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, TempMeasurement> record: records) {
      log.info(String.format("measuredValue: %s, measurerName: %s\n",
        record.value().getMeasuredValue(),
        record.value().getMeasurerName()));
    }
  }
} catch (Exception e) {
  log.error("An error occurred", e);
} finally {
  consumer.close();
}
...

As with ProducerDemo, you update the topic and populate the SCHEMA_REGISTRY_URL_CONFIG parameter with the address of the Schema Registry. You also update the deserializer to KafkaAvroDeserializer. By setting the SPECIFIC_AVRO_READER_CONFIG parameter to true, you tell the deserializer to return real TempMeasurement objects. Otherwise, it would return an Avro GenericRecord, which would still contain all fields but is not strongly typed.

Then, you propagate TempMeasurement as the value type throughout the rest of the code. In the for loop, you modify the logging method call to output the measuredValue and measurerName.

Thanks to the Schema Registry integration, the producer doesn’t bundle the object schema with the object when sending it to a topic. Rather, it sends the object along with an identifier of the schema. The consumer fetches that schema in its entirety from the Schema Registry and uses it to deserialize the object.
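
Concretely, Confluent's serializers frame each message value as one magic byte (0), followed by the schema ID as a 4-byte big-endian integer, followed by the Avro-encoded data. The following sketch mirrors that layout with plain byte buffers. The WireFormatSketch class is a hypothetical illustration. In the project, KafkaAvroSerializer and KafkaAvroDeserializer handle this for you:

```java
import java.nio.ByteBuffer;

// Hypothetical class for illustration only.
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    // Prefixes an Avro payload with the magic byte and the schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId) // ByteBuffer writes big-endian by default
            .put(avroPayload)
            .array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Schema Registry framed message");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(1, new byte[] {10, 20});
        System.out.println("length: " + framed.length + ", schema id: " + schemaIdOf(framed));
        // prints: length: 7, schema id: 1
    }
}
```

The overhead per message is 5 bytes, regardless of how large the schema is.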

Save and close the file when you’re done, then run the consumer:

./run-consumer.sh

The end of the output will be similar to this:

Output
... [main] INFO com.dokafka.ConsumerDemo - measuredValue: 100.0, measurerName: sammy

Your Kafka consumer has successfully deserialized the Avro message, as evidenced by the log message.

In this step, you’ve updated your producer and consumer classes to use Avro objects. In the next step, you’ll learn how to update schemas and track their compatibility with Schema Registry.

Step 4 - Schema Evolution and Compatibility

In this step, you’ll learn how to update existing schemas and how those changes impact compatibility with existing versions and clients.

Aside from storing the schemas and versioning them through time, Schema Registry is crucial for enabling schema evolution. Schemas can be modified throughout the lifetime of a project, whereas already produced data cannot.

Schema Registry takes care of compatibility between schema versions and allows the consumer to parse as much of the data as it can following its internal version of the schema. This allows producers and consumers to be out of sync regarding their exact schema versions since they can reside in different codebases.

The main compatibility strategies that Schema Registry offers are:

  • BACKWARD, which ensures that consumers using the new schema can read data based on the previous version
  • FORWARD, meaning that consumers using the previous schema can read data produced with the new schema
  • FULL, which combines the previous two strategies
  • NONE, meaning that compatibility checks are disabled

The first three strategies also have transitive counterparts (such as BACKWARD_TRANSITIVE), which mandate that the new schema must be compatible with all previous versions of the schema and not just its immediate predecessor. The default strategy is BACKWARD.
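
The direction of each strategy is easy to mix up, so here is a deliberately simplified model. It treats a schema as a set of field names, each marked as having a default value or not, and ignores field types and type promotion entirely. The CompatibilitySketch class and the unit field are hypothetical, and this is not the algorithm Schema Registry itself runs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model for illustration: a schema maps each field name to "has a default value".
final class CompatibilitySketch {

    // A reader can decode a writer's data if every reader field was written or has a default.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean written = writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (!written && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // BACKWARD: the new schema reads data written with the old schema.
    static boolean isBackward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: the old schema reads data written with the new schema.
    static boolean isForward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean isFull(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return isBackward(newSchema, oldSchema) && isForward(newSchema, oldSchema);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = new LinkedHashMap<>();
        v1.put("measuredValue", false);
        v1.put("measurerName", false);

        // Hypothetical change: add a "unit" field without a default value.
        Map<String, Boolean> added = new LinkedHashMap<>(v1);
        added.put("unit", false);
        System.out.println("add field:    backward=" + isBackward(added, v1) + ", forward=" + isForward(added, v1));
        // prints: add field:    backward=false, forward=true

        // Hypothetical change: remove "measurerName", which has no default in v1.
        Map<String, Boolean> removed = new LinkedHashMap<>();
        removed.put("measuredValue", false);
        System.out.println("remove field: backward=" + isBackward(removed, v1) + ", forward=" + isForward(removed, v1));
        // prints: remove field: backward=true, forward=false
    }
}
```

In this model, adding a field without a default breaks BACKWARD compatibility, while removing a field without a default breaks FORWARD compatibility.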

You’ll now modify the TempMeasurement schema and add a field for storing the date of the measurement. Open the schema for editing by running:

nano src/main/java/com/dokafka/TempMeasurement.avsc

Modify it to look like this:

src/main/java/com/dokafka/TempMeasurement.avsc
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      },
      {
          "name": "measurementDate",
          "type": "string"
      }
  ]
}

You’ve defined a new field called measurementDate, which will store the date of the measurement in a textual format. Save and close the file when you’re done.

Run the following command to create a new version of the schema in Schema Registry:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
    http://localhost:8081/subjects/TempMeasurement/versions

You’ll get the following output, detailing an error:

Output
{ "error_code": 409, "message": "Schema being registered is incompatible with an earlier schema for subject \"TempMeasurement\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'measurementDate' at path '/fields/2' in the new schema has no default value and is missing in the old schema', additionalInfo:'measurementDate'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]" }

The error states that the new schema isn’t backward compatible with the previous one because the measurementDate field is new and does not have a default value. The check exists so that consumers using the new schema can still read data created with the old schema. Without a default value for the missing field, deserialization of that older data would fail.

Open the file and add a default value to the measurementDate field:
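
As a rough illustration of that failure, the following sketch models what a reader has to do when it decodes a record written with the first schema version: for every field it expects, it takes the written value if there is one, falls back to the default if one is declared, and gives up otherwise. The DefaultResolutionSketch class is hypothetical and only models the idea. Avro's real schema resolution also handles types, unions, and aliases:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Toy model for illustration only.
final class DefaultResolutionSketch {

    // readerFields maps each field the reader expects to its optional default value.
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Optional<Object>> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));
            } else if (field.getValue().isPresent()) {
                result.put(name, field.getValue().get());
            } else {
                throw new IllegalStateException(
                    "Field '" + name + "' is missing in the data and has no default value");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A record written with the first version of the schema.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("measuredValue", 100.0);
        oldRecord.put("measurerName", "sammy");

        // The reader's schema with measurementDate but no default for it.
        Map<String, Optional<Object>> reader = new LinkedHashMap<>();
        reader.put("measuredValue", Optional.empty());
        reader.put("measurerName", Optional.empty());
        reader.put("measurementDate", Optional.empty());
        try {
            resolve(oldRecord, reader);
        } catch (IllegalStateException e) {
            System.out.println("Without default: " + e.getMessage());
        }

        // The same schema, with an empty string declared as the default.
        reader.put("measurementDate", Optional.<Object>of(""));
        System.out.println("With default: " + resolve(oldRecord, reader));
    }
}
```

With a default declared, the old record resolves cleanly and measurementDate is filled in with that default.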

src/main/java/com/dokafka/TempMeasurement.avsc
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      },
      {
          "name": "measurementDate",
          "type": "string",
          "default": ""
      }
  ]
}

measurementDate will now have a default value of an empty string. Save and close the file, then try submitting the schema again. The output will be:

Output
{"id":2}

Schema Registry has accepted the second version of the schema because it’s backward compatible with the first and assigned it 2 as its ID. You can retrieve the first version by running:

curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/1

The output will be:

Output
{ "subject": "TempMeasurement", "version": 1, "id": 1, "schema": "{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}" }

You can also list all versions of a schema by omitting the version number:

curl -X GET http://localhost:8081/subjects/TempMeasurement/versions

You’ll see that there are two:

Output
[1, 2]
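
The same read-only endpoints can be queried from Java. The following sketch, assuming Java 11 or newer, builds the three URLs used in this tutorial and fetches them with the JDK's built-in HTTP client. The RegistryEndpoints class is a hypothetical helper, and it assumes the subject name needs no URL escaping:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration only.
final class RegistryEndpoints {

    // Lists all subjects.
    static URI subjects(String baseUrl) {
        return URI.create(baseUrl + "/subjects");
    }

    // Lists the version numbers registered under a subject.
    static URI versions(String baseUrl, String subject) {
        return URI.create(baseUrl + "/subjects/" + subject + "/versions");
    }

    // Retrieves one specific version of a subject's schema.
    static URI version(String baseUrl, String subject, int version) {
        return URI.create(baseUrl + "/subjects/" + subject + "/versions/" + version);
    }

    // Sends a GET request and returns the response body as text.
    static String get(HttpClient client, URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws InterruptedException {
        String baseUrl = "http://localhost:8081";
        HttpClient client = HttpClient.newHttpClient();
        URI[] uris = {
            subjects(baseUrl),
            versions(baseUrl, "TempMeasurement"),
            version(baseUrl, "TempMeasurement", 1)
        };
        for (URI uri : uris) {
            try {
                System.out.println(uri + " -> " + get(client, uri));
            } catch (IOException e) {
                System.out.println(uri + " -> not reachable: " + e.getMessage());
            }
        }
    }
}
```

With the registry from Step 1 running, each line shows the same JSON that the corresponding curl command returns.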

To change the compatibility strategy, you can run the following command:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
       http://localhost:8081/config/TempMeasurement

This command sets the strategy for the TempMeasurement subject to BACKWARD_TRANSITIVE. Notice that the endpoint is config and not subjects. The output will be:

Output
{"compatibility":"BACKWARD_TRANSITIVE"}

To destroy the Docker Compose resources you’ve started, run the following command:

docker-compose -f schema-registry-compose.yaml down

In this step, you’ve modified the TempMeasurement schema in accordance with the BACKWARD compatibility strategy and published it to the Schema Registry.

Conclusion

In this article, you’ve extended your ProducerDemo and ConsumerDemo classes to produce and consume TempMeasurement objects, serialized by Apache Avro. You’ve learned how to utilize the Schema Registry for schema storage and evolution and connected your Kafka clients to it.


The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.

About the authors
Savic, author