Kafka Migration and Event Streaming

Introduction

Apache Kafka is an open-source distributed event streaming and stream-processing platform written in Java, built to handle demanding real-time data feeds. It is inherently scalable, with high throughput and availability, and can grow from a single-node cluster for testing to hundreds of nodes serving large amounts of data in production. However, when you expand the cluster, existing topics are not automatically rearranged across the new brokers.

In this tutorial, you’ll learn how to expand your Kafka cluster by adding a new node and properly migrating topic partitions to the new node, ensuring maximum resource utilization. You’ll learn how to achieve that manually using the provided script, as well as automatically with Kafka Cruise Control, a daemon for automatically optimizing the inner processes of a Kafka cluster. You’ll also learn how to aggregate your event data using ksqlDB, a database that seamlessly operates on top of Kafka topics.

Prerequisites

To complete this tutorial, you’ll need:

  • Four Droplets with at least 4GB RAM and 2 CPUs each. If you are using Ubuntu servers, follow the Ubuntu Initial Server Setup for setup instructions.
  • Java Development Kit (JDK) 11 installed on your Droplet or local machine. For instructions on installing Java on Ubuntu, see the How To Install Java with Apt on Ubuntu tutorial.
  • A cluster of three Apache Kafka nodes. You can create it by following the How To Set Up a Multi-Node Kafka Cluster using KRaft tutorial.
  • A single Apache Kafka node, installed and configured on the fourth Droplet. You can follow the Introduction to Kafka tutorial for setup instructions. You only need to complete Step 1 and Step 2.
  • A fully registered domain name with four subdomains, one pointed at each of the four Kafka nodes. This tutorial will refer to them individually as kafkaX.your_domain throughout. You can purchase a domain name on Namecheap, get one for free on Freenom, or use the domain registrar of your choice.
  • kcat and the Kafka Cruise Control metrics reporter installed on all four nodes. Follow the How To Manage Kafka Programmatically tutorial for instructions. You only need to complete Step 2 and Step 3, and you only need to configure Cruise Control itself on the fourth node.
  • Docker installed on your machine. For Ubuntu, visit How To Install and Use Docker on Ubuntu. You only need to complete Step 1 and Step 2. Otherwise, visit Docker’s website for other distributions.
  • Docker Compose installed on your machine. For Ubuntu, visit How To Install and Use Docker Compose on Ubuntu. You only need to complete Step 1. Otherwise, visit Docker’s website for other distributions.

Step 1 - Expanding the Cluster

In this step, you’ll learn how to add nodes as brokers to your KRaft Kafka cluster. With KRaft, the nodes themselves can organize and perform administrative tasks without the overhead of depending on Apache ZooKeeper, freeing you from the additional dependency. You’ll also learn how to use the new broker by migrating topics to it.

After completing the prerequisites, you will have a Kafka cluster consisting of three nodes. Before expanding the cluster with one more node, you’ll create a new topic.

On the fourth node, as user kafka, navigate to the directory where Kafka is installed (~/kafka) and run the following command:

./bin/kafka-topics.sh --bootstrap-server kafka1.your_domain:9092 --create --topic new-topic

The output will be:

Output
Created topic new-topic.
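
If you'd like to confirm where the partitions of new-topic were placed before expanding the cluster, you can describe the topic with the same script (an optional check):

./bin/kafka-topics.sh --bootstrap-server kafka1.your_domain:9092 --describe --topic new-topic

The output lists each partition along with its leader and replica brokers, which at this point are still spread across only the original three nodes.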

Integrating the Fourth Node

First, navigate to the directory where Kafka resides and open its configuration file for editing by running:

nano config/kraft/server.properties

Find the following lines:

config/kraft/server.properties
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

Modify them to look like this:

config/kraft/server.properties
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker

# The node id associated with this instance's roles
node.id=4

# The connect string for the controller quorum
controller.quorum.voters=1@kafka1.your_domain:9093
...

By default, the process.roles parameter configures the node to act as both broker and controller, meaning it both stores and serves data and performs administrative tasks for the cluster. In this case, you'll configure it to act only as a broker.

node.id specifies the node’s ID in the cluster. Each node in the cluster must have a unique ID, regardless of its role. Here, you set it to 4.

controller.quorum.voters maps node IDs to their respective addresses and ports for communication. This is where you specify the addresses of all controller nodes in the cluster. Remember to replace kafka1.your_domain with the domain name that points to the first Kafka node.

Next, find the following lines in the file:

config/kraft/server.properties
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://:9092
...

Modify them to look like this:

config/kraft/server.properties
...
listeners=PLAINTEXT://kafka4.your_domain:9092

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka4.your_domain:9092
...

Because this node is not a controller, you remove the CONTROLLER reference in listeners. You also specify the domain name of the broker explicitly. You have to do this on every node in your cluster.

You’ve now configured the basic parameters for connecting this node to the first one, forming a proper cluster. You’ll now configure the replication factors to ensure redundancy across the cluster.

Configuring Replication Factors

In the same file, find the num.partitions setting:

config/kraft/server.properties
...
num.partitions=1
...

Set it to 6 as you did for the other nodes as part of the prerequisites:

config/kraft/server.properties
...
num.partitions=6
...

Next, find the following lines:

config/kraft/server.properties
...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

These parameters set the replication factor for Kafka's internal topics that hold consumer offsets and transaction states. To ensure redundancy, set them to a value greater than 1, but no higher than the number of nodes in the cluster:

config/kraft/server.properties
...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

When you’re done, save and close the file. If you’ve modified the replication parameters shown above, you’ll have to do the same on each node of your cluster, as well as add the proper domain name to listeners.

Recreating Storage

After completing the configuration, you’ll have to reinitialize the log storage on the new node. First, delete the existing log files by running:

rm -rf /home/kafka/kafka-logs/*

On the first node, show the existing cluster ID by running:

cat /home/kafka/kafka-logs/meta.properties

The output will be similar to this:

...
node.id=1
directory.id=Mvbt8cwgTAwUGRTwMscO-g
version=1
cluster.id=i-hvtE_3Tg6RMKc3G6oOyg

Note the cluster.id. On the fourth node, store that ID in a variable named KAFKA_CLUSTER_ID:

KAFKA_CLUSTER_ID="your_cluster_id"

Then, create the log storage by running:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

The output will be similar to this:

Output
... Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
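
If you want to double-check the result, you can inspect the freshly created metadata file on this node (an optional check); it should now show node.id=4 together with the same cluster.id as the first node:

cat /home/kafka/kafka-logs/meta.properties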

With this, you’ve completely configured the second node. Restart Kafka by running:

sudo systemctl restart kafka
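
Optionally, confirm that the service came up cleanly before querying the cluster:

sudo systemctl status kafka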

After a minute, check that the cluster is expanded by listing info using kcat:

kcat -b kafka1.your_domain -L

The output will list four brokers along with the existing topics:

Metadata for all topics (from broker 1: kafka1.your_domain:9092/1):
 4 brokers:
  broker 1 at kafka1.your_domain:9092
  broker 2 at kafka2.your_domain:9092 (controller)
  broker 3 at kafka3.your_domain:9092
  broker 4 at kafka4.your_domain:9092
 3 topics:
  topic "new-topic" with 6 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 2, replicas: 2, isrs: 2
    partition 3, leader 3, replicas: 3, isrs: 3
    partition 4, leader 2, replicas: 2, isrs: 2
    partition 5, leader 1, replicas: 1, isrs: 1
  topic "__CruiseControlMetrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "__consumer_offsets" with 50 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 4, leader 1, replicas: 1, isrs: 1
...
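
You can also inspect the KRaft controller quorum itself using the kafka-metadata-quorum.sh script included with Kafka. Since the new node runs as a broker only, it should appear among the observers rather than the voters (a minimal check; the exact fields vary by Kafka version):

./bin/kafka-metadata-quorum.sh --bootstrap-server kafka1.your_domain:9092 describe --status

The output includes the current leader, the voter IDs from controller.quorum.voters, and the observer (broker) IDs.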

You’ve successfully added a new node to your Kafka cluster. You can repeat this step for as many nodes as you need, but they won’t be used automatically. You’ll now learn how to rearrange topics accross the new node.

Step 2 - Migrating Topics To New Nodes

In this step, you’ll learn how to migrate topics in-between the cluster nodes using Cruise Control, as well as manually using the provided script for rearranging partitions on brokers.

Migrating Using the Included Script

As part of the Kafka installation, you are provided the kafka-reassign-partitions.sh script, which allows you to create and execute plans for moving topics between cluster brokers. In the previous step, you created new-topic, and its partitions are currently spread across only the original three brokers.

To make use of the new node, you'll have to explicitly instruct Kafka to host the topic across all four brokers. The included script determines which brokers the partitions currently reside on; you only have to specify the brokers on which they should end up.

The script accepts a JSON file detailing which topics to move. You'll store the code in a file named topics-to-migrate.json. Create and open it for editing:

nano topics-to-migrate.json

Add the following lines:

topics-to-migrate.json
{
    "topics": [
        {
            "topic": "new-topic"
        }
    ],
    "version": 1
}

Here, you reference new-topic under topics. Save and close the file when you’re done.

Since new-topic should be balanced across all brokers, run the script to generate a migration plan:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-migrate.json --broker-list "1,2,3,4" --generate

Here, you pass in topics-to-migrate.json and the list of brokers, and instruct the script to --generate a new partition assignment.

The output will look like this:

Output
Current partition replica assignment
{"version":1,"partitions":[{"topic":"new-topic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"new-topic","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":5,"replicas":[1],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"new-topic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":1,"replicas":[4],"log_dirs":["any"]},{"topic":"new-topic","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"new-topic","partition":3,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":4,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":5,"replicas":[4],"log_dirs":["any"]}]}

The script generated the current and proposed partition layouts. You’ll store the second plan in a file named migration-plan.json, so create and open it for editing:

nano migration-plan.json

Add the proposed partition configuration:

migration-plan.json
{"version":1,"partitions":[{"topic":"new-topic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":1,"replicas":[4],"log_dirs":["any"]},{"topic":"new-topic","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"new-topic","partition":3,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":4,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":5,"replicas":[4],"log_dirs":["any"]}]}

Save and close the file, then apply it by running:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

The output will be similar to this:

Output
Current partition replica assignment
{"version":1,"partitions":[{"topic":"new-topic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"new-topic","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"new-topic","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"new-topic","partition":5,"replicas":[1],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for new-topic-0,new-topic-1,new-topic-2,new-topic-3,new-topic-4,new-topic-5

The migration of partitions is now in progress, and you can monitor it by passing in --verify:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

Once the migration is done, the output will be:

Output
Status of partition reassignment:
Reassignment of partition new-topic-0 is completed.
Reassignment of partition new-topic-1 is completed.
Reassignment of partition new-topic-2 is completed.
Reassignment of partition new-topic-3 is completed.
Reassignment of partition new-topic-4 is completed.
Reassignment of partition new-topic-5 is completed.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic new-topic
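
The throttle-related messages refer to an optional replication throttle. For large topics, you can cap the bandwidth a reassignment may use by adding the --throttle flag (in bytes per second) to the --execute command, for example:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute --throttle 10000000

Running the command with --verify afterwards clears the throttles once the reassignment completes, as shown in the output above.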

You can now list the layout of new-topic using kcat:

kcat -b kafka1.your_domain:9092 -L

You’ll see that it’s evenly spread across all four brokers:

Output
...
  topic "new-topic" with 6 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 4, replicas: 4, isrs: 4
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 3, leader 2, replicas: 2, isrs: 2
    partition 4, leader 3, replicas: 3, isrs: 3
    partition 5, leader 4, replicas: 4, isrs: 4
...

You’ve seen how to use the included Kafka partition migration script to rebalance the partitions across brokers in the cluster. You’ll now learn how to balance Kafka brokers using Cruise Control automatically.

Migrating Using Cruise Control

Alternatively, you can use Cruise Control to rearrange all topics across any number of new brokers without having to manually specify them or generate accompanying plans.

As part of the prerequisites, you’ve expanded the capacities.json configuration file to cover the three existing nodes. Since you now have four, you’ll need to add the new broker to the file. Navigate to the cruise-control directory, then open the file for editing by running:

nano config/capacities.json

Add configuration for the fourth broker:

config/capacities.json
...
    {
      "brokerId": "3",
      "capacity": {
        "DISK": "500000",
        "CPU": "100",
        "NW_IN": "50000",
        "NW_OUT": "50000"
      },
      "doc": ""
    },
    {
      "brokerId": "4",
      "capacity": {
        "DISK": "500000",
        "CPU": "100",
        "NW_IN": "50000",
        "NW_OUT": "50000"
      },
      "doc": ""
    }
...

Save and close the file when you’re done. Then, run Cruise Control in a separate terminal:

./kafka-cruise-control-start.sh config/cruisecontrol.properties

Finally, add the broker with ID 4 using cccli by running:

cccli -a localhost:9090 add-broker 4

The output will be long and will look similar to this:

Output
Starting long-running poll of http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=4&allow_capacity_estimation=False&dryrun=True

Optimization has 19 inter-broker replica(0 MB) moves, 0 intra-broker replica(0 MB) moves and 15 leadership moves with a cluster model of 1 recent windows and 100.000% of the partitions covered.
Excluded Topics: [].
Excluded Brokers For Leadership: [].
Excluded Brokers For Replica Move: [].
Counts: 4 brokers 185 replicas 5 topics.
On-demand Balancedness Score Before (71.541) After(71.541).
Provision Status: OVER_PROVISIONED.
Provision Recommendation: [RackAwareGoal] Remove at least 1 rack with brokers. [ReplicaDistributionGoal] Remove at least 4 brokers.
...
Cluster load after adding broker [4]:
...

As this is a cluster with minimal traffic, Cruise Control recommends downsizing it.
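
Beyond adding brokers, Cruise Control can rebalance the whole cluster on demand through the same REST API shown in the output above. As a minimal sketch (the endpoint base and port are taken from the add_broker URL above; available parameters vary by version), you can request a dry-run rebalance proposal with:

curl -X POST "http://localhost:9090/kafkacruisecontrol/rebalance?dryrun=true"

The response describes the proposed moves without executing them; setting dryrun=false would apply them.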

List cluster info using kcat again, and you’ll see that new-topic has been balanced across the brokers with no manual input necessary:

Output
...
  topic "new-topic" with 6 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 4, replicas: 4, isrs: 4
    partition 2, leader 2, replicas: 2, isrs: 2
    partition 3, leader 3, replicas: 3, isrs: 3
    partition 4, leader 2, replicas: 2, isrs: 2
    partition 5, leader 4, replicas: 4, isrs: 4
...

In this section, you’ve seen how to migrate topic partitions to new brokers in the cluster. You can manually specify topics to rearrange and the brokers where they should be available, and pass them to the provided kafka-reassign-partitions.sh script. You can also rely on Cruise Control, which will automatically rearrange all topics to the new broker with no further input necessary; you just have to provide the ID of the new broker. In the next step, you’ll learn how to process events in Kafka using an SQL-like syntax with ksqlDB.

Step 3 - Structured Stream Processing with ksqlDB

In this step, you’ll deploy ksqlDB and its CLI shell using Docker Compose. You’ll use it to aggregate a stream of events representing temperature measurements from multiple sensors that report to a Kafka topic.

In comparison to Kafka Streams, which is a Java client library providing an API for accepting and processing events from Kafka in your applications, ksqlDB is a database that allows you to easily process events from Kafka topics using an SQL-like syntax. ksqlDB uses Kafka Streams internally and abstracts away much of the programming that would be required to achieve the same functionality manually.

You’ll store the Docker Compose configuration in a file named ksqldb-compose.yaml. Create and open it for editing by running:

nano ~/ksqldb-compose.yaml

Add the following lines:

~/ksqldb-compose.yaml
services:
  ksqldb:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb
    ports:
      - "8088:8088"
    healthcheck:
      test: curl -f http://ksqldb:8088/ || exit 1
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka1.your_domain:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - ksqldb
    entrypoint: /bin/sh
    tty: true

You define two services, ksqldb and ksqldb-cli, which represent the database and its CLI interface, respectively. You configure a health check for the database by polling its exposed port (8088) and provide a reference to your Kafka cluster in the KSQL_BOOTSTRAP_SERVERS config parameter. You also configure ksqlDB to automatically create the stream and topic it uses for its processing log if they do not exist.

Remember to replace kafka1.your_domain with the actual domain name, then save and close the file.

Create the Compose deployment by running:

docker compose -f ~/ksqldb-compose.yaml up -d

The end of the output will be similar to this:

Output
...
[+] Running 3/3
 ✔ Network root_default      Created  0.1s
 ✔ Container root-ksqldb-1   Started  0.4s
 ✔ Container ksqldb-cli      Started  0.5s
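
Optionally, you can confirm that the ksqldb container’s health check is passing before connecting to it:

docker compose -f ~/ksqldb-compose.yaml ps

After a short while, the ksqldb service should be reported as healthy.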

With the ksqlDB server now running, enter the CLI by running:

docker exec -it ksqldb-cli ksql http://ksqldb:8088

You’ll enter the CLI shell, from which you can query and configure the database:

Output
...
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v0.28.2, Server v0.28.2 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

By default, ksqlDB won’t fetch topic messages from the beginning when accessing them for the first time. To remedy this, set the auto.offset.reset configuration parameter to earliest by running:

SET 'auto.offset.reset'='earliest';

You’ll get the following output:

Output
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

As the database is directly connected to your Kafka cluster, you can list the topics by running:

list topics;

The output will detail their names, partitions, and replicas:

Output
 Kafka Topic                                 | Partitions | Partition Replicas
------------------------------------------------------------------------------
 __CruiseControlMetrics                      | 1          | 1
 __KafkaCruiseControlModelTrainingSamples    | 32         | 2
 __KafkaCruiseControlPartitionMetricSamples  | 32         | 2
 default_ksql_processing_log                 | 1          | 1
 new-topic                                   | 6          | 1
------------------------------------------------------------------------------

Now that you’ve tested the connection to your Kafka cluster, you’ll learn how to use ksqlDB and its streaming functionality.

Creating Streams Based on Topics

Classic relational databases are based on the concept of a table, which contains rows partitioned into columns that hold data in a structured way, and the table itself presents the current state. ksqlDB bridges event streaming and the traditional table model with streams, which represent a series of events signifying data creation, modification, or removal.

Streams are based on Kafka topics, and their difference is in the semantic interpretation of the data they hold. While a table shows the latest state of data, the accompanying stream allows you to replay the events that led up to it.

You’ll create and interact with a stream holding temperature measurements from sensors. To create it, run the following command in ksqlDB CLI:

CREATE STREAM MEASUREMENTS (measurement_id VARCHAR, sensor_name VARCHAR, temperature DOUBLE)
WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='MEASUREMENTS');

Here, you CREATE a STREAM called MEASUREMENTS. Then, similarly to SQL, you specify the fields: measurement_id as the event ID, sensor_name for the reporting sensor, and temperature for the reading itself. You also specify that the underlying data should be represented as JSON in a topic called MEASUREMENTS. Because the number of partitions is specified, ksqlDB will automatically create the topic in Kafka if it doesn’t already exist.

The output will look like this:

Output
 Message
----------------
 Stream created
----------------

You can now list all streams to confirm that it’s been created:

list streams;

The output will be similar to this:

Output
 Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA      | JSON         | false
 MEASUREMENTS        | MEASUREMENTS                | KAFKA      | JSON         | false
------------------------------------------------------------------------------------------
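
You can also inspect the stream’s schema and the topic backing it at any point with DESCRIBE (an optional check):

DESCRIBE MEASUREMENTS;

The output lists the declared columns with their types, along with the underlying Kafka topic and its formats.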

As with a standard database, you can insert data into the stream:

INSERT INTO MEASUREMENTS (measurement_id, sensor_name, temperature) VALUES ('1', 'first', 100.0);

There will be no output upon a successful INSERT. Then, retrieve all events from the stream by running:

SELECT * FROM MEASUREMENTS EMIT CHANGES;

As streams are based on Kafka topics, by specifying EMIT CHANGES, you instruct the database to stream events from the underlying topic in real time. You’ll see it retrieves the entry you’ve just made:

Output
+-------------------------------+-------------------------------+-------------------------------+
|MEASUREMENT_ID                 |SENSOR_NAME                    |TEMPERATURE                    |
+-------------------------------+-------------------------------+-------------------------------+
|1                              |first                          |100.0                          |

In a separate terminal, retrieve all messages from the MEASUREMENTS topic to see what the underlying data looks like:

./bin/kafka-console-consumer.sh --bootstrap-server kafka1.your_domain:9092 --topic MEASUREMENTS --from-beginning

You’ll receive the following output, showing the entry as JSON:

Output
{"MEASUREMENT_ID":"1","SENSOR_NAME":"first","TEMPERATURE":100.0}

To verify that ksqlDB is actually streaming from the topic, first run the kafka-console-producer.sh script:

./bin/kafka-console-producer.sh --bootstrap-server kafka1.your_domain:9092 --topic MEASUREMENTS

Input the following message and press ENTER to produce it:

{"MEASUREMENT_ID":"2","SENSOR_NAME":"first","TEMPERATURE":120.0}

Then, return to the ksqlDB CLI. You’ll see that the new measurement is shown immediately:

Output
+-------------------------------+-------------------------------+-------------------------------+
|MEASUREMENT_ID                 |SENSOR_NAME                    |TEMPERATURE                    |
+-------------------------------+-------------------------------+-------------------------------+
|1                              |first                          |100.0                          |
|2                              |first                          |120.0                          |

Press CTRL+C to exit the live query mode.
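
Push queries can also filter events as they arrive. For example, the following variation (a minimal sketch you can try now) only emits measurements above 110 degrees, so it would show only the second event; press CTRL+C again to exit it:

SELECT * FROM MEASUREMENTS WHERE temperature > 110 EMIT CHANGES;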

You have now populated the MEASUREMENTS stream with two events. You’ll now create a table based on it.

Creating a Table

While streams represent the unbounded data that comes in through Kafka, tables allow you to summarize and retrieve aggregated data, akin to SQL views. You’ll now create a table called MEASUREMENT_MAX_TEMP that will hold each sensor’s maximum recorded temperature. Run the following code to create it:

CREATE TABLE MEASUREMENT_MAX_TEMP AS SELECT sensor_name, MAX(TEMPERATURE) AS max_sensor_temp
FROM MEASUREMENTS GROUP BY sensor_name;

You define which fields and aggregations the table should contain (sensor_name and the maximum temperature, respectively). The output will be similar to this:

Output
 Message
---------------------------------------------------
 Created query with ID CTAS_MEASUREMENT_MAX_TEMP_5
---------------------------------------------------

ksqlDB has created an underlying query to retrieve the requested data.

You can now SELECT from the table by running:

select * from MEASUREMENT_MAX_TEMP EMIT CHANGES;

The output will be:

Output
+--------------------------------------------+--------------------------------------------+
|SENSOR_NAME                                 |MAX_SENSOR_TEMP                             |
+--------------------------------------------+--------------------------------------------+
|first                                       |120.0                                       |

In the secondary terminal, input the following message into the console producer script, representing a temperature measurement from a different sensor:

{"MEASUREMENT_ID":"3","SENSOR_NAME":"second","TEMPERATURE":200.0}

Return to the database CLI, and you’ll see the new measurement streamed into the table:

Output
+--------------------------------------------+--------------------------------------------+
|SENSOR_NAME                                 |MAX_SENSOR_TEMP                             |
+--------------------------------------------+--------------------------------------------+
|first                                       |120.0                                       |
|second                                      |200.0                                       |

Then, produce another event for the second sensor, but modify the temperature:

{"MEASUREMENT_ID":"4","SENSOR_NAME":"second","TEMPERATURE":300.0}

You’ll see that the change is represented by a new event:

Output
+--------------------------------------------+--------------------------------------------+
|SENSOR_NAME                                 |MAX_SENSOR_TEMP                             |
+--------------------------------------------+--------------------------------------------+
|first                                       |120.0                                       |
|second                                      |200.0                                       |
|second                                      |300.0                                       |
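
After pressing CTRL+C to exit the live query, you can also look up the latest value for a single sensor with a pull query, which returns the current result immediately instead of streaming changes (a minimal example):

SELECT * FROM MEASUREMENT_MAX_TEMP WHERE sensor_name = 'second';

Since the table is materialized by the query created above, this returns a single row showing 300.0 for the second sensor.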

In this step, you’ve learned how to deploy and configure ksqlDB using Docker Compose. You’ve also seen how to create streams based on Kafka topics and run aggregations on them with tables.

Conclusion

In this tutorial, you’ve seen how to rearrange existing Kafka topics across a cluster that’s just been expanded with a new node. You’ve seen how to accomplish that manually using the provided script, as well as automatically with Cruise Control. You’ve also learned how to deploy ksqlDB and use it to aggregate and structure events stored in topics.


The author selected Free and Open Source Fund to receive a donation as part of the Write for DOnations program.
