KSML Basics Tutorial
This tutorial will guide you through creating your first KSML data pipeline. By the end, you'll understand the basic components of KSML and how to create a simple but functional data processing application.
What You'll Build
In this tutorial, you'll build a simple data pipeline that:
- Reads temperature sensor data from a Kafka topic
- Filters out readings below a certain threshold
- Transforms the data by converting Fahrenheit to Celsius
- Writes the processed data to another Kafka topic
- Logs information about the processed messages
The overall flow we'll build: `temperature_data` topic → filter (keep readings above 70°F) → Fahrenheit-to-Celsius conversion → `temperature_data_converted` topic.
Prerequisites
Before you begin, make sure you have:
- Completed the Installation and Setup guide with Docker Compose running
- Basic understanding of YAML syntax
- Your KSML environment running (`docker compose ps` should show all services as "Up")
Choose Your Setup Method
Option A: Quick Start (Recommended)

Download the pre-configured tutorial files and start immediately:

- Download and extract: local-docker-compose-setup-basics-tutorial.zip
- Navigate to the extracted folder
- Run `docker compose up -d` (if not already running)
- Skip to Step 5: Run Your Pipeline

Option B: Step-by-Step Tutorial

Use the `docker-compose.yml` from the Quick Start Tutorial and manually create/modify the producer and processor definitions as described below to learn each component step by step.
Understanding the KSML File Structure
A KSML definition file consists of three main sections:
- Streams: Define the input and output Kafka topics
- Functions: Define reusable code snippets
- Pipelines: Define the data processing flow
Let's create each section step by step.
Step 1: Define Your Streams
Note: Skip Steps 1-4 if you chose Option A above.
First, let's create a new file `tutorial.yaml` and start by defining the input and output streams for our pipeline:
Input and output streams for our pipeline:
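This `streams` section is the same one that appears in the full definition in Step 4:

```yaml
streams:
  input_stream:
    topic: temperature_data
    keyType: string
    valueType: json
  output_stream:
    topic: temperature_data_converted
    keyType: string
    valueType: json
```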
This defines:

- An input stream reading from the `temperature_data` topic with string keys and JSON values
- An output stream writing to the `temperature_data_converted` topic with the same data types
Understanding Stream Definitions
Each KSML stream has:
- A unique name (e.g., `input_stream`)
- The Kafka topic it connects to
- The data types for keys and values
KSML supports various data types and notations, including:

- `string`: For text data
- `json`: For JSON-formatted data
- `avro`: For Avro-formatted data (requires a schema)
- `binary`: For raw binary data
- And more
Step 2: Create a Simple Function
Next, let's add functions to filter, transform and log messages as they flow through our pipeline:
Functions to filter, transform, and log messages:
functions:
temperature_above_threshold:
type: predicate
expression: value.get('temperature', 0) > 70
fahrenheit_to_celsius:
type: valueTransformer
expression: |
{"sensor": key, "temp_fahrenheit": value.get('temperature'), "temp_celsius": (value.get('temperature') - 32) * 5/9}
resultType: struct
log_message:
type: forEach
parameters:
- name: prefix
type: string
code: |
msg = prefix + " message" if isinstance(prefix, str) and prefix != "" else "Message"
log.info("{}: key={}, value={}", msg, key, value)
We defined three uniquely named functions:

The `temperature_above_threshold` function:

- Is of type `predicate`, which means it always gets a `key` and `value` as its parameters and needs to return a `boolean` value
- Uses the `expression` tag to return `True` if the `temperature` field in the value (zero if it does not exist) is above 70, and `False` otherwise

The `fahrenheit_to_celsius` function:

- Is of type `valueTransformer`, which means it always gets two parameters `key` and `value`, and returns a (modified, or transformed) value for the next processing step
- Uses the `expression` tag to define the value to return, in this case renaming `temperature` to `temp_fahrenheit` and adding a field called `temp_celsius`

The `log_message` function:

- Is of type `forEach`, which means it always gets two parameters `key` and `value`, and does not return a value
- Takes an extra parameter `prefix` of type `string`
- Checks whether the `prefix` variable is of the correct type and not empty, and if so uses it to compose a description for the log message
- Uses the built-in `log` object to output information at `info` level
Understanding Functions in KSML
Functions in KSML:
- Can be reused across multiple operations in your pipelines
- Are written in Python
- Have access to pre-defined parameters based on function type
- Can take additional parameters for more flexibility
- Must return a value if required by the function type
Step 3: Build Your Pipeline
Now, let's add the pipeline that processes our data:
The pipeline definition:
pipelines:
tutorial_pipeline:
from: input_stream
via:
- type: filter
if: temperature_above_threshold
- type: transformValue
mapper: fahrenheit_to_celsius
- type: peek
forEach:
code: |
# Manually call log_message to pass in additional parameters
log_message(key, value, prefix="Processed")
to: output_stream
This pipeline:
- Reads from `input_stream`
- Filters out messages where the temperature is 70°F or lower
- Transforms the values to include both Fahrenheit and Celsius temperatures
- Logs each processed message
- Writes the results to `output_stream`
Understanding Pipeline Operations
Let's break down each operation:
Filter Operation
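For reference, the corresponding step from the pipeline definition above:

```yaml
- type: filter
  if: temperature_above_threshold
```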
The filter operation:

- Evaluates the `temperature_above_threshold` predicate for each message
- Only passes messages where the predicate returns `True`
- Discards messages where the predicate returns `False`
Transform Value Operation
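For reference, the corresponding step from the pipeline definition above:

```yaml
- type: transformValue
  mapper: fahrenheit_to_celsius
```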
The transformValue operation:
- Transforms the value of each message
- Keeps the original key unchanged
- Creates a new value based on the expression
- In this case, creates a new JSON object with the original temperature and a calculated Celsius value
Note that we put the expression on a new line (using YAML's `|` literal block style) to force the KSML YAML parser to treat the expression as a literal string handed to Python, instead of parsing it as part of the YAML syntax. Another way to achieve the same is to surround the `{...}` with quotes, but in that case be careful to use single and double quotes consistently so that neither the KSML parser nor the Python interpreter gets confused. We generally recommend the block-style notation above for readability.
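As a sketch of the quoted alternative mentioned above (single quotes around the YAML scalar, double quotes inside the Python expression):

```yaml
fahrenheit_to_celsius:
  type: valueTransformer
  # Same function as above, but with the expression quoted instead of using a literal block
  expression: '{"sensor": key, "temp_fahrenheit": value.get("temperature"), "temp_celsius": (value.get("temperature") - 32) * 5/9}'
  resultType: struct
```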
Peek Operation
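For reference, the corresponding step from the pipeline definition above:

```yaml
- type: peek
  forEach:
    code: |
      log_message(key, value, prefix="Processed")
```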
The peek operation:
- Executes the provided code for each message
- Doesn't modify the message
- Allows the message to continue through the pipeline
- Is useful for logging, metrics, or other side effects
Step 4: Put It All Together
Let's combine all the sections into a complete KSML definition file. In the Quick Start guide you created a directory structure containing an `examples/` directory; in this directory create a file called `tutorial.yaml` and copy the following content:
Full KSML processing definition:
streams:
input_stream:
topic: temperature_data
keyType: string
valueType: json
output_stream:
topic: temperature_data_converted
keyType: string
valueType: json
functions:
temperature_above_threshold:
type: predicate
expression: value.get('temperature', 0) > 70
fahrenheit_to_celsius:
type: valueTransformer
expression: |
{"sensor": key, "temp_fahrenheit": value.get('temperature'), "temp_celsius": (value.get('temperature') - 32) * 5/9}
resultType: struct
log_message:
type: forEach
parameters:
- name: prefix
type: string
code: |
msg = prefix + " message" if isinstance(prefix, str) and prefix != "" else "Message"
log.info("{}: key={}, value={}", msg, key, value)
pipelines:
tutorial_pipeline:
from: input_stream
via:
- type: filter
if: temperature_above_threshold
- type: transformValue
mapper: fahrenheit_to_celsius
- type: peek
forEach:
code: |
# Manually call log_message to pass in additional parameters
log_message(key, value, prefix="Processed")
to: output_stream
Save the file.
We also need to make the KSML Runner aware of the new pipeline. In the `ksml-runner.yaml` you created before, there is a section containing the definitions; modify this part so that it looks like this:
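A minimal sketch of how that section might look, assuming the Quick Start's `ksml-runner.yaml` lists definition files under `ksml.definitions` (file and definition names may differ in your setup):

```yaml
ksml:
  definitions:
    # Sketch: adjust names to match your actual ksml-runner.yaml
    helloworld: helloworld.yaml
    # Add the tutorial definition created above
    tutorial: tutorial.yaml
```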
You can either replace the line containing `helloworld` or add the tutorial next to it; in the latter case both pipelines will be run.
Step 5: Run Your Pipeline Definition(s)
Now let's run the pipeline using our Docker Compose setup. If the Compose stack is still running, you can simply restart the KSML runner to make it aware of the new definition:
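Assuming the runner service is named `ksml`, as in the log commands used later in this tutorial:

```
docker compose restart ksml
```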
Or, if you stopped the setup, you can start the complete compose as before:
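This is the same command used in Option A earlier:

```
docker compose up -d
```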
Check the logs to verify your pipeline(s) started:
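Using the runner's container logs (the same command appears again in Step 5.2):

```
docker compose logs ksml
```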
Step 5.1: Test with Sample Data
Produce some test messages to the input topic:
docker compose exec broker kafka-console-producer.sh --bootstrap-server broker:9093 --topic temperature_data --property "parse.key=true" --property "key.separator=:"
Then enter messages in the format `key:value`.
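For example, three hypothetical readings that line up with the results shown in Step 5.2 (one message per line):

```
sensor1:{"temperature": 75}
sensor2:{"temperature": 65}
sensor3:{"temperature": 80}
```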
Press Enter after each message, and press Ctrl+C to exit the producer when you are done.
Step 5.2: View the Results
Consume messages from the output topic to see the results:
docker compose exec broker kafka-console-consumer.sh --bootstrap-server broker:9093 --topic temperature_data_converted --from-beginning
You can also view the topics and messages in the Kafka UI at http://localhost:8080.
You should see messages like:
{"sensor":"sensor1",temp_fahrenheit":75,"temp_celsius":23.88888888888889}
{"sensor":"sensor3",temp_fahrenheit":80,"temp_celsius":26.666666666666668}
Notice that:
- The message with temperature 65°F was filtered out (below our 70°F threshold)
- The remaining messages have been transformed to include both Fahrenheit and Celsius temperatures
- You can see processing logs in the KSML container logs: `docker compose logs ksml`
Understanding What's Happening
When you run your KSML definition:
- The KSML runner parses your `tutorial.yaml` definition
- It creates a Kafka Streams topology based on your pipeline definition
- The topology starts consuming from the input topic
- Each message flows through the operations you defined:
    - The filter operation drops messages with temperatures ≤ 70°F
    - The transformValue operation transforms the remaining messages
    - The peek operation logs each message
- The messages are written to the output topic
Using KSML to produce messages
While you can manually produce the above messages, KSML can also generate messages for you automatically.
Create a new file called `producer.yaml` in your `examples/` directory:
Producer definition - `producer.yaml`:
functions:
generate_temperature_message:
type: generator
globalCode: |
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor" + str(sensorCounter) # Simulate 10 sensors "sensor0" to "sensor9"
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
value = {"temperature": random.randrange(150)}
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value
producers:
# Produce a temperature message every 3 seconds
temperature_producer:
generator: generate_temperature_message
interval: 3s
to:
topic: temperature_data
keyType: string
valueType: json
Now update your `ksml-runner.yaml` to include the producer definition:
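Again a minimal sketch, assuming definitions live under `ksml.definitions` in your `ksml-runner.yaml` (names may differ in your setup):

```yaml
ksml:
  definitions:
    # The processing pipeline from the earlier steps
    tutorial: tutorial.yaml
    # The new producer definition that generates test data
    producer: producer.yaml
```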
Restart the KSML Runner to load the new producer:
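As before, assuming the runner service is named `ksml`:

```
docker compose restart ksml
```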
You can check the runner logs (`docker compose logs ksml`) or go to the Kafka UI at http://localhost:8080 to verify that new messages are generated every 3 seconds in the `temperature_data` topic. The filtered and converted messages will appear on the `temperature_data_converted` topic.
Next Steps
Congratulations! You've built your first KSML data pipeline. Here are some ways to expand on what you've learned:
Try These Modifications:
- Add another filter condition (e.g., filter by sensor name; see the sketch after this list)
- Add more fields to the transformed output
- Create a second pipeline that processes the data differently
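As one example of the first suggestion, a hypothetical predicate that only passes readings from `sensor1` that are above the threshold (same structure as `temperature_above_threshold`, since predicates receive both `key` and `value`):

```yaml
sensor1_above_threshold:
  type: predicate
  # Hypothetical example: combine a key check with the temperature threshold
  expression: key == "sensor1" and value.get('temperature', 0) > 70
```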
Continue Your Learning Journey:
- Check out the beginner tutorials for more guided examples
- Dive into the reference documentation to learn about all available operations