
Quick Start with KSML

Get KSML running in 5 minutes! This guide shows you how to create your first real-time analytics application using YAML and a few lines of Python, with no Java required. You'll build an IoT analytics system that processes sensor data in real time.

What We'll Build

A real-time device health monitoring system that:

  • Filters offline devices from processing
  • Analyzes device health from battery, signal, and error metrics
  • Categorizes devices as "healthy" or "needs attention"
  • Alerts on specific issues like low battery or poor signal

All with just YAML configuration and clear Python logic.

Prerequisites

You'll need:

  • Docker with Docker Compose (every step below uses the docker compose command)

Choose Your Setup Method

Option A: Quick Start (Recommended)

Download the pre-configured setup and run it immediately:

  1. Download and extract: local-docker-compose-setup-quick-start.zip
  2. Navigate to the extracted folder
  3. Run docker compose up -d
  4. Skip to Step 6: See It In Action!

Option B: Step-by-Step Setup

Follow the detailed instructions below to create everything from scratch.


Step 1: Set Up Your Environment

Note: Skip this step if you chose Option A above.

  • Create a new directory for your KSML project:
mkdir my-ksml-project
cd my-ksml-project
mkdir examples
  • Create a docker-compose.yml file:
Docker Compose Configuration
networks:
  ksml:
    driver: bridge

services:
  broker:
    image: bitnami/kafka:3.8.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    networks:
      - ksml
    restart: always
    environment:
      KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
      KAFKA_CFG_BROKER_ID: 0
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
      KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CFG_LOG_CLEANUP_POLICY: delete
      KAFKA_CFG_LOG_RETENTION_MINUTES: 10070
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
      KAFKA_CFG_NUM_PARTITIONS: 1
    # The broker is considered healthy once kafka-topics.sh can list its topics
    healthcheck:
      test: kafka-topics.sh --bootstrap-server broker:9093 --list
      interval: 5s
      timeout: 10s
      retries: 10
      start_period: 5s

  ksml:
    image: registry.axual.io/opensource/images/axual/ksml:snapshot
    networks:
      - ksml
    container_name: ksml
    working_dir: /ksml
    volumes:
      - ./examples:/ksml
    depends_on:
      broker:
        condition: service_healthy
      kafka-setup:
        condition: service_completed_successfully

  kafka-ui:
    image: quay.io/cloudhut/kowl:master
    platform: linux/amd64
    container_name: kowl
    restart: always
    ports:
      - 8080:8080
    volumes:
      - ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml
    environment:
      CONFIG_FILEPATH:  "/config/kowl-ui-config.yaml"
    depends_on:
      broker:
        condition: service_healthy
    networks:
      - ksml

  # This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
  kafka-setup:
    image: bitnami/kafka:3.8.0
    hostname: kafka-setup
    networks:
      - ksml
    depends_on:
      broker:
        condition: service_healthy
    restart: on-failure
    command: "bash -c 'echo Creating tutorial topics... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic tutorial_input && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_data && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_data_converted && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic iot_sensor_data && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_health_alerts'"
  • Create kowl-ui-config.yaml for Kafka UI in the project directory, next to docker-compose.yml (which mounts it from ./kowl-ui-config.yaml):
Kafka UI Configuration
server:
  listenPort: 8080
  listenAddress: 0.0.0.0

kafka:
  brokers:
    - broker:9093

Step 2: Start Docker Services

Note: Skip this step if you chose Option A above.

docker compose up -d

This starts:

  • Kafka broker (port 9092)
  • KSML runner for your stream processing
  • Kowl (Kafka UI) for monitoring (port 8080)
  • A one-off kafka-setup container that creates the tutorial topics

Step 3: Verify Everything Started

docker compose ps -a

The -a flag also lists containers that have already stopped. You should see:

  • The Kafka broker and Kafka UI are running
  • The topic setup container (kafka-setup) has exited successfully after creating the required topics
  • The KSML runner has exited because its configuration is still missing (we'll fix this next)

Check the Kafka UI at http://localhost:8080 to see your topics.

Step 4: Create Your First KSML Application

Note: If you chose Option A (zip download), these files already exist - you can skip to Step 5.

Now let's create a smart device health monitoring application!

In the examples/ directory, create iot-analytics.yaml:

KSML Device Health Monitoring processing definition

This is the definition of a demo KSML application for device health monitoring. You don't need to understand the YAML and Python syntax yet; we will explain what it does and how it works later.

functions:
  add_health_status:
    type: valueTransformer
    code: |
      # Get device metrics
      battery = value.get('battery_percent', 100)
      signal = value.get('signal_strength', 100)
      errors = value.get('error_count', 0)

      # Check each condition separately for clarity
      has_low_battery = battery < 20
      has_poor_signal = signal < 50  
      has_high_errors = errors > 5

      # Determine if device needs attention
      if has_low_battery or has_poor_signal or has_high_errors:
        health_status = "needs_attention"

        # Determine the specific reason
        if has_low_battery:
          reason = "low_battery"
        elif has_poor_signal:
          reason = "poor_signal"
        else:
          reason = "high_errors"
      else:
        health_status = "healthy"
        reason = "ok"

      return {
        "device_id": key,
        "battery_percent": battery,
        "signal_strength": signal, 
        "error_count": errors,
        "health_status": health_status,
        "issue_reason": reason
      }
    resultType: json

pipelines:
  device_health_monitor:
    from:
      topic: iot_sensor_data
      keyType: string
      valueType: json
      offsetResetPolicy: latest
    via:
      # Step 1: Only process online devices
      - type: filter
        if:
          expression: value is not None and value.get('status') == 'online'

      # Step 2: Add health status
      - type: transformValue
        mapper: add_health_status

      # Step 3: Log device health
      - type: peek
        forEach:
          code: |
            device = key
            status = value.get('health_status')
            reason = value.get('issue_reason')
            battery = value.get('battery_percent')

            if status == "healthy":
              print(f"{device}: Healthy (Battery: {battery}%)")
            else:
              print(f"{device}: Needs attention - {reason}")
    to:
      topic: device_health_alerts
      keyType: string
      valueType: json
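To try different thresholds before redeploying, the add_health_status logic can be run as ordinary Python. This is a minimal sketch that mirrors the KSML function above, not part of KSML's API: classify_device is a hypothetical name, and key and value are passed in explicitly here, whereas inside KSML the runner supplies them.

```python
def classify_device(key, value):
    """Local mirror of the add_health_status valueTransformer (hypothetical helper)."""
    # Missing metrics default to "good" values, as in the KSML code
    battery = value.get("battery_percent", 100)
    signal = value.get("signal_strength", 100)
    errors = value.get("error_count", 0)

    has_low_battery = battery < 20
    has_poor_signal = signal < 50
    has_high_errors = errors > 5

    if has_low_battery or has_poor_signal or has_high_errors:
        health_status = "needs_attention"
        # Only the first matching issue is reported: battery, then signal, then errors
        if has_low_battery:
            reason = "low_battery"
        elif has_poor_signal:
            reason = "poor_signal"
        else:
            reason = "high_errors"
    else:
        health_status = "healthy"
        reason = "ok"

    # A new dict is built, so input fields such as "status" are not forwarded
    return {
        "device_id": key,
        "battery_percent": battery,
        "signal_strength": signal,
        "error_count": errors,
        "health_status": health_status,
        "issue_reason": reason,
    }


if __name__ == "__main__":
    # Low battery and poor signal at once: only low_battery is reported
    print(classify_device("sensor-x", {"battery_percent": 10, "signal_strength": 20}))
```

Because the checks run in a fixed order (battery, then signal, then errors), a device with several problems reports only the first one; collecting every matching reason in a list would be one way to report them all.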

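Now create the KSML runner configuration file ksml-runner.yaml, also in the examples/ directory (docker-compose.yml mounts that directory as the KSML container's working directory, /ksml):

KSML Runner Configuration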
kafka:
  bootstrap.servers: broker:9093
  application.id: io.ksml.example.producer
  security.protocol: PLAINTEXT
  acks: all

ksml:
  definitions:
    iot: iot-analytics.yaml
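Before restarting, it can help to confirm that every file sits where docker-compose.yml expects it: kowl-ui-config.yaml next to the compose file, and both KSML files in examples/, which the container sees as /ksml. This is a small standard-library sketch; missing_files is a hypothetical helper, and it assumes the runner reads ksml-runner.yaml from its /ksml working directory and that you run the script from the project directory created in Step 1.

```python
from pathlib import Path

# Files this guide creates, relative to the project directory from Step 1.
# docker-compose.yml mounts ./kowl-ui-config.yaml into Kowl and ./examples as /ksml for KSML
# (assumption: the KSML runner picks up ksml-runner.yaml from that working directory).
REQUIRED_FILES = [
    "docker-compose.yml",
    "kowl-ui-config.yaml",
    "examples/iot-analytics.yaml",
    "examples/ksml-runner.yaml",
]


def missing_files(project_dir="."):
    """Return the required files that do not exist under project_dir."""
    root = Path(project_dir)
    return [name for name in REQUIRED_FILES if not (root / name).is_file()]


if __name__ == "__main__":
    missing = missing_files()
    if missing:
        print("Missing:", ", ".join(missing))
    else:
        print("All files are in place")
```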

Step 5: Start Your Application

Restart the KSML runner:

docker compose restart ksml

Check the logs to see when KSML is ready to receive messages:

docker compose logs ksml -f

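KSML is ready to receive messages when a log line like this appears. Wait for it before sending data: the pipeline reads iot_sensor_data with offsetResetPolicy: latest, so messages produced before it starts consuming may be skipped.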

Pipeline processing state change. Moving from old state 'REBALANCING' to new state 'RUNNING'
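If you script this step, you can wait for that line programmatically instead of watching the logs. The sketch below is a hypothetical helper, not part of KSML: wait_until_running and follow_ksml_logs are illustrative names, and the marker text is copied from the log line above, so it may need adjusting for other KSML versions.

```python
import subprocess

# Text taken from the state-change log line above; check it against your KSML version
READY_MARKER = "to new state 'RUNNING'"


def wait_until_running(lines):
    """Return True at the first line reporting the RUNNING state, False if the lines run out."""
    for line in lines:
        if READY_MARKER in line:
            return True
    return False


def follow_ksml_logs():
    """Follow the ksml service logs (like 'docker compose logs ksml -f') until KSML is running."""
    proc = subprocess.Popen(
        ["docker", "compose", "logs", "-f", "ksml"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    try:
        return wait_until_running(proc.stdout)
    finally:
        # Stop following the logs once we have an answer
        proc.terminate()
        proc.wait()
```

Call follow_ksml_logs() from the project directory after docker compose restart ksml. Since docker compose logs also replays earlier output of the same container, a RUNNING line from a previous run can match; the --since option of docker compose logs (for example a relative value such as 1m) narrows the search to recent output.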

Step 6: See It In Action!

Send some test data to see your pipeline work:

docker compose exec broker kafka-console-producer.sh --bootstrap-server broker:9093 --topic iot_sensor_data --property "parse.key=true" --property "key.separator=:"

Paste these device health metrics (press Enter after each):

sensor-001:{"battery_percent":15,"signal_strength":80,"error_count":1,"status":"online"}
sensor-002:{"battery_percent":75,"signal_strength":30,"error_count":2,"status":"online"}
sensor-003:{"battery_percent":90,"signal_strength":85,"error_count":0,"status":"online"}
sensor-004:{"battery_percent":60,"signal_strength":70,"error_count":8,"status":"online"}
sensor-005:{"battery_percent":50,"signal_strength":60,"error_count":1,"status":"offline"}

Press Ctrl+C to exit.
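With parse.key=true and key.separator=:, the console producer treats the text before the separator as the message key and the rest of the line as the value. Since the sensor IDs contain no colon, the first : on each line cleanly separates the key from the JSON. As a sketch, the sample lines can be generated in Python rather than pasted; SAMPLES and to_producer_line are hypothetical names.

```python
import json

# The Step 6 sample devices as (key, value) pairs
SAMPLES = [
    ("sensor-001", {"battery_percent": 15, "signal_strength": 80, "error_count": 1, "status": "online"}),
    ("sensor-002", {"battery_percent": 75, "signal_strength": 30, "error_count": 2, "status": "online"}),
    ("sensor-003", {"battery_percent": 90, "signal_strength": 85, "error_count": 0, "status": "online"}),
    ("sensor-004", {"battery_percent": 60, "signal_strength": 70, "error_count": 8, "status": "online"}),
    ("sensor-005", {"battery_percent": 50, "signal_strength": 60, "error_count": 1, "status": "offline"}),
]


def to_producer_line(key, value, separator=":"):
    """Format one record as key + separator + compact JSON, one record per line."""
    if separator in key:
        # A separator inside the key would split the line in the wrong place
        raise ValueError(f"key {key!r} contains the separator {separator!r}")
    return key + separator + json.dumps(value, separators=(",", ":"))


if __name__ == "__main__":
    for key, value in SAMPLES:
        print(to_producer_line(key, value))
```

The output can be piped into the same kafka-console-producer.sh command shown above; add -T to docker compose exec so that no TTY is allocated and stdin can be read from the pipe.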

What Just Happened?

Your KSML pipeline performed real-time device health monitoring in just a few lines of YAML:

  1. Filtered offline devices (only processes devices with status "online")
  2. Analyzed device health from battery, signal strength, and error metrics
  3. Categorized devices as "healthy" or "needs attention"
  4. Identified specific issues (low battery, poor signal, high errors)
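Step 1 is a plain predicate on the message value. As a minimal sketch (is_online is a hypothetical name, not part of KSML), the filter expression from iot-analytics.yaml can be evaluated in Python against the sample records; the two extra records are made up to show edge cases.

```python
def is_online(value):
    """Same condition as the pipeline filter: value is not None and value.get('status') == 'online'."""
    return value is not None and value.get("status") == "online"


# Only the status field matters to the filter, so the other metrics are omitted here
records = [
    ("sensor-001", {"status": "online"}),
    ("sensor-002", {"status": "online"}),
    ("sensor-003", {"status": "online"}),
    ("sensor-004", {"status": "online"}),
    ("sensor-005", {"status": "offline"}),
    ("sensor-006", None),  # hypothetical message with a null value
    ("sensor-007", {"battery_percent": 40}),  # hypothetical message without a status field
]

kept = [key for key, value in records if is_online(value)]
print("forwarded to add_health_status:", kept)
```

The value is not None guard matters because a Kafka message can carry a null value; without it, calling .get on None would raise an AttributeError. The comparison is also case-sensitive, so a status of "ONLINE" would be dropped.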

Check the KSML logs to see the real-time analysis:

docker compose logs ksml -f

You'll see device health reports like these (sensor-005 does not appear because it is offline and was filtered out):

sensor-001: Needs attention - low_battery
sensor-002: Needs attention - poor_signal
sensor-003: Healthy (Battery: 90%)
sensor-004: Needs attention - high_errors

This is real-time device monitoring! Each device status is instantly analyzed and categorized as data flows through the pipeline.

Example messages

INPUT (to iot_sensor_data topic):

  • key: sensor-001
  • value:
    {
      "battery_percent": 15,
      "signal_strength": 80,
      "error_count": 1,
      "status": "online"
    }
    

OUTPUT (to device_health_alerts topic):

  • key: sensor-001
  • value:
    {
      "device_id": "sensor-001",
      "battery_percent": 15,
      "signal_strength": 80,
      "error_count": 1,
      "health_status": "needs_attention",
      "issue_reason": "low_battery"
    }
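Comparing the two values shows what the transformValue step changed. Because add_health_status returns a new dictionary instead of updating the incoming one, the status field is not carried over, while device_id is taken from the message key. A quick Python check on the two example values above:

```python
# INPUT and OUTPUT values of the sensor-001 example above
input_value = {"battery_percent": 15, "signal_strength": 80, "error_count": 1, "status": "online"}
output_value = {
    "device_id": "sensor-001",
    "battery_percent": 15,
    "signal_strength": 80,
    "error_count": 1,
    "health_status": "needs_attention",
    "issue_reason": "low_battery",
}

# Dictionary key views support set operations
added = sorted(output_value.keys() - input_value.keys())
dropped = sorted(input_value.keys() - output_value.keys())
unchanged = {k for k in input_value.keys() & output_value.keys() if input_value[k] == output_value[k]}

print("added:", added)
print("dropped:", dropped)
print("unchanged:", sorted(unchanged))
```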
    

Open http://localhost:8080 to explore your topics and see the transformed data!

Congratulations!

You just built a real-time device health monitoring system with:

  • Smart device analysis with battery, signal, and error monitoring
  • Real-time filtering and categorization of device health status
  • No compilation or complex infrastructure needed

What Makes KSML Powerful?

Traditional stream processing requires:

  • Java/Scala expertise for Kafka Streams
  • Complex filtering and transformation code
  • Build and deployment pipelines

With KSML you get:

  • Simple YAML configuration for data processing
  • Built-in filtering and transformation operations
  • Optional Python for custom business logic
  • Instant deployment with containers

Next Steps

Ready to learn more?

  1. Schema Validation - Set up IDE validation for error-free development
  2. Understanding KSML - Learn the concepts
  3. KSML Basics Tutorial - Build more advanced pipelines
  4. Examples Library - More patterns

Need help? Check our Troubleshooting Guide.