Working with Aggregations in KSML
This tutorial explores how to compute statistics, summaries, and time-based analytics from streaming data using KSML's aggregation operations.
Introduction
Aggregations are stateful operations that combine multiple records into summary values. They are fundamental to stream processing, enabling real-time analytics from continuous data streams.
KSML's aggregations (stateful processing capabilities) are built on top of Kafka Streams aggregation operations.
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your `kafka-setup` service in `docker-compose.yml` to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic financial_transactions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transaction_sums && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_amounts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_totals && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_stream && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_statistics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_actions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_action_counts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic retail_sales && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sales_by_region && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_temperatures && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_window_stats && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_refunds && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_bonuses && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_totals
Core Aggregation Concepts
Grouping Requirements
All aggregations require data to be grouped by key first:
# Group by existing key
- type: groupByKey
# Group by new key using mapper function
- type: groupBy
mapper:
expression: value.get("category")
resultType: string
Kafka Streams equivalents:
- `groupByKey` → `KStream.groupByKey()`
- `groupBy` → `KStream.groupBy()`
State Stores
Aggregations maintain state in local stores that are made fault-tolerant through changelog topics. A changelog is a compacted Kafka topic that records every state change, allowing the state to be rebuilt if an instance fails or restarts. This enables automatic state recovery and, combined with Kafka Streams' transactional processing, supports exactly-once guarantees.
Key-Value Stores
Used for regular (non-windowed) aggregations:
store:
name: my_aggregate_store
type: keyValue
caching: true # Enable caching to reduce downstream updates
loggingDisabled: false # Keep changelog for fault tolerance (default: false)
persistent: true # Use RocksDB for persistence (default: true)
Window Stores
Required for windowed aggregations to store time-based state:
store:
name: my_window_store
type: window
windowSize: 1h # Must match the window duration
retention: 24h # How long to keep expired windows (>= windowSize + grace)
retainDuplicates: false # Keep only latest value per window (default: false)
caching: true # Enable caching for better performance
Important considerations:
- `windowSize` must match your `windowByTime` duration
- `retention` should be at least `windowSize` + grace period to handle late-arriving data
- Longer retention uses more disk space but allows querying historical windows
- Caching reduces the number of downstream updates and improves performance
Function Types in Aggregations
Initializer Function Creates the initial state value when a key is seen for the first time.
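A minimal initializer sketch that seeds the state consumed by the aggregator example below (the count/sum field names are illustrative):
initializer:
  code: |
    # Fresh state for a key seen for the first time
    initial = {"count": 0, "sum": 0}
  expression: initial
  resultType: json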
Aggregator Function Updates the aggregate state by combining the current aggregate with a new incoming value.
aggregator:
code: |
# aggregatedValue: current aggregate
# value: new record to add
result = {
"count": aggregatedValue["count"] + 1,
"sum": aggregatedValue["sum"] + value["amount"]
}
expression: result
resultType: json
Reducer Function (for reduce operations) Combines two values of the same type into a single value, used when no initialization is needed.
reducer:
code: |
# value1, value2: values to combine
combined = value1 + value2
expression: combined
resultType: long
Types of Aggregations in KSML
KSML supports several aggregation types, each with specific use cases:
1. Count
Counts the number of records per key.
Kafka Streams equivalent: KGroupedStream.count()
When to use:
- Tracking event frequencies
- Monitoring activity levels
- Simple counting metrics
2. Reduce
Combines values using a reducer function without an initial value.
Kafka Streams equivalent: KGroupedStream.reduce()
When to use:
- Summing values
- Finding min/max
- Combining values of the same type
3. Aggregate
Builds complex aggregations with custom initialization and aggregation logic.
Kafka Streams equivalent: KGroupedStream.aggregate()
When to use:
- Computing statistics (avg, stddev)
- Building complex state
- Transforming value types during aggregation
4. Cogroup
Aggregates multiple grouped streams together into a single result.
Kafka Streams equivalent: CogroupedKStream
When to use:
- Combining data from multiple sources
- Building unified aggregates from different streams
- Complex multi-stream analytics
Count Example
Simple counting of events per key:
User actions producer (click to expand)
functions:
generate_user_action:
type: generator
globalCode: |
import random
import time
user_counter = 0
users = ["alice", "bob", "charlie", "david", "eve", "frank", "grace", "henry"]
actions = ["login", "view", "click", "purchase", "logout", "search", "share", "comment"]
code: |
global user_counter, users, actions
# Cycle through users
user = users[user_counter % len(users)]
user_counter += 1
# Generate action event
value = {
"user_id": user,
"action": random.choice(actions),
"timestamp": int(time.time() * 1000)
}
# Use user_id as key
key = user
expression: (key, value)
resultType: (string, json)
producers:
user_action_producer:
generator: generate_user_action
interval: 1s
to:
topic: user_actions
keyType: string
valueType: json
Count user actions processor (click to expand)
streams:
user_actions:
topic: user_actions
keyType: string # user_id
valueType: json # action details
user_action_counts:
topic: user_action_counts
keyType: string
valueType: string
pipelines:
count_by_user:
from: user_actions
via:
- type: groupByKey
- type: count
- type: toStream
- type: convertValue
into: string
- type: peek
forEach:
code: |
log.info("User {} has performed {} actions", key, value)
to: user_action_counts
How Count Works
The count operation:
- Groups messages by key using `groupByKey`
- Maintains a counter per unique key
- Increments the counter for each message
- Outputs the current count as a KTable
Reduce Example
The `reduce` operation combines values without initialization, making it perfect for operations like summing, finding minimums/maximums, or concatenating strings. This section shows two approaches: a simple binary format for efficiency, and a JSON format for human readability.
Simple Reduce (Binary Format)
This example demonstrates the core reduce concept with minimal complexity, using binary long values for efficiency.
What it does:
- Generates transactions: Creates random transaction amounts as long values (cents)
- Groups by account: Groups transactions by account_id (the message key)
- Reduces values: Sums all transaction amounts using a simple reducer
- Outputs totals: Writes aggregated totals as long values
Key KSML concepts demonstrated:
- `groupByKey` for partitioning data by key
- `reduce` operation for stateful aggregation
- Binary data types for processing efficiency
Simple producer (binary long values) - click to expand
functions:
generate_transaction:
type: generator
globalCode: |
import random
account_counter = 0
accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005"]
code: |
global account_counter, accounts
# Cycle through accounts
account = accounts[account_counter % len(accounts)]
account_counter += 1
# Generate transaction amount (in cents)
amount = random.randint(100, 50000)
key = account
value = amount
expression: (key, value)
resultType: (string, long)
producers:
transaction_producer:
generator: generate_transaction
interval: 1s
to:
topic: financial_transactions
keyType: string
valueType: long
Simple processor (reduce only) - click to expand
streams:
financial_transactions:
topic: financial_transactions
keyType: string # account_id
valueType: long # transaction amount
transaction_sums:
topic: transaction_sums
keyType: string
valueType: long
functions:
sum_amounts:
type: reducer
expression: value1 + value2
resultType: long
pipelines:
sum_transactions:
from: financial_transactions
via:
- type: groupByKey
- type: reduce
reducer: sum_amounts
- type: toStream
- type: peek
forEach:
code: |
log.info("Account {} total: {}", key, value)
to: transaction_sums
Verifying the results:
Since binary data isn't human-readable in Kowl UI, use command-line tools to verify:
# Check current totals
kcat -b localhost:9092 -t transaction_sums -C -o end -c 5 -f 'Key: %k, Total: %s\n' -s value=Q
# Verify by summing all transactions for one account
kcat -b localhost:9092 -t financial_transactions -C -o beginning -f '%k,%s\n' -s value=Q -e | \
grep "ACC001" | cut -d',' -f2 | awk '{sum += $1} END {print "Sum:", sum}'
Note: Binary formats like `long` are common in production for performance and storage efficiency.
Human-Readable Reduce (JSON Format)
This example shows the same reduce logic but with JSON messages for better visibility in Kafka UI tools.
Additional concepts demonstrated:
- `transformValue` for data extraction and formatting
- Type handling (JSON → long → JSON) for processing efficiency
- Human-readable message formats
JSON producer (human-readable) - click to expand
functions:
generate_transaction:
type: generator
globalCode: |
import random
import time
account_counter = 0
accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005"]
code: |
global account_counter, accounts
# Cycle through accounts
account = accounts[account_counter % len(accounts)]
account_counter += 1
# Generate transaction amount (in cents to avoid float issues)
amount_cents = random.randint(100, 50000)
# Create human-readable JSON structure
value = {
"account_id": account,
"amount_cents": amount_cents,
"amount_dollars": round(amount_cents / 100.0, 2),
"transaction_id": f"TXN{account_counter:06d}",
"timestamp": int(time.time() * 1000)
}
# Use account_id as key
key = account
expression: (key, value)
resultType: (string, json)
producers:
transaction_producer:
generator: generate_transaction
interval: 1s
to:
topic: financial_transactions
keyType: string
valueType: json
JSON processor (with transformations) - click to expand
streams:
financial_transactions:
topic: financial_transactions
keyType: string # account_id
valueType: json # transaction details
transaction_sums:
topic: transaction_sums
keyType: string
valueType: json
functions:
extract_amount:
type: valueTransformer
code: |
# Extract amount_cents from JSON transaction
if value is not None:
amount = value.get("amount_cents", 0)
else:
amount = 0
expression: amount
resultType: long
sum_amounts:
type: reducer
code: |
# Sum two transaction amounts (in cents)
total = value1 + value2
expression: total
resultType: long
format_total:
type: valueTransformer
code: |
# Convert long cents to JSON format for human readability
if value is not None:
result = {
"total_cents": value,
"total_dollars": round(value / 100.0, 2)
}
else:
result = {
"total_cents": 0,
"total_dollars": 0.0
}
expression: result
resultType: json
pipelines:
sum_transactions:
from: financial_transactions
via:
- type: peek
forEach:
code: |
amount_dollars = value.get("amount_dollars", 0) if value else 0
txn_id = value.get("transaction_id", "N/A") if value else "N/A"
log.info("Processing transaction {} for account {}: ${}", txn_id, key, amount_dollars)
- type: transformValue
mapper: extract_amount
- type: groupByKey
- type: reduce
reducer: sum_amounts
- type: toStream
- type: transformValue
mapper: format_total
- type: peek
forEach:
code: |
total_dollars = value.get("total_dollars", 0) if value else 0
log.info("Account {} running total: ${}", key, total_dollars)
to: transaction_sums
Reduce vs Aggregate
Choose reduce when:
- Values are of the same type as the result
- No initialization is needed
- Simple combination logic (sum, min, max)
Choose aggregate when:
- Result type differs from input type
- Custom initialization is required
- Complex state management is needed
Aggregate Example
The `aggregate` operation provides custom initialization and aggregation logic, making it perfect for building complex statistics or when input and output types differ. This section shows two approaches: a simple binary format for core concepts, and a JSON format for comprehensive statistics.
Simple Aggregate (Binary Format)
This example demonstrates the core aggregate concept with minimal complexity, using binary long values.
What it does:
- Initializes to zero: Starts aggregation with a zero value using simple expression
- Groups by customer: Groups payment amounts by customer_id (the message key)
- Sums amounts: Adds each payment amount to the running total
- Outputs totals: Writes aggregated totals as long values
Key KSML concepts demonstrated:
- `initializer` with a simple expression (no custom function needed)
- `aggregator` with a simple arithmetic expression
- Binary data types for processing efficiency
Simple producer (binary long values) - click to expand
functions:
generate_amount:
type: generator
globalCode: |
import random
customer_counter = 0
customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
code: |
global customer_counter, customers
# Cycle through customers
customer = customers[customer_counter % len(customers)]
customer_counter += 1
# Generate amount in cents as long
amount = random.randint(1000, 50000) # $10 to $500
key = customer
value = amount
expression: (key, value)
resultType: (string, long)
producers:
amount_producer:
generator: generate_amount
interval: 1s
to:
topic: payment_amounts
keyType: string
valueType: long
Simple processor (aggregate only) - click to expand
streams:
payment_amounts:
topic: payment_amounts
keyType: string # customer_id
valueType: long # amount in cents
payment_totals:
topic: payment_totals
keyType: string
valueType: long
pipelines:
calculate_totals:
from: payment_amounts
via:
- type: groupByKey
- type: aggregate
store:
type: keyValue
retention: 1h
initializer:
expression: 0
resultType: long
aggregator:
expression: aggregatedValue + value
- type: toStream
- type: peek
forEach:
code: |
log.info("Customer {} total: {}", key, value)
to: payment_totals
Verifying the results:
Since binary data isn't human-readable in Kowl UI, use command-line tools to verify:
# Check current totals (convert cents to dollars)
kcat -b localhost:9092 -t payment_totals -C -o end -c 5 -f 'Key: %k, Total cents: %s\n' -s value=Q
# Calculate expected total for one customer
kcat -b localhost:9092 -t payment_amounts -C -o beginning -f '%k,%s\n' -s value=Q -e | \
grep "CUST001" | cut -d',' -f2 | awk '{sum += $1} END {print "Expected:", sum}'
Complex Aggregate (JSON Format)
This example shows advanced aggregation with comprehensive statistics using JSON for human readability.
Additional concepts demonstrated:
- Custom `initializer` function for complex state initialization
- Custom `aggregator` function for multi-field updates
- JSON state management for rich aggregations
- Dynamic calculations (average) within aggregation logic
What it does:
- Initializes statistics: Creates a JSON structure to track multiple metrics (count, total, min, max, average)
- Groups by customer: Groups payment events by account_id (the message key)
- Updates statistics: For each payment, updates all metrics in the aggregated state
- Calculates average: Dynamically computes the average amount per customer
- Outputs comprehensive stats: Produces JSON messages with complete payment statistics
JSON payment events producer (click to expand)
functions:
generate_payment:
type: generator
globalCode: |
import random
import time
account_counter = 0
accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005", "ACC006"]
code: |
global account_counter, accounts
# Cycle through accounts
account = accounts[account_counter % len(accounts)]
account_counter += 1
# Generate payment event
value = {
"account_id": account,
"amount": round(random.uniform(5.0, 500.0), 2),
"currency": "USD",
"timestamp": int(time.time() * 1000)
}
# Use account_id as key
key = account
expression: (key, value)
resultType: (string, json)
producers:
payment_producer:
generator: generate_payment
interval: 1s
to:
topic: payment_stream
keyType: string
valueType: json
JSON statistics processor (click to expand)
streams:
payment_stream:
topic: payment_stream
keyType: string # customer_id
valueType: json # payment details with amount field
payment_statistics:
topic: payment_statistics
keyType: string
valueType: json
functions:
init_stats:
type: initializer
code: |
# Initialize statistics
stats = {
"count": 0,
"total_amount": 0.0,
"min_amount": None,
"max_amount": None
}
expression: stats
resultType: json
update_stats:
type: aggregator
code: |
# Update payment statistics
if value and aggregatedValue:
amount = value.get("amount", 0.0)
aggregatedValue["count"] = aggregatedValue.get("count", 0) + 1
aggregatedValue["total_amount"] = aggregatedValue.get("total_amount", 0.0) + amount
current_min = aggregatedValue.get("min_amount")
current_max = aggregatedValue.get("max_amount")
if current_min is None or amount < current_min:
aggregatedValue["min_amount"] = amount
if current_max is None or amount > current_max:
aggregatedValue["max_amount"] = amount
aggregatedValue["average_amount"] = round(aggregatedValue["total_amount"] / aggregatedValue["count"], 2)
expression: aggregatedValue
resultType: json
pipelines:
calculate_statistics:
from: payment_stream
via:
- type: groupByKey
- type: aggregate
store:
type: keyValue
retention: 1h
caching: false
initializer: init_stats
aggregator: update_stats
- type: toStream
- type: peek
forEach:
code: |
if value:
count = value.get("count", 0)
total = round(value.get("total_amount", 0), 2)
avg = value.get("average_amount", 0)
min_amt = value.get("min_amount", 0)
max_amt = value.get("max_amount", 0)
log.info("Customer {} - Count: {}, Total: ${}, Avg: ${}, Min: ${}, Max: ${}",
key, count, total, avg, min_amt, max_amt)
to: payment_statistics
Both approaches demonstrate the flexibility of the `aggregate` operation. The simple version focuses on the core concept, while the complex version shows real-world statistical aggregation with human-readable JSON output.
Aggregate Components
- Initializer: Creates empty/initial state
- Aggregator: Updates state with each new value
- Result: Continuously updated aggregate in state store
Windowed Aggregations
Aggregations can be windowed to compute time-based analytics:
Window Types
Tumbling Windows
Non-overlapping, fixed-size time windows.
- type: windowByTime
windowType: tumbling
duration: 1h # 1-hour windows
grace: 5m # Allow 5 minutes for late data
Use cases: Hourly reports, daily summaries
Hopping Windows
Overlapping, fixed-size windows that advance by a hop interval.
- type: windowByTime
windowType: hopping
duration: 1h # Window size
advance: 15m # Hop interval
grace: 5m
Use cases: Moving averages, overlapping analytics
Session Windows
Dynamic windows based on periods of inactivity.
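A sketch of a session-window definition, assuming KSML's `windowBySession` operation with an `inactivityGap` parameter:
- type: windowBySession
  inactivityGap: 30m # Close a session after 30 minutes without new events
  grace: 5m          # Allow late events to still be assigned to the session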
Use cases: User sessions, activity bursts
Windowed Aggregation Example
This example demonstrates windowed aggregation by calculating temperature statistics (count, average, min, max) for each sensor within 30-second time windows. The windowed key contains both the original sensor ID and the window time boundaries.
What it does:
- Groups by sensor: Each sensor's readings are processed separately
- Windows by time: Creates 30-second tumbling windows for aggregation
- Calculates statistics: Tracks count, sum, average, min, and max temperature
- Transforms window key: Converts WindowedString to a regular string key for output
- Writes to topic: Outputs windowed statistics to the `sensor_window_stats` topic
Key KSML concepts demonstrated:
- `windowByTime` with tumbling windows for time-based aggregation
- Window stores with configurable retention
- Accessing window metadata (start/end times) from the WindowedString key
- Transforming WindowedString keys using `map` with a `keyValueMapper`
- Complex aggregation state using JSON
Temperature sensor producer (click to expand)
functions:
generate_sensor_reading:
type: generator
globalCode: |
import random
import time
sensor_counter = 0
sensors = ["temp001", "temp002", "temp003"]
code: |
global sensor_counter, sensors
# Cycle through sensors
sensor = sensors[sensor_counter % len(sensors)]
sensor_counter += 1
# Generate temperature reading
temperature = round(random.uniform(18.0, 35.0), 1)
# Use sensor_id as key
key = sensor
expression: (key, temperature)
resultType: (string, double)
producers:
sensor_producer:
generator: generate_sensor_reading
interval: 2s
to:
topic: sensor_temperatures
keyType: string
valueType: double
Windowed temperature statistics processor (click to expand)
streams:
sensor_temperatures:
topic: sensor_temperatures
keyType: string
valueType: double
sensor_window_stats:
topic: sensor_window_stats
keyType: string
valueType: json
functions:
init_stats:
type: initializer
code: |
stats = {
"count": 0,
"sum": 0.0,
"min": None,
"max": None
}
expression: stats
resultType: json
update_stats:
type: aggregator
code: |
# Update count and sum
aggregatedValue["count"] += 1
aggregatedValue["sum"] += value
# Update min/max
if aggregatedValue["min"] is None or value < aggregatedValue["min"]:
aggregatedValue["min"] = value
if aggregatedValue["max"] is None or value > aggregatedValue["max"]:
aggregatedValue["max"] = value
# Calculate average
aggregatedValue["avg"] = round(aggregatedValue["sum"] / aggregatedValue["count"], 2)
result = aggregatedValue
expression: result
resultType: json
transform_window_key:
type: keyValueMapper
code: |
# Extract window information from the WindowedString key
sensor_id = key["key"]
window_start = key["startTime"]
window_end = key["endTime"]
# Create a new string key with window info
new_key = f"{sensor_id}_{window_start}_{window_end}"
# Add window metadata to the value
new_value = {
"sensor_id": sensor_id,
"window_start": window_start,
"window_end": window_end,
"stats": value
}
expression: (new_key, new_value)
resultType: (string, json)
log_window_stats:
type: forEach
code: |
# Log the transformed results
sensor_id = value["sensor_id"]
window_start = value["window_start"]
window_end = value["window_end"]
stats = value["stats"]
log.info("Sensor {} | Window [{} - {}] | Stats: {} readings, avg: {}C, min: {}C, max: {}C",
sensor_id, window_start, window_end,
stats["count"], stats["avg"], stats["min"], stats["max"])
pipelines:
windowed_temperature_stats:
from: sensor_temperatures
via:
- type: groupByKey
- type: windowByTime
windowType: tumbling
duration: 30s
grace: 5s
- type: aggregate
store:
type: window
windowSize: 30s
retention: 2m
caching: false
initializer: init_stats
aggregator: update_stats
- type: toStream
- type: map
mapper: transform_window_key
- type: peek
forEach: log_window_stats
to: sensor_window_stats
Understanding the WindowedString key:
When using windowed aggregations, the key becomes a WindowedString object containing:
- `key`: The original key (sensor ID)
- `start` / `end`: Window boundaries in milliseconds
- `startTime` / `endTime`: Human-readable UTC timestamps
This allows you to know exactly which time window each aggregation result belongs to.
Output format:
The output topic contains messages with:
- Key: `{sensor_id}_{window_start}_{window_end}` (e.g., `temp001_2025-08-12T18:58:00Z_2025-08-12T18:58:30Z`)
- Value: JSON object containing:
  - `sensor_id`: The sensor identifier
  - `window_start` / `window_end`: Window boundaries in UTC
  - `stats`: Aggregated statistics (count, avg, min, max)
Example output message:
{
"sensor_id": "temp001",
"window_start": "2025-08-12T18:58:00Z",
"window_end": "2025-08-12T18:58:30Z",
"stats": {
"count": 3,
"avg": 24.5,
"min": 19.6,
"max": 28.7,
"sum": 73.5
}
}
Verifying the input data:
To check the raw sensor temperature readings (binary double values), use:
docker exec broker kafka-console-consumer.sh \
--bootstrap-server broker:9093 \
--topic sensor_temperatures \
--from-beginning \
--max-messages 10 \
--property print.key=true \
--property key.separator=" | " \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
--value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer
Example input messages:
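Illustrative only (the actual keys and temperatures depend on the random generator):
temp001 | 23.4
temp002 | 27.9
temp003 | 19.6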
Advanced: Cogroup Operation
Cogroup allows combining multiple grouped streams into a single aggregation. This is useful when you need to aggregate data from different sources into one unified result.
Orders, Refunds, and Bonuses Producer (click to expand)
functions:
generate_order:
type: generator
globalCode: |
import random
import time
order_counter = 0
customers = ["alice", "bob", "charlie", "diana", "eve"]
code: |
global order_counter, customers
order_counter += 1
customer = random.choice(customers)
value = {
"order_id": f"ORD{order_counter:04d}",
"customer": customer,
"amount": round(random.uniform(10, 500), 2),
"timestamp": int(time.time() * 1000)
}
key = customer
expression: (key, value)
resultType: (string, json)
generate_refund:
type: generator
globalCode: |
import random
import time
refund_counter = 0
customers = ["alice", "bob", "charlie", "diana", "eve"]
code: |
global refund_counter, customers
# Generate refunds less frequently
if random.random() < 0.3: # 30% chance of refund
refund_counter += 1
customer = random.choice(customers)
value = {
"refund_id": f"REF{refund_counter:04d}",
"customer": customer,
"amount": round(random.uniform(5, 100), 2),
"timestamp": int(time.time() * 1000)
}
key = customer
else:
key = None
value = None
expression: (key, value) if key else (None, None)
resultType: (string, json)
generate_bonus:
type: generator
globalCode: |
import random
import time
bonus_counter = 0
customers = ["alice", "bob", "charlie", "diana", "eve"]
code: |
global bonus_counter, customers
# Generate bonuses rarely
if random.random() < 0.1: # 10% chance of bonus
bonus_counter += 1
customer = random.choice(customers)
value = {
"bonus_id": f"BON{bonus_counter:04d}",
"customer": customer,
"points": random.randint(10, 100),
"timestamp": int(time.time() * 1000)
}
key = customer
else:
key = None
value = None
expression: (key, value) if key else (None, None)
resultType: (string, json)
producers:
order_producer:
generator: generate_order
interval: 2s
to:
topic: customer_orders
keyType: string
valueType: json
refund_producer:
generator: generate_refund
interval: 3s
to:
topic: customer_refunds
keyType: string
valueType: json
bonus_producer:
generator: generate_bonus
interval: 5s
to:
topic: customer_bonuses
keyType: string
valueType: json
Cogroup Processor (click to expand)
streams:
orders:
topic: customer_orders
keyType: string
valueType: json
refunds:
topic: customer_refunds
keyType: string
valueType: json
bonuses:
topic: customer_bonuses
keyType: string
valueType: json
customer_totals:
topic: customer_totals
keyType: string
valueType: json
functions:
aggregate_order:
type: aggregator
code: |
# Initialize if first time
if aggregatedValue is None:
aggregatedValue = {}
# Add order amount to total
if value is not None:
aggregatedValue["total_orders"] = aggregatedValue.get("total_orders", 0) + value.get("amount", 0)
aggregatedValue["order_count"] = aggregatedValue.get("order_count", 0) + 1
result = aggregatedValue
expression: result
resultType: json
aggregate_refund:
type: aggregator
code: |
# Initialize if first time
if aggregatedValue is None:
aggregatedValue = {}
# Add refund amount to total (refunds are subtracted later)
if value is not None:
aggregatedValue["total_refunds"] = aggregatedValue.get("total_refunds", 0) + value.get("amount", 0)
aggregatedValue["refund_count"] = aggregatedValue.get("refund_count", 0) + 1
result = aggregatedValue
expression: result
resultType: json
aggregate_bonus:
type: aggregator
code: |
# Initialize if first time
if aggregatedValue is None:
aggregatedValue = {}
# Add bonus points to total
if value is not None:
aggregatedValue["total_bonus_points"] = aggregatedValue.get("total_bonus_points", 0) + value.get("points", 0)
aggregatedValue["bonus_count"] = aggregatedValue.get("bonus_count", 0) + 1
result = aggregatedValue
expression: result
resultType: json
compute_net_value:
type: aggregator
code: |
# Compute net value (orders - refunds + bonus points as dollars)
orders = aggregatedValue.get("total_orders", 0)
refunds = aggregatedValue.get("total_refunds", 0)
bonus_dollars = aggregatedValue.get("total_bonus_points", 0) * 0.1 # 1 point = $0.10
aggregatedValue["net_value"] = round(orders - refunds + bonus_dollars, 2)
aggregatedValue["customer"] = key
result = aggregatedValue
expression: result
resultType: json
pipelines:
# First stream: group orders
grouped_orders:
from: orders
via:
- type: groupByKey
- type: cogroup
aggregator: aggregate_order
as: orders_grouped
# Second stream: group refunds and cogroup with orders
grouped_refunds:
from: refunds
via:
- type: groupByKey
- type: cogroup
with: orders_grouped
aggregator: aggregate_refund
as: refunds_grouped
# Third stream: group bonuses and cogroup with previous
customer_aggregates:
from: bonuses
via:
- type: groupByKey
- type: cogroup
with: refunds_grouped
aggregator: aggregate_bonus
# Final aggregation to compute net value
- type: aggregate
initializer:
expression: {}
resultType: json
aggregator: compute_net_value
store:
name: customer_totals_store
type: keyValue
caching: true
- type: toStream
- type: peek
forEach:
code: |
log.info("CUSTOMER TOTALS - Customer: {}, Orders: ${} ({}), Refunds: ${} ({}), Bonus Points: {}, Net Value: ${}",
key,
value.get("total_orders", 0),
value.get("order_count", 0),
value.get("total_refunds", 0),
value.get("refund_count", 0),
value.get("total_bonus_points", 0),
value.get("net_value", 0))
to: customer_totals
How Cogroup Works
The cogroup operation:
- Groups each stream independently by key
- Combines the grouped streams using cogroup operations
- Each stream contributes to the aggregate with its own aggregator function
- Final aggregate operation computes the combined result
Note: Cogroup is an advanced feature that requires careful coordination between multiple streams. Ensure all streams are properly grouped and that aggregator functions handle null values appropriately.
Complex Example: Regional Sales Analytics
This example demonstrates rekeying (changing the grouping key) and windowed aggregation to calculate regional sales statistics:
What it does:
- Receives sales events keyed by product ID with region, amount, and quantity data
- Rekeys by region - changes the grouping from product to geographical region
- Windows by time - creates 1-minute tumbling windows for aggregation
- Aggregates metrics - tracks total sales, quantities, transaction counts, and per-product breakdowns
- Outputs regional statistics - writes windowed regional summaries to output topic
Key KSML concepts demonstrated:
- `map` with a `keyValueMapper` to change the message key (rekeying)
- `groupByKey` after rekeying to group by the new key
- `windowByTime` for time-based regional analytics
- Complex aggregation state with nested data structures
- Tracking multiple metrics in a single aggregation
Sales events producer (click to expand)
functions:
generate_sales_event:
type: generator
globalCode: |
import random
import time
product_counter = 0
products = ["laptop", "phone", "tablet", "monitor", "keyboard", "mouse", "headphones", "webcam"]
regions = ["North", "South", "East", "West", "Central"]
code: |
global product_counter, products, regions
# Cycle through products
product = products[product_counter % len(products)]
product_counter += 1
# Generate sale event
value = {
"product_id": product,
"region": random.choice(regions),
"amount": round(random.uniform(10, 1000), 2),
"quantity": random.randint(1, 5),
"timestamp": int(time.time() * 1000)
}
# Use product_id as key
key = product
expression: (key, value)
resultType: (string, json)
producers:
sales_producer:
generator: generate_sales_event
interval: 2s
to:
topic: retail_sales
keyType: string
valueType: json
Regional sales analytics processor (click to expand)
streams:
sales_events:
topic: retail_sales
keyType: string # Product ID
valueType: json # Sale details including region, amount, quantity
sales_by_region:
topic: sales_by_region
keyType: string # Region
valueType: json # Aggregated sales statistics
functions:
extract_region:
type: keyValueMapper
code: |
# Extract region from the sale event and use it as the new key
region = value.get("region", "unknown")
expression: (region, value)
resultType: (string, json)
initialize_sales_stats:
type: initializer
expression: {"total_sales": 0.0, "total_quantity": 0, "transaction_count": 0, "products": {}}
resultType: json
aggregate_sales:
type: aggregator
code: |
# Initialize aggregatedValue if None (first aggregation)
if aggregatedValue is None:
aggregatedValue = {"total_sales": 0.0, "total_quantity": 0, "transaction_count": 0, "products": {}}
# Extract data from the sale
product_id = value.get("product_id", "unknown")
amount = value.get("amount", 0)
quantity = value.get("quantity", 0)
# Update aggregated values
aggregatedValue["total_sales"] += amount
aggregatedValue["total_quantity"] += quantity
aggregatedValue["transaction_count"] += 1
# Track per-product statistics
if product_id not in aggregatedValue["products"]:
aggregatedValue["products"][product_id] = {"sales": 0.0, "quantity": 0}
aggregatedValue["products"][product_id]["sales"] += amount
aggregatedValue["products"][product_id]["quantity"] += quantity
# Calculate average sale
aggregatedValue["avg_sale"] = round(aggregatedValue["total_sales"] / aggregatedValue["transaction_count"], 2)
return aggregatedValue
resultType: json
transform_window_key:
type: keyValueMapper
code: |
# Extract window information from the WindowedString key
region = key["key"]
window_start = key["startTime"]
window_end = key["endTime"]
# Create a new string key with window info
new_key = f"{region}_{window_start}_{window_end}"
# Add window metadata to the value
new_value = {
"region": region,
"window_start": window_start,
"window_end": window_end,
"stats": value
}
expression: (new_key, new_value)
resultType: (string, json)
pipelines:
regional_sales_analytics:
from: sales_events
via:
# Group by region instead of product ID
- type: map
mapper: extract_region
- type: groupByKey
# Use tumbling window of 1 minute for demo
- type: windowByTime
windowType: tumbling
duration: 1m
grace: 10s
# Aggregate sales data
- type: aggregate
store:
type: window
windowSize: 1m
retention: 10m
initializer: initialize_sales_stats
aggregator: aggregate_sales
- type: toStream
- type: map
mapper: transform_window_key
- type: peek
forEach:
code: |
region = value["region"]
stats = value["stats"]
log.info("Region {} sales (1-min window): ${} from {} transactions",
region, stats["total_sales"], stats["transaction_count"])
# Output to region-specific topic
to: sales_by_region
Pipeline flow:
- Input: Sales events with product_id as key
- Rekey: Map operation changes key to region
- Group: Group by the new region key
- Window: Apply 1-minute tumbling windows
- Aggregate: Calculate regional statistics per window
- Output: Windowed regional sales summaries
Performance Considerations
State Store Types
RocksDB (default)
- Persistent, can handle large state
- Slower than in-memory
- Survives restarts
In-Memory
- Fast but limited by heap size
- Lost on restart (rebuilt from changelog)
- Good for small, temporary state
Optimization Strategies
- Enable Caching
  - Set `caching: true` on the aggregation's state store to reduce the number of downstream updates (see the sketch after this list)
- Tune Commit Intervals
  - Longer intervals = better throughput, higher latency
  - Shorter intervals = lower latency, more overhead
  - The commit interval is a Kafka Streams client setting (`commit.interval.ms`), not a broker setting; configure it wherever your KSML runner passes Kafka Streams properties (see the sketch after this list)
- Pre-filter Data
  - Filter before grouping to reduce state size
  - Remove unnecessary fields early
- Choose Appropriate Window Sizes
  - Smaller windows = less memory
  - Consider business requirements vs resources
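A hedged sketch of both knobs. The store settings mirror the key-value store example earlier in this tutorial; the runner snippet assumes your KSML runner configuration exposes a `kafka` section for Kafka Streams properties:
# In the pipeline definition: cache aggregation updates before forwarding them downstream
store:
  name: cached_aggregate_store
  type: keyValue
  caching: true

# In the runner configuration (assumed layout): tune the Kafka Streams commit interval
kafka:
  commit.interval.ms: 30000 # 30s: fewer commits, better throughput, higher latency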
Memory Management
Monitor state store sizes:
- Each unique key requires memory
- Windowed aggregations multiply by number of windows
- Use retention policies to limit window history
Common Pitfalls and Solutions
Forgetting to Group
Problem: Aggregation operations require grouped streams
Solution: Always use `groupByKey` or `groupBy` before aggregating
Null Value Handling
Problem: Null values can cause aggregation failures
Solution: Check for nulls in aggregator functions:
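A minimal null-safe aggregator sketch (the field name is illustrative):
safe_aggregator:
  type: aggregator
  code: |
    # Guard against a missing aggregate or a null incoming value
    if aggregatedValue is None:
      aggregatedValue = {"count": 0}
    if value is not None:
      aggregatedValue["count"] = aggregatedValue.get("count", 0) + 1
  expression: aggregatedValue
  resultType: json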
Type Mismatches
Problem: Result type doesn't match expression output
Solution: Ensure `resultType` matches what your expression returns
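For example, a declared type that matches what the expression actually produces (names are illustrative):
extract_amount:
  type: valueTransformer
  expression: value.get("amount_cents", 0) if value is not None else 0
  resultType: long # The expression returns an integer, so declare long rather than json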
Window Size vs Retention
Problem: Confusion between window size and retention
Solution:
- Window size = duration of each window
- Retention = how long to keep old windows
- Retention should be at least window size + grace period
Late Arriving Data
Problem: Data arrives after window closes
Solution: Configure appropriate grace periods:
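For example, reusing the tumbling-window settings shown earlier in this tutorial:
- type: windowByTime
  windowType: tumbling
  duration: 1h
  grace: 5m # Events arriving up to 5 minutes late are still counted in their window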
Conclusion
KSML aggregations enable powerful real-time analytics:
- Count for frequency analysis
- Reduce for simple combinations
- Aggregate for complex statistics
- Windowed operations for time-based analytics
- Cogroup for multi-stream aggregations
Choose the appropriate aggregation type based on your use case, and always consider state management and performance implications.