With the exponential growth of connected devices in today’s data-driven world, the ability to process billions of pieces of information at scale has become essential. MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for resource-constrained devices and low-bandwidth networks, making it ideal for IoT applications. An MQTT broker like Coreflux and a scalable cloud platform like DigitalOcean can solve the challenges of processing and analyzing IoT data.
This tutorial will teach you how to connect an MQTT broker with a managed OpenSearch service on DigitalOcean. This seamless setup enables real-time data collection and storage, making monitoring, analyzing, and visualizing your IoT data more accessible.
Coreflux provides a Lightweight MQTT Broker for Efficient IoT Communication on DigitalOcean.
MQTT is a lightweight publish-subscribe network protocol widely adopted in IoT ecosystems. Designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks, it enables efficient, real-time messaging.
Coreflux offers a lightweight MQTT broker to facilitate efficient, real-time communication between IoT devices and applications. Built for scalability and reliability, Coreflux is tailored for environments where low latency and high throughput are critical.
Whether you are developing a small-scale IoT project or deploying a large-scale industrial monitoring system, Coreflux provides the messaging backbone that keeps data flowing smoothly between devices.
With Coreflux on DigitalOcean, you get:
Scalability: Easily handle growing amounts of data and devices without compromising performance.
Reliability: Ensure consistent and dependable messaging across all connected devices.
Efficiency: Optimize bandwidth usage in environments where network resources are limited.
The above screenshot is a real-life example of solar-power park monitoring on OpenSearch Dashboards
Before diving into the integration, make sure you have the following:
A DigitalOcean account. If you don’t have one, sign up for an account at DigitalOcean.
Coreflux Broker Setup: Coreflux Broker should be running and accessible. If it’s not set up yet, refer to the Coreflux documentation or check the initial steps in this guide.
MQTT Explorer: This tool interacts with the MQTT broker. You can download it from MQTT Explorer.
Python Environment: Ensure you have Python installed, along with the necessary libraries such as paho-mqtt and opensearch-py.
Python Script: You’ll need the Python script that bridges the Coreflux MQTT broker with your DigitalOcean OpenSearch instance. This script checks published MQTT messages, processes them, and stores them in OpenSearch.
New to Python? Here’s a basic outline of what the Python script does:
Connects to Coreflux: The script uses paho-mqtt to connect to your Coreflux MQTT broker.
Stores Data in OpenSearch: The script uses opensearch-py to store the processed messages in your OpenSearch instance.
Publishes Feedback: After processing, the script can publish feedback messages back to the MQTT broker to report errors or task completion.
You can watch the Coreflux tutorial on how to quickly start a free trial of the online MQTT broker:
Or you can follow this step-by-step guide:
Create a Coreflux Account
Start a Free Trial Broker
Receive Broker Credentials
Set Up MQTT Explorer
Test the Broker Connection
Now that your Coreflux MQTT broker is set up and tested, it’s time to connect it to a managed OpenSearch instance on DigitalOcean. Here’s how:
Log in to DigitalOcean:
Create a New Database:
Configure Your OpenSearch Instance:
Create the Cluster:
Get Your Connection Details:
Before you start indexing data from your Coreflux MQTT broker into OpenSearch, you need to define the mapping for your index. Mapping is the schema for your index, specifying the data types for each field in your documents. This step is crucial for ensuring the data is stored correctly and can be searched effectively.
Here’s how to create and map an index in your OpenSearch instance:
Log in to the OpenSearch dashboard using the connection details you obtained when setting up the OpenSearch instance.
Navigate to the “Index Management” section.
Click on Create Index to start the process. Enter a name for your index (e.g., machine_production).
Click on the Mappings tab during the index creation process. Here, you will define the fields that your data will have. For example:
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date"
      },
      "machine_id": {
        "type": "keyword"
      },
      "temperature": {
        "type": "float"
      },
      "status": {
        "type": "keyword"
      },
      "error_code": {
        "type": "integer"
      }
    }
  }
}
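In this example, timestamp is a date field, machine_id and status are keyword fields (suited to exact-match filtering and aggregation), temperature is a float, and error_code is an integer.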
After defining your mappings, review the settings and click on Create Index.
OpenSearch will now create the index with the mappings you specified. This index is now ready to store and organize the data that will be published from your Coreflux MQTT broker.
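If you would rather create the index from code than through the dashboard, the same mapping can be sent with opensearch-py. The following is a minimal sketch rather than part of the original script: the function name create_index_if_missing is hypothetical, and client is assumed to be an opensearchpy.OpenSearch instance that is already connected with your credentials.

```python
# The mapping from this tutorial, expressed as a Python dict.
MACHINE_MAPPING = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "machine_id": {"type": "keyword"},
            "temperature": {"type": "float"},
            "status": {"type": "keyword"},
            "error_code": {"type": "integer"},
        }
    }
}


def create_index_if_missing(client, index_name="machine_production"):
    """Create the index with MACHINE_MAPPING unless it already exists.

    `client` is assumed to be an opensearchpy.OpenSearch instance.
    Returns True if the index was created, False if it was already there.
    """
    if client.indices.exists(index=index_name):
        return False
    client.indices.create(index=index_name, body=MACHINE_MAPPING)
    return True
```

Checking for the index first keeps the call safe to repeat, which is convenient if the bridge script runs it on every start.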
Use your Python script or the OpenSearch API to index a test document and ensure it matches the defined mapping.
Example of a test document:
{
  "timestamp": "2024-08-23T10:30:00Z",
  "machine_id": "MACHINE123",
  "temperature": 75.5,
  "status": "operational",
  "error_code": 0
}
Insert this document into the machine_production index (or the index you chose) and verify that all fields are correctly stored and searchable.
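One way to catch mapping mismatches before they reach OpenSearch is to check the document locally first. The sketch below is an assumption about how such a check could look, not the original script: EXPECTED_TYPES, find_mapping_problems, and index_test_document are hypothetical names, and client is assumed to be an opensearchpy.OpenSearch instance.

```python
from datetime import datetime

# Python types expected for each field, mirroring the index mapping.
EXPECTED_TYPES = {
    "timestamp": str,              # ISO 8601 string, mapped as "date"
    "machine_id": str,             # mapped as "keyword"
    "temperature": (int, float),   # mapped as "float"
    "status": str,                 # mapped as "keyword"
    "error_code": int,             # mapped as "integer"
}


def find_mapping_problems(doc):
    """Return a list of human-readable problems; empty if the doc fits."""
    problems = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in doc:
            problems.append(f"missing field: {field}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif isinstance(doc[field], bool) or not isinstance(doc[field], expected):
            problems.append(f"wrong type for {field}: {type(doc[field]).__name__}")
    if isinstance(doc.get("timestamp"), str):
        try:
            # fromisoformat() on older Python versions does not accept "Z".
            datetime.fromisoformat(doc["timestamp"].replace("Z", "+00:00"))
        except ValueError:
            problems.append("timestamp is not ISO 8601")
    return problems


def index_test_document(client, doc, index_name="machine_production"):
    """Validate the document locally, then index it."""
    problems = find_mapping_problems(doc)
    if problems:
        raise ValueError("; ".join(problems))
    # refresh=True makes the document visible to searches right away,
    # which is handy for a one-off test but costly for bulk traffic.
    return client.index(index=index_name, body=doc, refresh=True)
```

Calling find_mapping_problems on the test document above should return an empty list. A document with a non-numeric temperature would be reported before any request is sent.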
With Coreflux and OpenSearch set up, it’s time to link them together using a Python script. This script will connect to the Coreflux broker, process published messages, and store them in OpenSearch.
In the directory where your Python script is located, create a .env file.
Add the following environment variables, replacing the placeholder values with your actual credentials. Note: if the MQTT URL begins with MQTT://, remove that prefix, since the code only requires the hostname (DNS name):
MQTT_BROKER=<your-coreflux-broker-url>
MQTT_PORT=1883
MQTT_USERNAME=<your-coreflux-username>
MQTT_PASSWORD=<your-coreflux-password>
OPENSEARCH_HOST=<your-opensearch-host>
OPENSEARCH_USERNAME=<your-opensearch-username>
OPENSEARCH_PASSWORD=<your-opensearch-password>
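For illustration, here is one way a script could read these variables and guard against a broker URL that still carries the MQTT:// prefix. This is a sketch under assumptions: the function names are hypothetical, and load_settings_from_dotenv relies on the python-dotenv package, which is installed in the next step.

```python
import os


def normalize_broker_host(value):
    """Strip a leading scheme such as mqtt:// and any trailing slash."""
    host = value.strip()
    if "://" in host:
        host = host.split("://", 1)[1]
    return host.rstrip("/")


def load_settings(env=None):
    """Build a settings dict from environment-style key/value pairs."""
    env = os.environ if env is None else env
    return {
        "mqtt_host": normalize_broker_host(env["MQTT_BROKER"]),
        "mqtt_port": int(env.get("MQTT_PORT", "1883")),
        "mqtt_username": env["MQTT_USERNAME"],
        "mqtt_password": env["MQTT_PASSWORD"],
        "opensearch_host": env["OPENSEARCH_HOST"],
        "opensearch_username": env["OPENSEARCH_USERNAME"],
        "opensearch_password": env["OPENSEARCH_PASSWORD"],
    }


def load_settings_from_dotenv():
    """Load the .env file from the current directory, then read settings."""
    from dotenv import load_dotenv  # provided by python-dotenv

    load_dotenv()
    return load_settings()
```

Reading required values with env["NAME"] means a missing variable fails immediately with a KeyError instead of surfacing later as a confusing connection error.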
Ensure you have the necessary Python libraries installed. You can install them using pip.
pip install paho-mqtt opensearch-py python-dotenv
Use the Python script, which connects to the Coreflux MQTT broker, listens for published messages in the topic Machine/Produce, and indexes them into OpenSearch.
Make sure the script correctly references the environment variables you set up.
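If you do not have the script at hand, the following sketch shows the general shape such a bridge could take. It is not the original mqttToOS.py: the function names, the feedback topic, and the port parameter for OpenSearch are assumptions made for illustration. The client construction tries the paho-mqtt 2.x callback API first and falls back to the 1.x constructor.

```python
import json

TOPIC = "Machine/Produce"                     # topic named in this tutorial
FEEDBACK_TOPIC = "Machine/Produce/Feedback"   # hypothetical feedback topic
INDEX_NAME = "machine_production"


def handle_payload(os_client, payload, index_name=INDEX_NAME):
    """Decode one MQTT payload (bytes) and index it; return a feedback string."""
    try:
        doc = json.loads(payload.decode("utf-8"))
    except ValueError as exc:  # covers invalid UTF-8 and invalid JSON
        return f"error: invalid JSON payload ({exc.__class__.__name__})"
    if not isinstance(doc, dict):
        return "error: payload must be a JSON object"
    try:
        os_client.index(index=index_name, body=doc)
    except Exception as exc:  # report indexing failures instead of crashing
        return f"error: indexing failed ({exc.__class__.__name__})"
    return "ok"


def build_opensearch_client(host, port, username, password):
    """Create an OpenSearch client; `port` comes from your connection details."""
    from opensearchpy import OpenSearch  # provided by opensearch-py

    return OpenSearch(
        hosts=[{"host": host, "port": port}],
        http_auth=(username, password),
        use_ssl=True,
        verify_certs=True,
    )


def build_mqtt_client(os_client, username, password):
    """Create an MQTT client that indexes every message on TOPIC."""
    import paho.mqtt.client as mqtt  # provided by paho-mqtt

    try:
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # paho-mqtt 2.x
    except AttributeError:
        client = mqtt.Client()  # paho-mqtt 1.x
    client.username_pw_set(username, password)

    def on_connect(client, userdata, flags, reason_code, properties=None):
        # Subscribing here re-subscribes automatically after a reconnect.
        client.subscribe(TOPIC)

    def on_message(client, userdata, msg):
        client.publish(FEEDBACK_TOPIC, handle_payload(os_client, msg.payload))

    client.on_connect = on_connect
    client.on_message = on_message
    return client
```

To run it, you would build both clients from your .env values, call connect(host, port) on the MQTT client, and then call loop_forever(). Keeping handle_payload separate from the MQTT callbacks makes the parsing and indexing logic easy to test without a broker.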
Execute the Python script. It should connect to the Coreflux broker, subscribe to the desired topics, and index published messages into your OpenSearch instance.
python mqttToOS.py
Monitor the output to ensure that messages are processed and stored correctly.
– Data Validation: Verify that the data in OpenSearch matches the payloads you published. Check for consistency and accuracy, ensuring your integration is working as expected.
– Real-Time Monitoring: Set up a real-time feed using MQTT Explorer to publish messages continuously. Watch how OpenSearch handles incoming data streams and explore how quickly you can retrieve and analyze the data.
– Create Dashboards: Use OpenSearch’s dashboarding tools to create dynamic dashboards that visualize your IoT data. You could track metrics like device uptime, sensor readings, or user interactions.
– Trend Analysis: Analyze trends over time by aggregating data in OpenSearch. Look for patterns, spikes, or anomalies in your data.
– Geo-Visualizations: If your data includes geographic information, create maps that display data points based on location. This is especially useful for IoT devices spread across different regions.
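Data validation and trend analysis can also be done from Python with the search API. The sketch below assumes the machine_production index and the field names from the mapping defined earlier; the function names are hypothetical, and client is assumed to be an opensearchpy.OpenSearch instance.

```python
def machine_query(machine_id, size=10):
    """Query body: the most recent documents for one machine."""
    return {
        "size": size,
        "sort": [{"timestamp": {"order": "desc"}}],
        "query": {"term": {"machine_id": machine_id}},
    }


def hourly_temperature_trend():
    """Query body: average temperature per one-hour bucket."""
    return {
        "size": 0,  # return only the aggregation, no individual hits
        "aggs": {
            "per_hour": {
                "date_histogram": {"field": "timestamp", "fixed_interval": "1h"},
                "aggs": {"avg_temperature": {"avg": {"field": "temperature"}}},
            }
        },
    }


def latest_documents(client, machine_id, index_name="machine_production"):
    """Return the stored documents for a machine, newest first."""
    response = client.search(index=index_name, body=machine_query(machine_id))
    return [hit["_source"] for hit in response["hits"]["hits"]]
```

Comparing the result of latest_documents with the payloads you published is a quick way to confirm that nothing was lost or altered along the way. The term query works here because machine_id is mapped as a keyword.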
Performance Tuning: Experiment with different broker and OpenSearch configurations to optimize performance. Adjust your Coreflux broker settings to improve efficiency, such as connection limits or message retention policies. You can also learn about more advanced configurations for DigitalOcean’s Droplet.
Load Testing: Simulate high traffic by publishing many messages simultaneously. Monitor how your Coreflux broker and OpenSearch instance handle the load and identify any bottlenecks or areas for improvement.
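As a starting point for a load test, the sketch below generates synthetic messages shaped like the test document and publishes them in one batch with paho-mqtt's publish.multiple helper. The function names, value ranges, and machine IDs are invented for illustration. Start with a small count so you do not overwhelm a trial broker.

```python
import json
import random
from datetime import datetime, timedelta, timezone


def make_load_messages(count, topic="Machine/Produce", machines=5, seed=0):
    """Build `count` synthetic messages in the format publish.multiple expects."""
    rng = random.Random(seed)  # seeded so repeated runs send the same data
    start = datetime(2024, 8, 23, 10, 30, tzinfo=timezone.utc)
    messages = []
    for i in range(count):
        doc = {
            "timestamp": (start + timedelta(seconds=i)).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "machine_id": f"MACHINE{i % machines:03d}",
            "temperature": round(rng.uniform(60.0, 90.0), 1),
            "status": "operational",
            "error_code": 0,
        }
        messages.append({"topic": topic, "payload": json.dumps(doc), "qos": 0})
    return messages


def publish_load(messages, host, port, username, password):
    """Publish all messages over a single connection, then disconnect."""
    import paho.mqtt.publish as publish  # provided by paho-mqtt

    publish.multiple(
        messages,
        hostname=host,
        port=port,
        auth={"username": username, "password": password},
    )
```

For example, publish_load(make_load_messages(1000), host, 1883, username, password) sends a thousand messages to Machine/Produce. While it runs, watch the bridge script's output and the document count in OpenSearch to see where delays build up.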
Scaling: DigitalOcean offers scaling, allowing you to increase the resources (CPU, RAM, or storage) of your Droplets as your data needs grow. You can also set up alerts to notify you when resource limits are approaching.
Integrating the Coreflux MQTT broker with DigitalOcean’s Managed OpenSearch service provides a powerful solution for real-time IoT data processing and analytics. By following this tutorial, you have set up a data pipeline that lets you collect, process, and visualize IoT data efficiently.
With Coreflux’s scalability and reliability and OpenSearch’s robust search and analytics capabilities, you can handle large volumes of data and gain valuable insights in real-time. Whether you are monitoring industrial systems, tracking environmental data, or managing smart home devices, this integration empowers you to make data-driven decisions quickly and effectively.
You can check here to learn how to start with OpenSearch on DigitalOcean.
Get a free Coreflux Online MQTT Broker trial or learn more with the Coreflux Docs and Tutorials.