Quick Start with KSML
Get KSML running in 5 minutes! This guide shows you how to create your first real-time analytics application using YAML and a few lines of Python - no Java required. You'll build an IoT analytics system that processes sensor data in real time.
What We'll Build
A real-time device health monitoring system that:
- Filters offline devices from processing
- Analyzes device health from battery, signal, and error metrics
- Categorizes devices as "healthy" or "needs attention"
- Alerts on specific issues like low battery or poor signal
All with just YAML configuration and clear Python logic.
Prerequisites
You'll need:
- Docker Compose installed (installation guide)
- 5 minutes of your time
Choose Your Setup Method
Option A: Quick Start (Recommended)
Download the pre-configured setup and run immediately:
- Download and extract: local-docker-compose-setup-quick-start.zip
- Navigate to the extracted folder
- Run: docker compose up -d
- Skip to Step 6: See It In Action!
Option B: Step-by-Step Setup
Follow the detailed instructions below to create everything from scratch.
Step 1: Set Up Your Environment
Note: Skip this step if you chose Option A above.
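- Create a new directory for your KSML project, with an examples/ subdirectory inside it (the Compose file mounts ./examples into the KSML container).
- Create a docker-compose.yml file in the project directory: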
Docker Compose Configuration
networks:
  ksml:
    driver: bridge
services:
  broker:
    image: bitnami/kafka:3.8.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    networks:
      - ksml
    restart: always
    environment:
      KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
      KAFKA_CFG_BROKER_ID: 0
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
      KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CFG_LOG_CLEANUP_POLICY: delete
      KAFKA_CFG_LOG_RETENTION_MINUTES: 10070
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
      KAFKA_CFG_NUM_PARTITIONS: 1
    # If the kafka topics can list data, the broker is healthy
    healthcheck:
      test: kafka-topics.sh --bootstrap-server broker:9093 --list
      interval: 5s
      timeout: 10s
      retries: 10
      start_period: 5s
  ksml:
    image: registry.axual.io/opensource/images/axual/ksml:snapshot
    networks:
      - ksml
    container_name: ksml
    working_dir: /ksml
    volumes:
      - ./examples:/ksml
    depends_on:
      broker:
        condition: service_healthy
      kafka-setup:
        condition: service_completed_successfully
  kafka-ui:
    image: quay.io/cloudhut/kowl:master
    platform: linux/amd64
    container_name: kowl
    restart: always
    ports:
      - 8080:8080
    volumes:
      - ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml
    environment:
      CONFIG_FILEPATH: "/config/kowl-ui-config.yaml"
    depends_on:
      broker:
        condition: service_healthy
    networks:
      - ksml
  # This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
  kafka-setup:
    image: bitnami/kafka:3.8.0
    hostname: kafka-setup
    networks:
      - ksml
    depends_on:
      broker:
        condition: service_healthy
    restart: on-failure
    command: "bash -c 'echo Creating tutorial topics... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic tutorial_input && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_data && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temperature_data_converted && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic iot_sensor_data && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_health_alerts'"
- Create kowl-ui-config.yaml for Kafka UI:
Kafka UI Configuration
Step 2: Start Docker Services
Note: Skip this step if you chose Option A above.
Run docker compose up -d from the project directory. This starts:
- Kafka broker (port 9092)
- KSML runner for your stream processing
- Kowl (Kafka UI) for monitoring (port 8080)
- Automatic topic creation
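The broker advertises two addresses, which is why commands run inside the containers use broker:9093 while a client on your own machine would use localhost:9092. The following is a minimal Python sketch that parses the KAFKA_CFG_ADVERTISED_LISTENERS value from docker-compose.yml to make that split visible. The helper name parse_listeners is hypothetical and exists only for this illustration; it is not part of KSML or Kafka.

```python
def parse_listeners(advertised: str) -> dict[str, tuple[str, int]]:
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from KAFKA_CFG_ADVERTISED_LISTENERS in docker-compose.yml
ADVERTISED = "INNER://broker:9093,OUTER://localhost:9092"

listeners = parse_listeners(ADVERTISED)
print("Inside the ksml network:", listeners["INNER"])
print("From the host machine:  ", listeners["OUTER"])
```

The hostname broker only resolves inside the Docker network, so tools started with docker compose exec should use the INNER address.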
Step 3: Verify Everything Started
List the containers with docker compose ps -a. You should see:
- Kafka broker and Kafka UI are running
- Topic setup container has exited successfully (this creates the required topics)
- KSML runner will have exited (missing config - we'll fix this next)
Check the Kafka UI at http://localhost:8080 to see your topics.
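If you prefer a scripted check over opening the browser, a small Python sketch like the one below can confirm that the two published ports accept TCP connections. It only tests reachability of localhost:9092 and localhost:8080 (the ports published in docker-compose.yml); it does not verify that Kafka or the UI are fully initialized. The function name port_open is an illustrative choice.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts
        return False


if __name__ == "__main__":
    # Ports taken from the "ports" sections of docker-compose.yml
    for name, port in [("Kafka broker", 9092), ("Kafka UI", 8080)]:
        state = "reachable" if port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```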
Step 4: Create Your First KSML Application
Note: If you chose Option A (zip download), these files already exist - you can skip to Step 5.
Now let's create a smart device health monitoring application!
In the examples/ directory, create iot-analytics.yaml:
KSML Device Health Monitoring processing definition
This is the definition of a demo KSML application that monitors device health. You don't need to understand the YAML and Python syntax at this point; what it does and how it works is explained later.
functions:
  add_health_status:
    type: valueTransformer
    code: |
      # Get device metrics
      battery = value.get('battery_percent', 100)
      signal = value.get('signal_strength', 100)
      errors = value.get('error_count', 0)

      # Check each condition separately for clarity
      has_low_battery = battery < 20
      has_poor_signal = signal < 50
      has_high_errors = errors > 5

      # Determine if device needs attention
      if has_low_battery or has_poor_signal or has_high_errors:
        health_status = "needs_attention"
        # Determine the specific reason
        if has_low_battery:
          reason = "low_battery"
        elif has_poor_signal:
          reason = "poor_signal"
        else:
          reason = "high_errors"
      else:
        health_status = "healthy"
        reason = "ok"

      return {
        "device_id": key,
        "battery_percent": battery,
        "signal_strength": signal,
        "error_count": errors,
        "health_status": health_status,
        "issue_reason": reason
      }
    resultType: json

pipelines:
  device_health_monitor:
    from:
      topic: iot_sensor_data
      keyType: string
      valueType: json
      offsetResetPolicy: latest
    via:
      # Step 1: Only process online devices
      - type: filter
        if:
          expression: value is not None and value.get('status') == 'online'
      # Step 2: Add health status
      - type: transformValue
        mapper: add_health_status
      # Step 3: Log device health
      - type: peek
        forEach:
          code: |
            device = key
            status = value.get('health_status')
            reason = value.get('issue_reason')
            battery = value.get('battery_percent')
            if status == "healthy":
              print(f"{device}: Healthy (Battery: {battery}%)")
            else:
              print(f"{device}: Needs attention - {reason}")
    to:
      topic: device_health_alerts
      keyType: string
      valueType: json
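Because the transformer body is ordinary Python, its thresholds can be tried out locally before deploying. The sketch below wraps the same logic in a plain function; the signature add_health_status(key, value) is an assumption made for this standalone example, since inside KSML the runner supplies key and value. It illustrates two details that are easy to miss: the comparisons are strict, and when several problems apply, only the first matching reason is reported.

```python
def add_health_status(key, value):
    """Standalone copy of the valueTransformer logic, for local experiments."""
    battery = value.get("battery_percent", 100)
    signal = value.get("signal_strength", 100)
    errors = value.get("error_count", 0)

    has_low_battery = battery < 20
    has_poor_signal = signal < 50
    has_high_errors = errors > 5

    if has_low_battery or has_poor_signal or has_high_errors:
        health_status = "needs_attention"
        # Only one reason is reported; low battery takes priority
        if has_low_battery:
            reason = "low_battery"
        elif has_poor_signal:
            reason = "poor_signal"
        else:
            reason = "high_errors"
    else:
        health_status = "healthy"
        reason = "ok"

    return {
        "device_id": key,
        "battery_percent": battery,
        "signal_strength": signal,
        "error_count": errors,
        "health_status": health_status,
        "issue_reason": reason,
    }


# Values exactly on the thresholds still count as healthy
edge = add_health_status("edge", {"battery_percent": 20, "signal_strength": 50, "error_count": 5})
print(edge["health_status"], edge["issue_reason"])

# Several problems at once: only the highest-priority reason is kept
multi = add_health_status("multi", {"battery_percent": 10, "signal_strength": 10, "error_count": 9})
print(multi["health_status"], multi["issue_reason"])
```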
Now create the KSML runner configuration file ksml-runner.yaml:
KSML Runner Configuration
Step 5: Start Your Application
Restart the KSML runner:
docker compose restart ksml
Check the logs to see when KSML is ready to receive messages:
docker compose logs -f ksml
KSML is ready to receive messages when you see a message:
Step 6: See It In Action!
Send some test data to see your pipeline work:
docker compose exec broker kafka-console-producer.sh --bootstrap-server broker:9093 --topic iot_sensor_data --property "parse.key=true" --property "key.separator=:"
Paste these device health metrics (press Enter after each):
sensor-001:{"battery_percent":15,"signal_strength":80,"error_count":1,"status":"online"}
sensor-002:{"battery_percent":75,"signal_strength":30,"error_count":2,"status":"online"}
sensor-003:{"battery_percent":90,"signal_strength":85,"error_count":0,"status":"online"}
sensor-004:{"battery_percent":60,"signal_strength":70,"error_count":8,"status":"online"}
sensor-005:{"battery_percent":50,"signal_strength":60,"error_count":1,"status":"offline"}
Press Ctrl+C to exit.
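The producer command above sets parse.key=true and key.separator=":", so each pasted line is read as key, separator, value. If you want more test data than the five lines shown, a short Python sketch can generate lines in that shape. The helper name to_producer_line and the extra device sensor-006 are hypothetical; note that the key itself should not contain the separator character.

```python
import json


def to_producer_line(key: str, value: dict, separator: str = ":") -> str:
    """Format one record as '<key><separator><compact JSON value>'."""
    if separator in key:
        raise ValueError("key must not contain the separator")
    return f"{key}{separator}{json.dumps(value, separators=(',', ':'))}"


records = {
    "sensor-001": {"battery_percent": 15, "signal_strength": 80, "error_count": 1, "status": "online"},
    # Hypothetical extra device, not part of the original test data
    "sensor-006": {"battery_percent": 95, "signal_strength": 45, "error_count": 0, "status": "online"},
}

lines = [to_producer_line(key, value) for key, value in records.items()]
for line in lines:
    print(line)
```

The printed lines can be pasted into the console producer in the same way as the examples above.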
What Just Happened?
Your KSML pipeline performed real-time device health monitoring in just a few lines of YAML:
- Filtered offline devices (only processes devices with status "online")
- Analyzed device health from battery, signal strength, and error metrics
- Categorized devices as "healthy" or "needs attention"
- Identified specific issues (low battery, poor signal, high errors)
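The filter step is the reason sensor-005 never reaches the output topic. Its expression can be tried in plain Python, as in the sketch below. The function name is_online and the devices sensor-006 and sensor-007 are made up for the illustration; they show that a record with no status field or with a null value is dropped just like an offline one.

```python
def is_online(value):
    """Same predicate as the filter expression in the pipeline."""
    return value is not None and value.get("status") == "online"


samples = [
    ("sensor-003", {"battery_percent": 90, "status": "online"}),
    ("sensor-005", {"battery_percent": 50, "status": "offline"}),
    ("sensor-006", {"battery_percent": 40}),  # hypothetical: status field missing
    ("sensor-007", None),                     # hypothetical: null value
]

kept = [key for key, value in samples if is_online(value)]
print(kept)  # ['sensor-003']
```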
Check the KSML logs (for example with docker compose logs ksml) to see the real-time analysis:
You'll see device health reports like:
sensor-003: Healthy (Battery: 90%)
sensor-001: Needs attention - low_battery
sensor-002: Needs attention - poor_signal
sensor-004: Needs attention - high_errors
This is real-time device monitoring! Each device status is instantly analyzed and categorized as data flows through the pipeline.
Example messages
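INPUT (to iot_sensor_data topic):
- key: sensor-001
- value: {"battery_percent":15,"signal_strength":80,"error_count":1,"status":"online"}
OUTPUT (to device_health_alerts topic):
- key: sensor-001
- value: {"device_id":"sensor-001","battery_percent":15,"signal_strength":80,"error_count":1,"health_status":"needs_attention","issue_reason":"low_battery"}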
Open http://localhost:8080 to explore your topics and see the transformed data!
Congratulations!
You just built a real-time device health monitoring system with:
- Smart device analysis with battery, signal, and error monitoring
- Real-time filtering and categorization of device health status
- No compilation or complex infrastructure needed
What Makes KSML Powerful?
Traditional stream processing requires:
- Java/Scala expertise for Kafka Streams
- Complex filtering and transformation code
- Build and deployment pipelines
With KSML you get:
- Simple YAML configuration for data processing
- Built-in filtering and transformation operations
- Optional Python for custom business logic
- Instant deployment with containers
Next Steps
Ready to learn more?
- Schema Validation - Set up IDE validation for error-free development
- Understanding KSML - Learn the concepts
- KSML Basics Tutorial - Build more advanced pipelines
- Examples Library - More patterns
Need help? Check our Troubleshooting Guide