IoT Data Processing with KSML
This guide demonstrates how to build an IoT data processing application using KSML. You'll learn how to ingest, process, and analyze high-volume sensor data from IoT devices.
Introduction
Internet of Things (IoT) applications generate massive amounts of data from distributed sensors and devices. Processing this data in real-time presents several challenges:
- Handling high-volume, high-velocity data streams
- Managing device state and context
- Processing geospatial data
- Implementing edge-to-cloud data pipelines
- Detecting anomalies and patterns in sensor readings
KSML provides powerful capabilities for building scalable IoT data processing pipelines that address these challenges.
Prerequisites
Before starting this guide, you should:
- Understand basic KSML concepts (streams, functions, pipelines)
- Have completed the KSML Basics Tutorial
- Be familiar with Aggregations
- Have a basic understanding of State Stores
The Use Case
Imagine you're managing a smart building system with thousands of IoT sensors monitoring:
- Temperature and humidity in different rooms
- Energy consumption of various appliances and systems
- Occupancy and movement patterns
- Air quality metrics
You want to process this data in real-time to:
- Monitor building conditions and detect anomalies
- Optimize energy usage based on occupancy patterns
- Track device health and identify maintenance needs
- Generate aggregated analytics for building performance
Define the topics for the use case
In earlier tutorials, you created a Docker Compose file with all the necessary containers. This use case guide requires some additional topics. To create them, open the docker-compose.yml in the examples directory and find the definition of the kafka-setup container, which creates the topics. Change the container's startup command (the command section) so that it looks like the following:
command section for the kafka-setup container (click to expand)
command: "bash -c 'echo Creating topics... && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic iot_sensor_readings && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_alerts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_status && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic energy_consumption && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic proximity_alerts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic building_analytics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic edge_processed_readings && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic room_temperature_stats'"
Defining the Data Model
Our IoT sensor data will have the following structure:
{
"device_id": "sensor-123",
"timestamp": 1625097600000,
"device_type": "temperature_sensor",
"location": {
"building": "headquarters",
"floor": 3,
"room": "conference-a",
"coordinates": {
"lat": 37.7749,
"lng": -122.4194
}
},
"readings": {
"temperature": 22.5,
"humidity": 45.2,
"energy": 70
},
"battery_level": 87,
"status": "active"
}
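The nested structure above is best navigated defensively with chained `.get()` calls, which is the pattern the KSML functions in this guide use. A minimal Python sketch of that access pattern (the sample values mirror the document above):

```python
import json

# A sample reading matching the structure above
raw = """{
  "device_id": "sensor-123",
  "timestamp": 1625097600000,
  "device_type": "temperature_sensor",
  "location": {"building": "headquarters", "floor": 3, "room": "conference-a",
               "coordinates": {"lat": 37.7749, "lng": -122.4194}},
  "readings": {"temperature": 22.5, "humidity": 45.2, "energy": 70},
  "battery_level": 87,
  "status": "active"
}"""

reading = json.loads(raw)

# Chained .get() calls never raise KeyError, even when a level is absent
temperature = reading.get("readings", {}).get("temperature")
room = reading.get("location", {}).get("room")
missing = reading.get("location", {}).get("coordinates", {}).get("altitude")

print(temperature, room, missing)  # 22.5 conference-a None
```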
Creating the KSML Definition
Now, let's create our KSML definition file:
IoT data processor (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
sensor_readings:
topic: iot_sensor_readings
keyType: string # device_id
valueType: json # sensor data
temperature_alerts:
topic: temperature_alerts
keyType: string # device_id
valueType: json # alert data
device_status:
topic: device_status
keyType: string # device_id
valueType: json # status information
energy_consumption:
topic: energy_consumption
keyType: string # location
valueType: json # aggregated energy data
building_analytics:
topic: building_analytics
keyType: string # building/floor
valueType: json # aggregated analytics
functions:
extract_temperature:
type: valueTransformer
expression: value.get("readings", {}).get("temperature")
check_temperature_threshold:
type: predicate
code: |
temp = value.get("readings", {}).get("temperature")
return temp is not None and (temp > 30.0 or temp < 10.0)
create_temperature_alert:
type: valueTransformer
code: |
temp = value.get("readings", {}).get("temperature")
return {
"device_id": value.get("device_id"),
"timestamp": value.get("timestamp"),
"location": value.get("location"),
"alert_type": "temperature_threshold",
"reading": temp,
"status": "critical" if (temp > 35.0 or temp < 5.0) else "warning"
}
check_battery_level:
type: predicate
expression: value.get("battery_level", 100) < 20
create_battery_alert:
type: valueTransformer
expression: |
{
"device_id": value.get("device_id"),
"timestamp": value.get("timestamp"),
"location": value.get("location"),
"alert_type": "low_battery",
"battery_level": value.get("battery_level"),
"status": "warning"
}
pipelines:
# Pipeline for temperature monitoring
temperature_monitoring:
from: sensor_readings
via:
- type: filter
if: check_temperature_threshold
- type: transformValue
mapper: create_temperature_alert
to: temperature_alerts
# Pipeline for device status monitoring
device_status_monitoring:
from: sensor_readings
via:
- type: filter
if: check_battery_level
- type: transformValue
mapper: create_battery_alert
to: device_status
# Pipeline for energy consumption analytics
energy_analytics:
from: sensor_readings
via:
- type: filter
if:
expression: value.get("device_type") == "energy_meter"
- type: groupBy
mapper:
expression: value.get("location", {}).get("building") + "-" + str(value.get("location", {}).get("floor"))
- type: aggregate
store:
name: energy_consumption_store
type: keyValue
keyType: string
valueType: json
initializer:
expression: |
{"total_consumption": 0, "count": 0}
resultType: json
aggregator:
expression: |
{
"total_consumption": aggregatedValue.get("total_consumption") + value.get("readings", {}).get("energy", 0),
"count": aggregatedValue.get("count") + 1,
"average": (aggregatedValue.get("total_consumption") + value.get("readings", {}).get("energy", 0)) / (aggregatedValue.get("count") + 1)
}
resultType: json
- type: toStream
to: energy_consumption
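The temperature pipeline above filters first and transforms second. Extracted as plain Python for illustration (thresholds copied from the definition; the sample readings are made up):

```python
def check_temperature_threshold(value):
    # Same predicate as in the KSML definition: alert outside 10-30 degrees
    temp = value.get("readings", {}).get("temperature")
    return temp is not None and (temp > 30.0 or temp < 10.0)

def create_temperature_alert(value):
    # Same severity rule: critical outside 5-35 degrees, warning otherwise
    temp = value.get("readings", {}).get("temperature")
    return {
        "device_id": value.get("device_id"),
        "alert_type": "temperature_threshold",
        "reading": temp,
        "status": "critical" if (temp > 35.0 or temp < 5.0) else "warning",
    }

readings = [
    {"device_id": "a", "readings": {"temperature": 22.0}},  # in range: dropped
    {"device_id": "b", "readings": {"temperature": 32.0}},  # warning
    {"device_id": "c", "readings": {"temperature": 36.5}},  # critical
]

alerts = [create_temperature_alert(r) for r in readings
          if check_temperature_threshold(r)]
print([a["status"] for a in alerts])  # ['warning', 'critical']
```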
Processing Geospatial Data
IoT applications often involve geospatial data processing. Here's how to handle location-based analytics with KSML:
Geospatial data processor (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
sensor_readings:
topic: iot_sensor_readings
keyType: string
valueType: json
proximity_alerts:
topic: proximity_alerts
keyType: string
valueType: json
functions:
haversine:
type: generic
parameters:
- name: lat1
type: double
- name: lon1
type: double
- name: lat2
type: double
- name: lon2
type: double
globalCode: |
# Import only once
import math
code: |
R = 6371 # Earth radius in kilometers
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = R * c
expression: distance
resultType: double
calculate_distance:
type: valueTransformer
code: |
# Reference point (e.g., building entrance)
ref_lat = 52.0903304
ref_lng = 5.1063283
# Device coordinates
device_lat = value.get("location", {}).get("coordinates", {}).get("lat")
device_lng = value.get("location", {}).get("coordinates", {}).get("lng")
if device_lat is not None and device_lng is not None:
distance = haversine(ref_lat, ref_lng, device_lat, device_lng)
value["distance_from_reference"] = distance
return value
pipelines:
proximity_analysis:
from: sensor_readings
via:
- type: transformValue
mapper: calculate_distance
- type: filter
if:
expression: value.get("distance_from_reference", float('inf')) < 0.1 # Within 100 meters
to: proximity_alerts
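The `haversine` function above is standard great-circle math, so it can be sanity-checked as standalone Python against a known distance: one degree of latitude is roughly 111 km.

```python
import math

def haversine(lat1, lon1, lat2, lon2):
    # Same formula as the KSML function above
    R = 6371  # Earth radius in kilometers
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# Identical points: zero distance
assert haversine(52.0903, 5.1063, 52.0903, 5.1063) == 0.0

# One degree of latitude
d = haversine(52.0, 5.0, 53.0, 5.0)
print(round(d, 1))  # 111.2
```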
Implementing Device State Tracking
For many IoT applications, tracking device state over time is crucial. Here's how to implement this using KSML's state stores:
Device state tracking processor (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
sensor_readings:
topic: iot_sensor_readings
keyType: string
valueType: json
device_status:
topic: device_status
keyType: string
valueType: json
stores:
device_state_store:
type: keyValue
persistent: true
functions:
update_device_state:
type: valueTransformer
code: |
# Get previous state
previous_state = device_state_store.get(key)
# Create new state
new_state = {
"device_id": value.get("device_id"),
"last_reading_time": value.get("timestamp"),
"current_readings": value.get("readings"),
"battery_level": value.get("battery_level"),
"status": value.get("status")
}
# Add derived fields
if previous_state is not None:
# Calculate time since last reading
time_diff = (value.get("timestamp") - previous_state.get("last_reading_time")) / 1000 # in seconds
new_state["seconds_since_last_reading"] = time_diff
# Calculate rate of change for temperature
prev_temp = previous_state.get("current_readings", {}).get("temperature")
curr_temp = value.get("readings", {}).get("temperature")
if prev_temp is not None and curr_temp is not None and time_diff > 0:
new_state["temperature_change_rate"] = (curr_temp - prev_temp) / time_diff # degrees per second
# Update store
device_state_store.put(key, new_state)
return new_state
resultType: struct
stores:
- device_state_store
pipelines:
device_state_tracking:
from: sensor_readings
via:
- type: transformValue
mapper: update_device_state
to: device_status
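The lookup-then-update flow of `update_device_state` can be prototyped with a plain dict standing in for the key-value store (device IDs and timestamps below are made up):

```python
device_state_store = {}  # plain dict standing in for KSML's keyValue store

def update_device_state(key, value):
    previous = device_state_store.get(key)
    new_state = {
        "last_reading_time": value["timestamp"],
        "temperature": value.get("readings", {}).get("temperature"),
    }
    if previous is not None:
        # Time since last reading, in seconds
        time_diff = (value["timestamp"] - previous["last_reading_time"]) / 1000
        new_state["seconds_since_last_reading"] = time_diff
        prev_temp = previous.get("temperature")
        curr_temp = new_state["temperature"]
        if prev_temp is not None and curr_temp is not None and time_diff > 0:
            # Rate of change in degrees per second
            new_state["temperature_change_rate"] = (curr_temp - prev_temp) / time_diff
    device_state_store[key] = new_state
    return new_state

s1 = update_device_state("dev-1", {"timestamp": 1000,
                                   "readings": {"temperature": 20.0}})
s2 = update_device_state("dev-1", {"timestamp": 11000,
                                   "readings": {"temperature": 22.0}})
print(s2["temperature_change_rate"])  # 0.2
```

The first reading has no previous state, so the derived fields only appear from the second reading onward, exactly as in the KSML function.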
Edge-to-Cloud Processing
IoT architectures often involve processing at the edge before sending data to the cloud. KSML can be deployed at both edge and cloud levels:
Edge Processing
At the edge, you might want to filter, aggregate, and compress data before sending it to the cloud:
Edge processing pipeline (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
raw_sensor_readings:
topic: iot_sensor_readings
keyType: string
valueType: json
edge_processed_readings:
topic: edge_processed_readings
keyType: string
valueType: json
pipelines:
edge_preprocessing:
from: raw_sensor_readings
via:
- type: filter
if:
code: |
# Forward only temperatures outside the comfortable range, reducing data sent to the cloud
temp = value.get("readings", {}).get("temperature")
return temp is not None and (temp < 18.0 or temp > 26.0)
- type: transformValue
mapper:
code: |
# Simplify payload for transmission
return {
"id": value.get("device_id"),
"ts": value.get("timestamp"),
"loc": value.get("location", {}).get("room"),
"temp": value.get("readings", {}).get("temperature"),
"hum": value.get("readings", {}).get("humidity")
}
to: edge_processed_readings
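The abbreviation mapper above shrinks the payload before transmission by keeping only the fields the cloud side needs, under shortened names. As plain Python:

```python
def compress_reading(value):
    # Keep only the fields the cloud side needs, with shortened names
    return {
        "id": value.get("device_id"),
        "ts": value.get("timestamp"),
        "loc": value.get("location", {}).get("room"),
        "temp": value.get("readings", {}).get("temperature"),
        "hum": value.get("readings", {}).get("humidity"),
    }

full = {
    "device_id": "temperature_sensor-conference-a",
    "timestamp": 1625097600000,
    "location": {"building": "headquarters", "floor": 3, "room": "conference-a"},
    "readings": {"temperature": 27.5, "humidity": 45.2, "energy": 70},
    "battery_level": 87,
}

compact = compress_reading(full)
print(compact["loc"], compact["temp"])  # conference-a 27.5
```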
Cloud Processing
In the cloud, you can perform more complex analytics and aggregations:
Cloud processing pipeline (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
edge_processed_readings:
topic: edge_processed_readings
keyType: string
valueType: json
room_temperature_stats:
topic: room_temperature_stats
keyType: windowed(string)
valueType: json
pipelines:
cloud_analytics:
from: edge_processed_readings
via:
- type: transformValue
mapper:
code: |
# Expand abbreviated fields
return {
"device_id": value.get("id"),
"timestamp": value.get("ts"),
"location": {"room": value.get("loc")},
"readings": {
"temperature": value.get("temp"),
"humidity": value.get("hum")
}
}
- type: groupBy
mapper:
expression: value.get("location", {}).get("room")
- type: windowByTime
windowType: tumbling
duration: 20s
- type: aggregate
store:
name: room_temp_window_store
type: window
keyType: string
valueType: json
windowSize: 20s
retention: 2m
initializer:
expression: |
{
"room": None,
"min_temp": None,
"max_temp": None,
"avg_temp": 0.0,
"count": 0
}
resultType: json
aggregator:
resultType: json
code: |
temp = value.get("readings", {}).get("temperature")
room = value.get("location", {}).get("room")
count = aggregatedValue.get("count")
# Handle first reading
if count == 0:
return {
"room": room,
"min_temp": temp,
"max_temp": temp,
"avg_temp": temp,
"count": 1
}
else:
return {
"room": room,
"min_temp": min(aggregatedValue.get("min_temp"), temp),
"max_temp": max(aggregatedValue.get("max_temp"), temp),
"avg_temp": (aggregatedValue.get("avg_temp") * count + temp) / (count + 1),
"count": count + 1
}
- type: toStream
- type: convertKey
into: json:windowed(string)
to: room_temperature_stats
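The windowed aggregator above maintains a running min, max, and average without storing individual readings: the average is updated by recovering the sum (`avg * count`), adding the new value, and dividing again. The same update step in plain Python:

```python
def aggregate(agg, value):
    # Same update step as the KSML aggregator above
    temp = value.get("readings", {}).get("temperature")
    room = value.get("location", {}).get("room")
    count = agg.get("count")
    if count == 0:  # first reading in the window
        return {"room": room, "min_temp": temp, "max_temp": temp,
                "avg_temp": temp, "count": 1}
    return {
        "room": room,
        "min_temp": min(agg["min_temp"], temp),
        "max_temp": max(agg["max_temp"], temp),
        # Running average: recover the sum, add the new reading, divide again
        "avg_temp": (agg["avg_temp"] * count + temp) / (count + 1),
        "count": count + 1,
    }

agg = {"room": None, "min_temp": None, "max_temp": None,
       "avg_temp": 0.0, "count": 0}
for t in (20.0, 24.0, 22.0):
    agg = aggregate(agg, {"readings": {"temperature": t},
                          "location": {"room": "conference-a"}})
print(agg["min_temp"], agg["max_temp"], agg["avg_temp"])  # 20.0 24.0 22.0
```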
Testing and Validation
To test your IoT data processing pipeline:
- Generate sample IoT data using a simulator or replay historical data
- Deploy your KSML application using the proper configuration
- Monitor the output topics to verify correct processing
- Use visualization tools to display the processed data
The following producer pipeline can serve as a starting point for generating sample data. It produces randomized measurements from three separate rooms with two sensors each, and occasionally emits outlier values so that the alert pipelines have something to report.
IoT sample data generator (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
sensor_readings:
topic: iot_sensor_readings
keyType: string # device_id
valueType: json # sensor data
functions:
generate_sensor_reading:
type: generic
resultType: boolean
expression: False
globalCode: |
import random
import time
# Static data
rooms = ["conference-a", "conference-b", "conference-c"]
device_types = ["temperature_sensor", "energy_meter"]
# Counter for generating unique device IDs
global counter
counter = 0
def generate_reading():
global counter
counter += 1
# Select room and device type
room = rooms[counter % len(rooms)]
device_type = device_types[counter % len(device_types)]
device_id = f"{device_type}-{room}"
# Generate random readings, generate extreme temperature every 50 readings
if counter % 50 == 0:
temperature = random.choice([4.0, 36.0])
else:
temperature = round(random.uniform(19.0, 24.0), 1)
humidity = round(random.uniform(40.0, 70.0), 1)
energy = random.randint(50, 150)
# Create sensor reading
reading = {
"device_id": device_id,
"timestamp": int(time.time() * 1000),
"device_type": device_type,
"location": {
"building": "headquarters",
"floor": 3,
"room": room,
"coordinates": {
"lat": 37.7749,
"lng": -122.4194
}
},
"readings": {
"temperature": temperature,
"humidity": humidity,
"energy": energy
},
"battery_level": random.randint(17, 100),
"status": "active"
}
return reading
producers:
iot_sensor_producer:
to: sensor_readings
interval: 1000 # Generate every second
generator:
code: |
reading = generate_reading()
expression: '(reading.get("device_id"), reading)'
resultType: (string, json)
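The generator logic above can also be exercised outside KSML. A trimmed Python sketch of the same round-robin and outlier scheme (fields not needed for the check are omitted):

```python
import random
import time

rooms = ["conference-a", "conference-b", "conference-c"]
device_types = ["temperature_sensor", "energy_meter"]
counter = 0

def generate_reading():
    global counter
    counter += 1
    room = rooms[counter % len(rooms)]
    device_type = device_types[counter % len(device_types)]
    # Every 50th reading is an extreme temperature that trips the alert pipeline
    if counter % 50 == 0:
        temperature = random.choice([4.0, 36.0])
    else:
        temperature = round(random.uniform(19.0, 24.0), 1)
    return {
        "device_id": f"{device_type}-{room}",
        "timestamp": int(time.time() * 1000),
        "device_type": device_type,
        "readings": {"temperature": temperature},
    }

readings = [generate_reading() for _ in range(100)]
outliers = [r for r in readings
            if not 10.0 <= r["readings"]["temperature"] <= 30.0]
print(len(outliers))  # 2
```

Over 100 readings, exactly two (the 50th and 100th) fall outside the alert thresholds, so the temperature_monitoring pipeline produces roughly one alert per 50 seconds at the configured interval.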
Production Considerations
When deploying IoT data processing pipelines to production:
- Scalability: Ensure your Kafka cluster can handle the volume of IoT data
- Fault Tolerance: Configure proper replication and error handling
- Data Retention: Set appropriate retention policies for raw and processed data
- Security: Implement device authentication and data encryption
- Monitoring: Set up alerts for anomalies in both the data and the processing pipeline
- Edge Deployment: Consider deploying KSML at the edge for preprocessing
Conclusion
KSML provides a powerful and flexible way to process IoT data streams. By combining real-time processing with state management and windowed operations, you can build sophisticated IoT applications that derive valuable insights from your device data.
For more advanced IoT scenarios, explore:
- Complex Event Processing for pattern detection
- Custom Functions for domain-specific processing
- KSML Definition Reference for a full explanation of the KSML definition syntax