Real-Time Analytics with KSML
This tutorial demonstrates how to build a real-time analytics application using KSML. You'll learn how to process streaming data, calculate metrics in real time, and prepare the results for visualization.
Introduction
Real-time analytics is one of the most common use cases for stream processing. By analyzing data as it arrives, you can:
- Detect trends and patterns as they emerge
- Respond quickly to changing conditions
- Make data-driven decisions with minimal latency
- Provide up-to-date dashboards and visualizations
In this tutorial, we'll build a real-time analytics pipeline that processes a stream of e-commerce transactions, calculates various metrics, and makes the results available for visualization.
Prerequisites
Before starting this tutorial, you should:
- Understand basic KSML concepts (streams, functions, pipelines)
- Have completed the KSML Basics Tutorial
- Be familiar with Aggregations
- Have a basic understanding of Windowed Operations
The Use Case
Imagine you're running an e-commerce platform and want to analyze transaction data in real-time. You want to track:
- Total sales by product category
- Average order value
- Transaction volume by region
- Conversion rates from different marketing channels
Defining the Topics for the Use Case
In earlier tutorials, you created a Docker Compose file with all the necessary containers. This use case guide needs a few additional topics. To have them created, open the `docker-compose.yml` in the examples directory and find the definition of the `kafka-setup` container, which creates the topics.
Change the definition so that the startup command for the setup container (the `command` section) looks like the following:
command: "bash -c 'echo Creating topics... && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ecommerce_transactions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sales_by_category && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic avg_order_value && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transactions_by_region'"
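After restarting the setup container, you may want to confirm that the topics exist. You can do this with the Kafka CLI tools, or with a small script such as the sketch below. It assumes the kafka-python package is installed and that a broker listener is reachable from your host on localhost:9092; adjust both to match your Docker Compose setup.

```python
# Convenience sketch: confirm that the analytics topics were created.
# Assumes the kafka-python package and a broker listener on localhost:9092.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
existing = consumer.topics()  # set of all topic names known to the cluster
for topic in ("ecommerce_transactions", "sales_by_category",
              "avg_order_value", "transactions_by_region"):
    print(f"{topic}: {'present' if topic in existing else 'MISSING'}")
consumer.close()
```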
Defining the Data Model
Our transaction data will have the following structure:
```json
{
  "transaction_id": "12345",
  "timestamp": 1625097600000,
  "customer_id": "cust-789",
  "product_id": "prod-456",
  "product_category": "electronics",
  "quantity": 1,
  "price": 499.99,
  "region": "north_america",
  "marketing_channel": "social_media"
}
```
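Any producer that writes JSON values with this structure will work. As an illustration, the sketch below publishes one such record with kafka-python; the package, the localhost:9092 listener, and keying by transaction_id are assumptions you may need to adapt. Later in this tutorial a KSML producer definition is provided that generates test data for you.

```python
# Minimal sketch: publish one transaction that matches the data model above.
# Assumes kafka-python is installed and a broker listener is reachable on localhost:9092.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

transaction = {
    "transaction_id": "12345",
    "timestamp": 1625097600000,
    "customer_id": "cust-789",
    "product_id": "prod-456",
    "product_category": "electronics",
    "quantity": 1,
    "price": 499.99,
    "region": "north_america",
    "marketing_channel": "social_media",
}

# The key is the transaction_id, matching the keyType of the transactions stream.
producer.send("ecommerce_transactions", key=transaction["transaction_id"], value=transaction)
producer.flush()
```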
Creating the KSML Definition
Now, let's create our KSML definition file:
Real-time analytics processor:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
transactions:
topic: ecommerce_transactions
keyType: string # transaction_id
valueType: json # transaction data
sales_by_category:
topic: sales_by_category
keyType: string # product_category
valueType: json # aggregated sales data
avg_order_value:
topic: avg_order_value
keyType: windowed(string) # time window
valueType: json # average order value
transactions_by_region:
topic: transactions_by_region
keyType: string # region
valueType: string # transaction count
functions:
calculate_total:
type: aggregator
resultType: json
code: |
if aggregatedValue is None:
return {"total_sales": value.get("price") * value.get("quantity"), "count": 1}
else:
return {
"total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")),
"count": aggregatedValue.get("count") + 1
}
calculate_window_total:
type: aggregator
resultType: json
code: |
if aggregatedValue is None:
return {"total_sales": value.get("price") * value.get("quantity"), "count": 1}
else:
return {
"total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")),
"count": aggregatedValue.get("count") + 1
}
pipelines:
# Pipeline for sales by category
sales_by_category_pipeline:
from: transactions
via:
- type: selectKey
mapper:
expression: value.get("product_category")
- type: groupByKey
- type: aggregate
initializer:
expression: '{"total_sales": 0, "count": 0}'
resultType: json
aggregator: calculate_total
store:
name: category_store
type: keyValue
keyType: string
valueType: json
- type: toStream
to: sales_by_category
# Pipeline for average order value (windowed)
avg_order_value_pipeline:
from: transactions
via:
- type: groupBy
mapper:
expression: '"all"' # Use a constant key to aggregate all transactions
resultType: string
- type: windowByTime
windowType: tumbling
duration: 10s
- type: aggregate
store:
name: avg_order_store_10s
type: window
keyType: string
valueType: json
windowSize: 10s
retention: 30s
initializer:
expression: '{"total_sales": 0.0, "count": 0}'
resultType: json
aggregator:
expression: '{"total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")), "count": aggregatedValue.get("count") + 1}'
resultType: json
- type: suppress
until: windowCloses
- type: toStream
- type: convertKey
into: json:windowed(string)
- type: mapValues
mapper:
expression: '{"avg_order_value": value.get("total_sales") / value.get("count") if value and value.get("count") and value.get("count") > 0 else 0}'
resultType: json
to: avg_order_value
# Pipeline for transactions by region
transactions_by_region_pipeline:
from: transactions
via:
- type: selectKey
mapper:
expression: value.get("region")
- type: groupByKey
- type: count
- type: toStream
- type: convertValue
into: string
to: transactions_by_region
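To make the intent of the three pipelines concrete, here is the same aggregation logic written as a plain-Python reference calculation over a few sample transactions (the same values used by the test producer below). It is not part of the KSML application; it only shows what the aggregates work out to.

```python
# Plain-Python reference for what the three pipelines compute (no Kafka involved).
from collections import defaultdict

transactions = [
    {"product_category": "electronics", "quantity": 1, "price": 499.99, "region": "north_america"},
    {"product_category": "gardening",   "quantity": 1, "price": 229.99, "region": "europe"},
    {"product_category": "electronics", "quantity": 1, "price": 229.99, "region": "europe"},
    {"product_category": "gardening",   "quantity": 1, "price": 12.99,  "region": "north_america"},
]

# Sales by category: running total and count, like the calculate_total aggregator.
sales_by_category = defaultdict(lambda: {"total_sales": 0.0, "count": 0})
for t in transactions:
    agg = sales_by_category[t["product_category"]]
    agg["total_sales"] += t["price"] * t["quantity"]
    agg["count"] += 1

# Average order value, ignoring the 10-second windows for simplicity.
total_sales = sum(t["price"] * t["quantity"] for t in transactions)
avg_order_value = total_sales / len(transactions)

# Transactions by region, like the count operation.
transactions_by_region = defaultdict(int)
for t in transactions:
    transactions_by_region[t["region"]] += 1

print(dict(sales_by_category))       # electronics ~729.98 (2 orders), gardening ~242.98 (2 orders)
print(round(avg_order_value, 2))     # 243.24
print(dict(transactions_by_region))  # north_america: 2, europe: 2
```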
Running the Application
You can use the following producer pipeline example as a starting point for writing simulated sales data; it writes four hard-coded sales records to the input stream and then exits.
Transaction data producer:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
streams:
transactions:
topic: ecommerce_transactions
keyType: string # transaction_id
valueType: json # transaction data
# set up global data and a function that will loop over it once
functions:
generate_transactions:
type: generic
resultType: boolean
expression: False
globalCode: |
global count
count = 0
global done
done = False
global orders
orders = [
{
"transaction_id": "12345",
"timestamp": 1625097600000,
"customer_id": "cust-789",
"product_id": "prod-456",
"product_category": "electronics",
"quantity": 1,
"price": 499.99,
"region": "north_america",
"marketing_channel": "social_media"
},
{
"transaction_id": "12346",
"timestamp": 1625097606000,
"customer_id": "cust-788",
"product_id": "prod-457",
"product_category": "gardening",
"quantity": 1,
"price": 229.99,
"region": "europe",
"marketing_channel": "social_media"
},
{
"transaction_id": "12349",
"timestamp": 1625097606000,
"customer_id": "cust-788",
"product_id": "prod-457",
"product_category": "electronics",
"quantity": 1,
"price": 229.99,
"region": "europe",
"marketing_channel": "social_media"
},
{
"transaction_id": "12347",
"timestamp": 1625097612000,
"customer_id": "cust-789",
"product_id": "prod-458",
"product_category": "gardening",
"quantity": 1,
"price": 12.99,
"region": "north_america",
"marketing_channel": "social_media"
}
]
nr_orders = len(orders)
def nextOrder():
global count
global done
result = orders[count]
count = count + 1
if count >= len(orders):
count = 0
done = True # loop once over the data
return result
producers:
produce_order:
to: transactions
interval: 7000 # Emit every 7 seconds to ensure windows close
until:
expression:
done
generator:
code: |
order = nextOrder()
expression: '(order.get("transaction_id"), order)'
resultType: (string, json)
To run the application:

- Save the KSML definition to `definitions/analytics.yaml`
- Add the `example-data-producer.yaml` above if desired, or create test data in some other way
- Create a configuration file at `config/application.properties` with your Kafka connection details
- Start the Docker Compose environment: `docker-compose up -d`
- Monitor the output topics to see the real-time analytics results (see the sketch below)
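For the last step, you can use the console consumer that ships with Kafka, or a small script like the sketch below, which follows the sales_by_category topic. It assumes the kafka-python package and a broker listener on localhost:9092; the other output topics can be read the same way.

```python
# Follow the aggregated results as KSML produces them.
# Assumes kafka-python and a broker listener reachable on localhost:9092.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "sales_by_category",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(f"{message.key}: {message.value}")
```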
Visualizing the Results
Setting up visualizations is outside the scope of this tutorial. You can connect the output topics to visualization tools like:
- Grafana
- Kibana
- Custom dashboards using web frameworks
For example, to create a simple dashboard with Grafana:
- Set up Grafana to connect to your Kafka topics
- Create dashboards for each metric:
- Bar chart for sales by category
- Line chart for average order value over time
- Map visualization for transactions by region
- Gauge charts for conversion rates
Extending the Application
You can extend this application in several ways:
- Add anomaly detection to identify unusual patterns
- Implement trend analysis to detect changing consumer behavior
- Create alerts for specific conditions (e.g., sales dropping below a threshold; see the sketch after this list)
- Enrich the data with additional information from reference data sources
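As an illustration of the alerting idea above, the sketch below watches the aggregated sales_by_category topic and flags categories whose running total is still below a threshold. The threshold value, the kafka-python package, and the localhost:9092 listener are all assumptions chosen for the example.

```python
# Illustrative alerting sketch: flag categories whose running total is below a
# hypothetical threshold. Assumes kafka-python and a listener on localhost:9092.
import json
from kafka import KafkaConsumer

SALES_THRESHOLD = 100.0  # hypothetical value; choose one that fits your data

consumer = KafkaConsumer(
    "sales_by_category",
    bootstrap_servers="localhost:9092",
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    total = message.value.get("total_sales", 0.0)
    if total < SALES_THRESHOLD:
        print(f"ALERT: sales for {message.key} at {total:.2f}, below {SALES_THRESHOLD}")
```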
Conclusion
In this tutorial, you've learned how to:
- Process streaming transaction data in real-time
- Calculate various business metrics using KSML
- Structure a real-time analytics application
- Prepare the results for visualization
Real-time analytics is a powerful use case for KSML, allowing you to gain immediate insights from your streaming data without complex coding.
Next Steps
- Learn about Data Transformation for more complex processing
- Explore Event-Driven Applications to trigger actions based on analytics
- Check out Performance Optimization for handling high-volume data