IoT Data Processing with KSML

This guide demonstrates how to build an IoT data processing application using KSML. You'll learn how to ingest, process, and analyze high-volume sensor data from IoT devices.

Introduction

Internet of Things (IoT) applications generate massive amounts of data from distributed sensors and devices. Processing this data in real-time presents several challenges:

  • Handling high-volume, high-velocity data streams
  • Managing device state and context
  • Processing geospatial data
  • Implementing edge-to-cloud data pipelines
  • Detecting anomalies and patterns in sensor readings

KSML provides powerful capabilities for building scalable IoT data processing pipelines that address these challenges.

Prerequisites

Before starting this guide, you should have completed the earlier tutorials, so that the Docker Compose environment from the examples directory is set up and running.

The Use Case

Imagine you're managing a smart building system with thousands of IoT sensors monitoring:

  • Temperature and humidity in different rooms
  • Energy consumption of various appliances and systems
  • Occupancy and movement patterns
  • Air quality metrics

You want to process this data in real-time to:

  1. Monitor building conditions and detect anomalies
  2. Optimize energy usage based on occupancy patterns
  3. Track device health and identify maintenance needs
  4. Generate aggregated analytics for building performance

Defining the Topics for the Use Case

In earlier tutorials, you created a Docker Compose file with all the necessary containers. This use case guide requires several additional topics. To create them, open the docker-compose.yml in the examples directory and find the definition of the kafka-setup container, which creates the topics.
Change that definition so that the startup command for the setup container (the command section) looks like the following:

command section for the kafka-setup container
command: "bash -c 'echo Creating topics... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic iot_sensor_readings && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_alerts && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_status && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic energy_consumption && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic proximity_alerts && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic building_analytics && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic edge_processed_readings && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic room_temperature_stats'"
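
After recreating the setup container, you can verify that the topics exist. A quick check, assuming the broker container image also ships the Kafka CLI tools used above:

docker compose exec broker kafka-topics.sh --bootstrap-server broker:9093 --list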

Defining the Data Model

Our IoT sensor data will have the following structure:

{
  "device_id": "sensor-123",
  "timestamp": 1625097600000,
  "device_type": "temperature_sensor",
  "location": {
    "building": "headquarters",
    "floor": 3,
    "room": "conference-a",
    "coordinates": {
      "lat": 37.7749,
      "lng": -122.4194
    }
  },
  "readings": {
    "temperature": 22.5,
    "humidity": 45.2,
    "energy": 70
  },
  "battery_level": 87,
  "status": "active"
}
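
The processing functions throughout this guide read nested fields from this structure with chained get() calls, so a missing field yields None instead of raising an error. A minimal Python illustration of the pattern:

reading = {"device_id": "sensor-123", "readings": {"temperature": 22.5}}

# The empty-dict default keeps the chain safe when "readings" is absent
temp = reading.get("readings", {}).get("temperature")      # 22.5
humidity = reading.get("readings", {}).get("humidity")     # None: field not present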

Creating the KSML Definition

Now, let's create our KSML definition file:

IoT data processor
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  sensor_readings:
    topic: iot_sensor_readings
    keyType: string  # device_id
    valueType: json  # sensor data

  temperature_alerts:
    topic: temperature_alerts
    keyType: string  # device_id
    valueType: json  # alert data

  device_status:
    topic: device_status
    keyType: string  # device_id
    valueType: json  # status information

  energy_consumption:
    topic: energy_consumption
    keyType: string  # location
    valueType: json  # aggregated energy data

  building_analytics:
    topic: building_analytics
    keyType: string  # building/floor
    valueType: json  # aggregated analytics

functions:
  extract_temperature:
    type: valueTransformer
    expression: value.get("readings", {}).get("temperature")

  check_temperature_threshold:
    type: predicate
    code: |
      temp = value.get("readings", {}).get("temperature")
      return temp is not None and (temp > 30.0 or temp < 10.0)

  create_temperature_alert:
    type: valueTransformer
    code: |
      temp = value.get("readings", {}).get("temperature")
      return {
        "device_id": value.get("device_id"),
        "timestamp": value.get("timestamp"),
        "location": value.get("location"),
        "alert_type": "temperature_threshold",
        "reading": temp,
        "status": "critical" if (temp > 35.0 or temp < 5.0) else "warning"
      }

  check_battery_level:
    type: predicate
    expression: value.get("battery_level", 100) < 20

  create_battery_alert:
    type: valueTransformer
    expression: |
      {
        "device_id": value.get("device_id"),
        "timestamp": value.get("timestamp"),
        "location": value.get("location"),
        "alert_type": "low_battery",
        "battery_level": value.get("battery_level"),
        "status": "warning"
      }

pipelines:
  # Pipeline for temperature monitoring
  temperature_monitoring:
    from: sensor_readings
    via:
      - type: filter
        if: check_temperature_threshold
      - type: transformValue
        mapper: create_temperature_alert
    to: temperature_alerts

  # Pipeline for device status monitoring
  device_status_monitoring:
    from: sensor_readings
    via:
      - type: filter
        if: check_battery_level
      - type: transformValue
        mapper: create_battery_alert
    to: device_status

  # Pipeline for energy consumption analytics
  energy_analytics:
    from: sensor_readings
    via:
      - type: filter
        if:
          expression: value.get("device_type") == "energy_meter"
      - type: groupBy
        mapper:
          expression: value.get("location", {}).get("building") + "-" + str(value.get("location", {}).get("floor"))
      - type: aggregate
        store:
          name: energy_consumption_store
          type: keyValue
          keyType: string
          valueType: json
        initializer:
          expression: |
            {"total_consumption": 0, "count": 0}
          resultType: json
        aggregator:
          expression: |
            {
              "total_consumption": aggregatedValue.get("total_consumption") + value.get("readings", {}).get("energy", 0),
              "count": aggregatedValue.get("count") + 1,
              "average": (aggregatedValue.get("total_consumption") + value.get("readings", {}).get("energy", 0)) / (aggregatedValue.get("count") + 1)
            }
          resultType: json
      - type: toStream
    to: energy_consumption
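
As an illustration, a reading of 36.0 degrees from the data model above passes check_temperature_threshold and produces an alert like the following on temperature_alerts (values are hypothetical):

{
  "device_id": "sensor-123",
  "timestamp": 1625097600000,
  "location": {
    "building": "headquarters",
    "floor": 3,
    "room": "conference-a",
    "coordinates": {"lat": 37.7749, "lng": -122.4194}
  },
  "alert_type": "temperature_threshold",
  "reading": 36.0,
  "status": "critical"
}

Since 36.0 exceeds the 35.0 bound in create_temperature_alert, the status is "critical" rather than "warning".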

Processing Geospatial Data

IoT applications often involve geospatial data processing. Here's how to handle location-based analytics with KSML:

Geospatial data processor
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  sensor_readings:
    topic: iot_sensor_readings
    keyType: string
    valueType: json

  proximity_alerts:
    topic: proximity_alerts
    keyType: string
    valueType: json

functions:
  haversine:
    type: generic
    parameters:
      - name: lat1
        type: double
      - name: lon1
        type: double
      - name: lat2
        type: double
      - name: lon2
        type: double
    globalCode: |
      # Import only once
      import math
    code: |
      R = 6371  # Earth radius in kilometers
      dlat = math.radians(lat2 - lat1)
      dlon = math.radians(lon2 - lon1)
      a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
      c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
      distance = R * c
    expression: distance
    resultType: double

  calculate_distance:
    type: valueTransformer
    code: |
      # Reference point (e.g., building entrance)
      ref_lat = 52.0903304
      ref_lng = 5.1063283

      # Device coordinates
      device_lat = value.get("location", {}).get("coordinates", {}).get("lat")
      device_lng = value.get("location", {}).get("coordinates", {}).get("lng")

      if device_lat is not None and device_lng is not None:
        distance = haversine(ref_lat, ref_lng, device_lat, device_lng)
        value["distance_from_reference"] = distance

      return value

pipelines:
  proximity_analysis:
    from: sensor_readings
    via:
      - type: transformValue
        mapper: calculate_distance
      - type: filter
        if:
          expression: value.get("distance_from_reference", float('inf')) < 0.1  # Within 100 meters
    to: proximity_alerts
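
The haversine function is plain Python, so you can sanity-check it outside KSML. A standalone sketch using the same math, with a hypothetical device point roughly 60 meters from the reference:

import math

def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in kilometers
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Reference point from calculate_distance and a point ~60 m away
print(haversine(52.0903304, 5.1063283, 52.0908, 5.1068))  # ~0.061, within the 0.1 km threshold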

Implementing Device State Tracking

For many IoT applications, tracking device state over time is crucial. Here's how to implement this using KSML's state stores:

Device state tracking processor
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  sensor_readings:
    topic: iot_sensor_readings
    keyType: string
    valueType: json

  device_status:
    topic: device_status
    keyType: string
    valueType: json

stores:
  device_state_store:
    type: keyValue
    persistent: true

functions:
  update_device_state:
    type: valueTransformer
    code: |
      # Get previous state
      previous_state = device_state_store.get(key)

      # Create new state
      new_state = {
        "device_id": value.get("device_id"),
        "last_reading_time": value.get("timestamp"),
        "current_readings": value.get("readings"),
        "battery_level": value.get("battery_level"),
        "status": value.get("status")
      }

      # Add derived fields
      if previous_state is not None:
        # Calculate time since last reading
        time_diff = (value.get("timestamp") - previous_state.get("last_reading_time")) / 1000  # in seconds
        new_state["seconds_since_last_reading"] = time_diff

        # Calculate rate of change for temperature
        prev_temp = previous_state.get("current_readings", {}).get("temperature")
        curr_temp = value.get("readings", {}).get("temperature")

        if prev_temp is not None and curr_temp is not None and time_diff > 0:
          new_state["temperature_change_rate"] = (curr_temp - prev_temp) / time_diff  # degrees per second

      # Update store
      device_state_store.put(key, new_state)

      return new_state
    resultType: struct
    stores:
      - device_state_store

pipelines:
  device_state_tracking:
    from: sensor_readings
    via:
      - type: transformValue
        mapper: update_device_state
    to: device_status
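
To make the derived fields concrete: if the previous reading for a device arrived 10 seconds earlier with a temperature of 22.0 degrees, and the current reading shows 22.5 degrees, the pipeline emits a state record like this (illustrative values):

{
  "device_id": "sensor-123",
  "last_reading_time": 1625097610000,
  "current_readings": {"temperature": 22.5, "humidity": 45.2, "energy": 70},
  "battery_level": 87,
  "status": "active",
  "seconds_since_last_reading": 10.0,
  "temperature_change_rate": 0.05
}

The change rate is (22.5 - 22.0) / 10 = 0.05 degrees per second.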

Edge-to-Cloud Processing

IoT architectures often involve processing at the edge before sending data to the cloud. KSML can be deployed at both edge and cloud levels:

Edge Processing

At the edge, you might want to filter, aggregate, and compress data before sending it to the cloud:

Edge processing pipeline
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  raw_sensor_readings:
    topic: iot_sensor_readings
    keyType: string
    valueType: json

  edge_processed_readings:
    topic: edge_processed_readings
    keyType: string
    valueType: json

pipelines:
  edge_preprocessing:
    from: raw_sensor_readings
    via:
      - type: filter
        if:
          code: |
            # Forward temperatures outside comfortable range (more data for cloud analytics)
            temp = value.get("readings", {}).get("temperature")
            return temp is not None and (temp < 18.0 or temp > 26.0)
      - type: transformValue
        mapper:
          code: |
            # Simplify payload for transmission
            return {
              "id": value.get("device_id"),
              "ts": value.get("timestamp"),
              "loc": value.get("location", {}).get("room"),
              "temp": value.get("readings", {}).get("temperature"),
              "hum": value.get("readings", {}).get("humidity")
            }
    to: edge_processed_readings
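
For example, a hypothetical 27.3 degree reading passes the filter (above the 26.0 upper bound) and is forwarded in this compact form:

{
  "id": "sensor-123",
  "ts": 1625097600000,
  "loc": "conference-a",
  "temp": 27.3,
  "hum": 55.1
}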

Cloud Processing

In the cloud, you can perform more complex analytics and aggregations:

Cloud processing pipeline
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  edge_processed_readings:
    topic: edge_processed_readings
    keyType: string
    valueType: json

  room_temperature_stats:
    topic: room_temperature_stats
    keyType: windowed(string)
    valueType: json

pipelines:
  cloud_analytics:
    from: edge_processed_readings
    via:
      - type: transformValue
        mapper:
          code: |
            # Expand abbreviated fields
            return {
              "device_id": value.get("id"),
              "timestamp": value.get("ts"),
              "location": {"room": value.get("loc")},
              "readings": {
                "temperature": value.get("temp"),
                "humidity": value.get("hum")
              }
            }
      - type: groupBy
        mapper:
          expression: value.get("location", {}).get("room")
      - type: windowByTime
        windowType: tumbling
        duration: 20s
      - type: aggregate
        store:
          name: room_temp_window_store
          type: window
          keyType: string
          valueType: json
          windowSize: 20s
          retention: 2m
        initializer:
          expression: |
            {
              "room": None,
              "min_temp": None,
              "max_temp": None,
              "avg_temp": 0.0,
              "count": 0
            }
          resultType: json
        aggregator:
          resultType: json
          code: |
            temp = value.get("readings", {}).get("temperature")
            room = value.get("location", {}).get("room")
            count = aggregatedValue.get("count")

            # Handle first reading
            if count == 0:
              return {
                "room": room,
                "min_temp": temp,
                "max_temp": temp,
                "avg_temp": temp,
                "count": 1
              }
            else:
              return {
                "room": room,
                "min_temp": min(aggregatedValue.get("min_temp"), temp),
                "max_temp": max(aggregatedValue.get("max_temp"), temp),
                "avg_temp": (aggregatedValue.get("avg_temp") * count + temp) / (count + 1),
                "count": count + 1
              }
      - type: toStream
      - type: convertKey
        into: json:windowed(string)
    to: room_temperature_stats
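
The aggregator maintains the average incrementally: new_avg = (avg * count + temp) / (count + 1). A plain-Python sketch of the same logic, for three hypothetical readings landing in one 20-second window:

agg = {"room": None, "min_temp": None, "max_temp": None, "avg_temp": 0.0, "count": 0}

for temp in [20.0, 22.0, 24.0]:
    count = agg["count"]
    if count == 0:
        agg = {"room": "conference-a", "min_temp": temp, "max_temp": temp,
               "avg_temp": temp, "count": 1}
    else:
        agg = {"room": "conference-a",
               "min_temp": min(agg["min_temp"], temp),
               "max_temp": max(agg["max_temp"], temp),
               "avg_temp": (agg["avg_temp"] * count + temp) / (count + 1),
               "count": count + 1}

print(agg)  # {'room': 'conference-a', 'min_temp': 20.0, 'max_temp': 24.0, 'avg_temp': 22.0, 'count': 3}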

Testing and Validation

To test your IoT data processing pipeline:

  1. Generate sample IoT data using a simulator or replay historical data
  2. Deploy your KSML application using the proper configuration
  3. Monitor the output topics to verify correct processing
  4. Use visualization tools to display the processed data

The following producer definition can serve as a starting point for generating sample data. It will produce sample measurements with randomized data from three separate rooms with two sensors each. Occasionally an outlier value is generated so that the alerts become visible.

IoT sample data generator
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  sensor_readings:
    topic: iot_sensor_readings
    keyType: string  # device_id
    valueType: json  # sensor data

functions:
  generate_sensor_reading:
    type: generic
    resultType: boolean
    expression: False
    globalCode: |
      import random
      import time

      # Static data
      rooms = ["conference-a", "conference-b", "conference-c"]
      device_types = ["temperature_sensor", "energy_meter"]

      # Counter for generating unique device IDs
      global counter
      counter = 0

      def generate_reading():
        global counter
        counter += 1

        # Select room and device type
        room = rooms[counter % len(rooms)]
        device_type = device_types[counter % len(device_types)]
        device_id = f"{device_type}-{room}"

        # Generate random readings, generate extreme temperature every 50 readings
        if counter % 50 == 0:
          temperature = random.choice([4.0, 36.0])
        else:
          temperature = round(random.uniform(19.0, 24.0), 1)

        humidity = round(random.uniform(40.0, 70.0), 1)
        energy = random.randint(50, 150)

        # Create sensor reading
        reading = {
          "device_id": device_id,
          "timestamp": int(time.time() * 1000),
          "device_type": device_type,
          "location": {
            "building": "headquarters",
            "floor": 3,
            "room": room,
            "coordinates": {
              "lat": 37.7749,
              "lng": -122.4194
            }
          },
          "readings": {
            "temperature": temperature,
            "humidity": humidity,
            "energy": energy
          },
          "battery_level": random.randint(17, 100),
          "status": "active"
        }

        return reading

producers:
  iot_sensor_producer:
    to: sensor_readings
    interval: 1000  # Generate every second
    generator:
      code: |
        reading = generate_reading()
      expression: '(reading.get("device_id"), reading)'
      resultType: (string, json)
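
Once the producer and a processing definition are running, you can watch an output topic from the command line. A minimal check, again assuming the broker container ships the Kafka CLI tools:

docker compose exec broker kafka-console-consumer.sh \
    --bootstrap-server broker:9093 \
    --topic temperature_alerts \
    --from-beginning

With the generator above producing one reading per second and an extreme temperature every 50 readings, the first alert should appear within about a minute.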

Production Considerations

When deploying IoT data processing pipelines to production:

  1. Scalability: Ensure your Kafka cluster can handle the volume of IoT data
  2. Fault Tolerance: Configure proper replication and error handling
  3. Data Retention: Set appropriate retention policies for raw and processed data (see the sketch after this list)
  4. Security: Implement device authentication and data encryption
  5. Monitoring: Set up alerts for anomalies in both the data and the processing pipeline
  6. Edge Deployment: Consider deploying KSML at the edge for preprocessing
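
For instance, retention on the raw readings topic can be adjusted with the standard Kafka CLI; the 24-hour value below is only an example:

kafka-configs.sh --bootstrap-server broker:9093 --alter \
    --entity-type topics --entity-name iot_sensor_readings \
    --add-config retention.ms=86400000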

Conclusion

KSML provides a powerful and flexible way to process IoT data streams. By combining real-time processing with state management and windowed operations, you can build sophisticated IoT applications that derive valuable insights from your device data.

For more advanced IoT scenarios, explore: