Working with Aggregations in KSML

This tutorial explores how to compute statistics, summaries, and time-based analytics from streaming data using KSML's aggregation operations.

Introduction

Aggregations are stateful operations that combine multiple records into summary values. They are fundamental to stream processing, enabling real-time analytics from continuous data streams.

KSML aggregations are stateful processing capabilities built on top of the Kafka Streams aggregation operations.

Prerequisites

Before starting this tutorial, create the Kafka topics used by the examples:

Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic financial_transactions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transaction_sums && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_amounts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_totals && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_stream && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_statistics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_actions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_action_counts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic retail_sales && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sales_by_region && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_temperatures && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_window_stats && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_refunds && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_bonuses && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_totals

Core Aggregation Concepts

Grouping Requirements

All aggregations require data to be grouped by key first:

# Group by existing key
- type: groupByKey

# Group by new key using mapper function
- type: groupBy
  mapper:
    expression: value.get("category")
    resultType: string

Kafka Streams equivalents:

  • groupByKey → KStream.groupByKey()
  • groupBy → KStream.groupBy()

State Stores

Aggregations maintain state in local stores that are made fault-tolerant through changelog topics. A changelog is a compacted Kafka topic that records every state change, allowing the state to be rebuilt if an instance fails or restarts. This enables automatic state recovery and underpins Kafka Streams' processing guarantees, including exactly-once processing when it is enabled.

Key-Value Stores

Used for regular (non-windowed) aggregations:

store:
  name: my_aggregate_store
  type: keyValue
  caching: true           # Enable caching to reduce downstream updates
  loggingDisabled: false  # Keep changelog for fault tolerance (default: false)
  persistent: true        # Use RocksDB for persistence (default: true)

Window Stores

Required for windowed aggregations to store time-based state:

store:
  name: my_window_store
  type: window
  windowSize: 1h          # Must match the window duration
  retention: 24h          # How long to keep expired windows (>= windowSize + grace)
  retainDuplicates: false # Keep only latest value per window (default: false)
  caching: true           # Enable caching for better performance

Important considerations:

  • windowSize must match your windowByTime duration
  • retention should be at least windowSize + grace period to handle late-arriving data
  • Longer retention uses more disk space but allows querying historical windows
  • Caching reduces the number of downstream updates and improves performance
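
The window definition and its store must agree. Below is a minimal sketch pairing a 1-hour tumbling window with a 5-minute grace period and a matching window store; the store name and the initializer/aggregator references are placeholders:

- type: windowByTime
  windowType: tumbling
  duration: 1h
  grace: 5m
- type: aggregate
  store:
    name: hourly_stats_store  # placeholder name
    type: window
    windowSize: 1h            # matches the windowByTime duration
    retention: 2h             # at least windowSize + grace (1h + 5m)
    caching: true
  initializer: my_initializer # placeholder function references
  aggregator: my_aggregator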

Function Types in Aggregations

Initializer Function

Creates the initial state value when a key is seen for the first time.

initializer:
  expression: {"count": 0, "sum": 0}  # Initial state
  resultType: json

Aggregator Function

Updates the aggregate state by combining the current aggregate with a new incoming value.

aggregator:
  code: |
    # aggregatedValue: current aggregate
    # value: new record to add
    result = {
      "count": aggregatedValue["count"] + 1,
      "sum": aggregatedValue["sum"] + value["amount"]
    }
  expression: result
  resultType: json

Reducer Function (for reduce operations)

Combines two values of the same type into a single value, used when no initialization is needed.

reducer:
  code: |
    # value1, value2: values to combine
    combined = value1 + value2
  expression: combined
  resultType: long

Types of Aggregations in KSML

KSML supports several aggregation types, each with specific use cases:

1. Count

Counts the number of records per key.

Kafka Streams equivalent: KGroupedStream.count()

When to use:

  • Tracking event frequencies
  • Monitoring activity levels
  • Simple counting metrics

2. Reduce

Combines values using a reducer function without an initial value.

Kafka Streams equivalent: KGroupedStream.reduce()

When to use:

  • Summing values
  • Finding min/max
  • Combining values of the same type

3. Aggregate

Builds complex aggregations with custom initialization and aggregation logic.

Kafka Streams equivalent: KGroupedStream.aggregate()

When to use:

  • Computing statistics (avg, stddev)
  • Building complex state
  • Transforming value types during aggregation

4. Cogroup

Aggregates multiple grouped streams together into a single result.

Kafka Streams equivalent: CogroupedKStream

When to use:

  • Combining data from multiple sources
  • Building unified aggregates from different streams
  • Complex multi-stream analytics

Count Example

Simple counting of events per key:

User actions producer (click to expand)
functions:
  generate_user_action:
    type: generator
    globalCode: |
      import random
      import time
      user_counter = 0
      users = ["alice", "bob", "charlie", "david", "eve", "frank", "grace", "henry"]
      actions = ["login", "view", "click", "purchase", "logout", "search", "share", "comment"]
    code: |
      global user_counter, users, actions

      # Cycle through users
      user = users[user_counter % len(users)]
      user_counter += 1

      # Generate action event
      value = {
        "user_id": user,
        "action": random.choice(actions),
        "timestamp": int(time.time() * 1000)
      }

      # Use user_id as key
      key = user
    expression: (key, value)
    resultType: (string, json)

producers:
  user_action_producer:
    generator: generate_user_action
    interval: 1s
    to:
      topic: user_actions
      keyType: string
      valueType: json
Count user actions processor (click to expand)
streams:
  user_actions:
    topic: user_actions
    keyType: string  # user_id
    valueType: json  # action details

  user_action_counts:
    topic: user_action_counts
    keyType: string
    valueType: string

pipelines:
  count_by_user:
    from: user_actions
    via:
      - type: groupByKey
      - type: count
      - type: toStream
      - type: convertValue
        into: string
      - type: peek
        forEach:
          code: |
            log.info("User {} has performed {} actions", key, value)
    to: user_action_counts

How Count Works

The count operation:

  1. Groups messages by key using groupByKey
  2. Maintains a counter per unique key
  3. Increments the counter for each message
  4. Outputs the current count as a KTable
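
Because the pipeline converts the count to a string before writing it out, the results can be inspected directly from the command line (a sketch; adjust the broker address to your environment):

# Show the next few count updates per user (values are plain strings)
kcat -b localhost:9092 -t user_action_counts -C -o end -c 5 -f 'Key: %k, Count: %s\n'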

Reduce Example

The reduce operation combines values without initialization, making it perfect for operations like summing, finding minimums/maximums, or concatenating strings. This section shows two approaches: a simple binary format for efficiency, and a JSON format for human readability.

Simple Reduce (Binary Format)

This example demonstrates the core reduce concept with minimal complexity, using binary long values for efficiency.

What it does:

  1. Generates transactions: Creates random transaction amounts as long values (cents)
  2. Groups by account: Groups transactions by account_id (the message key)
  3. Reduces values: Sums all transaction amounts using a simple reducer
  4. Outputs totals: Writes aggregated totals as long values

Key KSML concepts demonstrated:

  • groupByKey for partitioning data by key
  • reduce operation for stateful aggregation
  • Binary data types for processing efficiency
Simple producer (binary long values) - click to expand
functions:
  generate_transaction:
    type: generator
    globalCode: |
      import random
      account_counter = 0
      accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005"]
    code: |
      global account_counter, accounts

      # Cycle through accounts
      account = accounts[account_counter % len(accounts)]
      account_counter += 1

      # Generate transaction amount (in cents)
      amount = random.randint(100, 50000)

      key = account
      value = amount
    expression: (key, value)
    resultType: (string, long)

producers:
  transaction_producer:
    generator: generate_transaction
    interval: 1s
    to:
      topic: financial_transactions
      keyType: string
      valueType: long
Simple processor (reduce only) - click to expand
streams:
  financial_transactions:
    topic: financial_transactions
    keyType: string  # account_id
    valueType: long  # transaction amount

  transaction_sums:
    topic: transaction_sums
    keyType: string
    valueType: long

functions:
  sum_amounts:
    type: reducer
    expression: value1 + value2
    resultType: long

pipelines:
  sum_transactions:
    from: financial_transactions
    via:
      - type: groupByKey
      - type: reduce
        reducer: sum_amounts
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("Account {} total: {}", key, value)
    to: transaction_sums

Verifying the results:

Since binary data isn't human-readable in Kowl UI, use command-line tools to verify:

# Check current totals
kcat -b localhost:9092 -t transaction_sums -C -o end -c 5 -f 'Key: %k, Total: %s\n' -s value=Q

# Verify by summing all transactions for one account
kcat -b localhost:9092 -t financial_transactions -C -o beginning -f '%k,%s\n' -s value=Q -e | \
  grep "ACC001" | cut -d',' -f2 | awk '{sum += $1} END {print "Sum:", sum}'

Note: Binary formats like long are common in production for performance and storage efficiency.

Human-Readable Reduce (JSON Format)

This example shows the same reduce logic but with JSON messages for better visibility in Kafka UI tools.

Additional concepts demonstrated:

  • transformValue for data extraction and formatting
  • Type handling (JSON → long → JSON) for processing efficiency
  • Human-readable message formats
JSON producer (human-readable) - click to expand
functions:
  generate_transaction:
    type: generator
    globalCode: |
      import random
      import time
      account_counter = 0
      accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005"]
    code: |
      global account_counter, accounts

      # Cycle through accounts
      account = accounts[account_counter % len(accounts)]
      account_counter += 1

      # Generate transaction amount (in cents to avoid float issues)
      amount_cents = random.randint(100, 50000)

      # Create human-readable JSON structure
      value = {
        "account_id": account,
        "amount_cents": amount_cents,
        "amount_dollars": round(amount_cents / 100.0, 2),
        "transaction_id": f"TXN{account_counter:06d}",
        "timestamp": int(time.time() * 1000)
      }

      # Use account_id as key
      key = account
    expression: (key, value)
    resultType: (string, json)

producers:
  transaction_producer:
    generator: generate_transaction
    interval: 1s
    to:
      topic: financial_transactions
      keyType: string
      valueType: json
JSON processor (with transformations) - click to expand
streams:
  financial_transactions:
    topic: financial_transactions
    keyType: string  # account_id
    valueType: json  # transaction details

  transaction_sums:
    topic: transaction_sums
    keyType: string
    valueType: json

functions:
  extract_amount:
    type: valueTransformer
    code: |
      # Extract amount_cents from JSON transaction
      if value is not None:
        amount = value.get("amount_cents", 0)
      else:
        amount = 0
    expression: amount
    resultType: long

  sum_amounts:
    type: reducer
    code: |
      # Sum two transaction amounts (in cents)
      total = value1 + value2
    expression: total
    resultType: long

  format_total:
    type: valueTransformer
    code: |
      # Convert long cents to JSON format for human readability
      if value is not None:
        result = {
          "total_cents": value,
          "total_dollars": round(value / 100.0, 2)
        }
      else:
        result = {
          "total_cents": 0,
          "total_dollars": 0.0
        }
    expression: result
    resultType: json

pipelines:
  sum_transactions:
    from: financial_transactions
    via:
      - type: peek
        forEach:
          code: |
            amount_dollars = value.get("amount_dollars", 0) if value else 0
            txn_id = value.get("transaction_id", "N/A") if value else "N/A"
            log.info("Processing transaction {} for account {}: ${}", txn_id, key, amount_dollars)
      - type: transformValue
        mapper: extract_amount
      - type: groupByKey
      - type: reduce
        reducer: sum_amounts
      - type: toStream
      - type: transformValue
        mapper: format_total
      - type: peek
        forEach:
          code: |
            total_dollars = value.get("total_dollars", 0) if value else 0
            log.info("Account {} running total: ${}", key, total_dollars)
    to: transaction_sums

Reduce vs Aggregate

Choose reduce when:

  • Values are of the same type as the result
  • No initialization is needed
  • Simple combination logic (sum, min, max)

Choose aggregate when:

  • Result type differs from input type
  • Custom initialization is required
  • Complex state management is needed
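
As a concrete illustration, keeping the highest transaction amount per account fits reduce (input and result are both long, and no initial value is needed), while counting records fits aggregate (the result is a count, not an amount). A minimal sketch with placeholder function names:

functions:
  keep_max:
    type: reducer
    expression: value1 if value1 > value2 else value2  # same type in and out
    resultType: long

  count_records:
    type: aggregator
    expression: aggregatedValue + 1                    # result type differs from the input value
    resultType: long

Used in a grouped pipeline, the two operations differ mainly in whether an initializer is required:

# reduce: no initializer
- type: reduce
  reducer: keep_max

# aggregate: the initializer creates the starting state
- type: aggregate
  initializer:
    expression: 0
    resultType: long
  aggregator: count_records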

Aggregate Example

The aggregate operation provides custom initialization and aggregation logic, making it perfect for building complex statistics or when input and output types differ. This section shows two approaches: a simple binary format for core concepts, and a JSON format for comprehensive statistics.

Simple Aggregate (Binary Format)

This example demonstrates the core aggregate concept with minimal complexity, using binary long values.

What it does:

  1. Initializes to zero: Starts aggregation with a zero value using simple expression
  2. Groups by customer: Groups payment amounts by customer_id (the message key)
  3. Sums amounts: Adds each payment amount to the running total
  4. Outputs totals: Writes aggregated totals as long values

Key KSML concepts demonstrated:

  • initializer with simple expression (no custom function needed)
  • aggregator with simple arithmetic expression
  • Binary data types for processing efficiency
Simple producer (binary long values) - click to expand
functions:
  generate_amount:
    type: generator
    globalCode: |
      import random
      customer_counter = 0
      customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
    code: |
      global customer_counter, customers

      # Cycle through customers
      customer = customers[customer_counter % len(customers)]
      customer_counter += 1

      # Generate amount in cents as long
      amount = random.randint(1000, 50000)  # $10 to $500

      key = customer
      value = amount
    expression: (key, value)
    resultType: (string, long)

producers:
  amount_producer:
    generator: generate_amount
    interval: 1s
    to:
      topic: payment_amounts
      keyType: string
      valueType: long
Simple processor (aggregate only) - click to expand
streams:
  payment_amounts:
    topic: payment_amounts
    keyType: string  # customer_id
    valueType: long  # amount in cents

  payment_totals:
    topic: payment_totals
    keyType: string
    valueType: long

pipelines:
  calculate_totals:
    from: payment_amounts
    via:
      - type: groupByKey
      - type: aggregate
        store:
          type: keyValue
          retention: 1h
        initializer:
          expression: 0
          resultType: long
        aggregator:
          expression: aggregatedValue + value
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("Customer {} total: {}", key, value)
    to: payment_totals

Verifying the results:

Since binary data isn't human-readable in Kowl UI, use command-line tools to verify:

# Check current totals (convert cents to dollars)
kcat -b localhost:9092 -t payment_totals -C -o end -c 5 -f 'Key: %k, Total cents: %s\n' -s value=Q

# Calculate expected total for one customer
kcat -b localhost:9092 -t payment_amounts -C -o beginning -f '%k,%s\n' -s value=Q -e | \
  grep "CUST001" | cut -d',' -f2 | awk '{sum += $1} END {print "Expected:", sum}'

Complex Aggregate (JSON Format)

This example shows advanced aggregation with comprehensive statistics using JSON for human readability.

Additional concepts demonstrated:

  • Custom initializer function for complex state initialization
  • Custom aggregator function for multi-field updates
  • JSON state management for rich aggregations
  • Dynamic calculations (average) within aggregation logic

What it does:

  1. Initializes statistics: Creates a JSON structure to track multiple metrics (count, total, min, max, average)
  2. Groups by customer: Groups payment events by account_id (the message key)
  3. Updates statistics: For each payment, updates all metrics in the aggregated state
  4. Calculates average: Dynamically computes the average amount per customer
  5. Outputs comprehensive stats: Produces JSON messages with complete payment statistics
JSON payment events producer (click to expand)
functions:
  generate_payment:
    type: generator
    globalCode: |
      import random
      import time
      account_counter = 0
      accounts = ["ACC001", "ACC002", "ACC003", "ACC004", "ACC005", "ACC006"]
    code: |
      global account_counter, accounts

      # Cycle through accounts
      account = accounts[account_counter % len(accounts)]
      account_counter += 1

      # Generate payment event
      value = {
        "account_id": account,
        "amount": round(random.uniform(5.0, 500.0), 2),
        "currency": "USD",
        "timestamp": int(time.time() * 1000)
      }

      # Use account_id as key
      key = account
    expression: (key, value)
    resultType: (string, json)

producers:
  payment_producer:
    generator: generate_payment
    interval: 1s
    to:
      topic: payment_stream
      keyType: string
      valueType: json
JSON statistics processor (click to expand)
streams:
  payment_stream:
    topic: payment_stream
    keyType: string  # customer_id
    valueType: json  # payment details with amount field

  payment_statistics:
    topic: payment_statistics
    keyType: string
    valueType: json

functions:
  init_stats:
    type: initializer
    code: |
      # Initialize statistics
      stats = {
        "count": 0,
        "total_amount": 0.0,
        "min_amount": None,
        "max_amount": None
      }
    expression: stats
    resultType: json

  update_stats:
    type: aggregator
    code: |
      # Update payment statistics
      if value and aggregatedValue:
        amount = value.get("amount", 0.0)
        aggregatedValue["count"] = aggregatedValue.get("count", 0) + 1
        aggregatedValue["total_amount"] = aggregatedValue.get("total_amount", 0.0) + amount

        current_min = aggregatedValue.get("min_amount")
        current_max = aggregatedValue.get("max_amount")

        if current_min is None or amount < current_min:
          aggregatedValue["min_amount"] = amount
        if current_max is None or amount > current_max:
          aggregatedValue["max_amount"] = amount

        aggregatedValue["average_amount"] = round(aggregatedValue["total_amount"] / aggregatedValue["count"], 2)
    expression: aggregatedValue
    resultType: json

pipelines:
  calculate_statistics:
    from: payment_stream
    via:
      - type: groupByKey
      - type: aggregate
        store:
          type: keyValue
          retention: 1h
          caching: false
        initializer: init_stats
        aggregator: update_stats
      - type: toStream
      - type: peek
        forEach:
          code: |
            if value:
              count = value.get("count", 0)
              total = round(value.get("total_amount", 0), 2)
              avg = value.get("average_amount", 0)
              min_amt = value.get("min_amount", 0)
              max_amt = value.get("max_amount", 0)
              log.info("Customer {} - Count: {}, Total: ${}, Avg: ${}, Min: ${}, Max: ${}", 
                       key, count, total, avg, min_amt, max_amt)
    to: payment_statistics

Both approaches demonstrate the flexibility of the aggregate operation. The simple version focuses on the core concept, while the complex version shows real-world statistical aggregation with human-readable JSON output.

Aggregate Components

  1. Initializer: Creates empty/initial state
  2. Aggregator: Updates state with each new value
  3. Result: Continuously updated aggregate in state store

Windowed Aggregations

Aggregations can be windowed to compute time-based analytics:

Window Types

Tumbling Windows

Non-overlapping, fixed-size time windows.

- type: windowByTime
  windowType: tumbling
  duration: 1h  # 1-hour windows
  grace: 5m     # Allow 5 minutes for late data

Use cases: Hourly reports, daily summaries

Hopping Windows

Overlapping, fixed-size windows that advance by a hop interval.

- type: windowByTime
  windowType: hopping
  duration: 1h    # Window size
  advance: 15m    # Hop interval
  grace: 5m

Use cases: Moving averages, overlapping analytics

Session Windows

Dynamic windows based on periods of inactivity.

- type: windowBySession
  inactivityGap: 30m  # Close window after 30 min of inactivity
  grace: 5m

Use cases: User sessions, activity bursts

Windowed Aggregation Example

This example demonstrates windowed aggregation by calculating temperature statistics (count, average, min, max) for each sensor within 30-second time windows. The windowed key contains both the original sensor ID and the window time boundaries.

What it does:

  1. Groups by sensor: Each sensor's readings are processed separately
  2. Windows by time: Creates 30-second tumbling windows for aggregation
  3. Calculates statistics: Tracks count, sum, average, min, and max temperature
  4. Transforms window key: Converts WindowedString to a regular string key for output
  5. Writes to topic: Outputs windowed statistics to sensor_window_stats topic

Key KSML concepts demonstrated:

  • windowByTime with tumbling windows for time-based aggregation
  • Window stores with configurable retention
  • Accessing window metadata (start/end times) from the WindowedString key
  • Transforming WindowedString keys using map with keyValueMapper
  • Complex aggregation state using JSON
Temperature sensor producer (click to expand)
functions:
  generate_sensor_reading:
    type: generator
    globalCode: |
      import random
      import time
      sensor_counter = 0
      sensors = ["temp001", "temp002", "temp003"]
    code: |
      global sensor_counter, sensors

      # Cycle through sensors
      sensor = sensors[sensor_counter % len(sensors)]
      sensor_counter += 1

      # Generate temperature reading
      temperature = round(random.uniform(18.0, 35.0), 1)

      # Use sensor_id as key
      key = sensor
    expression: (key, temperature)
    resultType: (string, double)

producers:
  sensor_producer:
    generator: generate_sensor_reading
    interval: 2s
    to:
      topic: sensor_temperatures
      keyType: string
      valueType: double
Windowed temperature statistics processor (click to expand)
streams:
  sensor_temperatures:
    topic: sensor_temperatures
    keyType: string
    valueType: double

  sensor_window_stats:
    topic: sensor_window_stats
    keyType: string
    valueType: json

functions:
  init_stats:
    type: initializer
    code: |
      stats = {
        "count": 0,
        "sum": 0.0,
        "min": None,
        "max": None
      }
    expression: stats
    resultType: json

  update_stats:
    type: aggregator
    code: |
      # Update count and sum
      aggregatedValue["count"] += 1
      aggregatedValue["sum"] += value

      # Update min/max
      if aggregatedValue["min"] is None or value < aggregatedValue["min"]:
        aggregatedValue["min"] = value
      if aggregatedValue["max"] is None or value > aggregatedValue["max"]:
        aggregatedValue["max"] = value

      # Calculate average
      aggregatedValue["avg"] = round(aggregatedValue["sum"] / aggregatedValue["count"], 2)

      result = aggregatedValue
    expression: result
    resultType: json

  transform_window_key:
    type: keyValueMapper
    code: |
      # Extract window information from the WindowedString key
      sensor_id = key["key"]
      window_start = key["startTime"]
      window_end = key["endTime"]

      # Create a new string key with window info
      new_key = f"{sensor_id}_{window_start}_{window_end}"

      # Add window metadata to the value
      new_value = {
        "sensor_id": sensor_id,
        "window_start": window_start,
        "window_end": window_end,
        "stats": value
      }
    expression: (new_key, new_value)
    resultType: (string, json)

  log_window_stats:
    type: forEach
    code: |
      # Log the transformed results
      sensor_id = value["sensor_id"]
      window_start = value["window_start"]
      window_end = value["window_end"]
      stats = value["stats"]

      log.info("Sensor {} | Window [{} - {}] | Stats: {} readings, avg: {}C, min: {}C, max: {}C", 
               sensor_id, window_start, window_end, 
               stats["count"], stats["avg"], stats["min"], stats["max"])

pipelines:
  windowed_temperature_stats:
    from: sensor_temperatures
    via:
      - type: groupByKey
      - type: windowByTime
        windowType: tumbling
        duration: 30s
        grace: 5s
      - type: aggregate
        store:
          type: window
          windowSize: 30s
          retention: 2m
          caching: false
        initializer: init_stats
        aggregator: update_stats
      - type: toStream
      - type: map
        mapper: transform_window_key
      - type: peek
        forEach: log_window_stats
    to: sensor_window_stats

Understanding the WindowedString key:

When using windowed aggregations, the key becomes a WindowedString object containing:

  • key: The original key (sensor ID)
  • start/end: Window boundaries in milliseconds
  • startTime/endTime: Human-readable UTC timestamps

This allows you to know exactly which time window each aggregation result belongs to.

Output format:

The output topic contains messages with:

  • Key: {sensor_id}_{window_start}_{window_end} (e.g., temp001_2025-08-12T18:58:00Z_2025-08-12T18:58:30Z)
  • Value: JSON object containing:
    • sensor_id: The sensor identifier
    • window_start/window_end: Window boundaries in UTC
    • stats: Aggregated statistics (count, avg, min, max)

Example output message:

{
  "sensor_id": "temp001",
  "window_start": "2025-08-12T18:58:00Z",
  "window_end": "2025-08-12T18:58:30Z",
  "stats": {
    "count": 3,
    "avg": 24.5,
    "min": 19.6,
    "max": 28.7,
    "sum": 73.5
  }
}

Verifying the input data:

To check the raw sensor temperature readings (binary double values), use:

docker exec broker kafka-console-consumer.sh \
  --bootstrap-server broker:9093 \
  --topic sensor_temperatures \
  --from-beginning \
  --max-messages 10 \
  --property print.key=true \
  --property key.separator=" | " \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer

Example input messages:

temp001 | 24.5
temp002 | 31.2
temp003 | 18.7
temp001 | 26.3
temp002 | 29.8

Advanced: Cogroup Operation

Cogroup allows combining multiple grouped streams into a single aggregation. This is useful when you need to aggregate data from different sources into one unified result.

Orders, Refunds, and Bonuses Producer (click to expand)
functions:
  generate_order:
    type: generator
    globalCode: |
      import random
      import time
      order_counter = 0
      customers = ["alice", "bob", "charlie", "diana", "eve"]
    code: |
      global order_counter, customers

      order_counter += 1
      customer = random.choice(customers)

      value = {
        "order_id": f"ORD{order_counter:04d}",
        "customer": customer,
        "amount": round(random.uniform(10, 500), 2),
        "timestamp": int(time.time() * 1000)
      }

      key = customer
    expression: (key, value)
    resultType: (string, json)

  generate_refund:
    type: generator
    globalCode: |
      import random
      import time
      refund_counter = 0
      customers = ["alice", "bob", "charlie", "diana", "eve"]
    code: |
      global refund_counter, customers

      # Generate refunds less frequently
      if random.random() < 0.3:  # 30% chance of refund
        refund_counter += 1
        customer = random.choice(customers)

        value = {
          "refund_id": f"REF{refund_counter:04d}",
          "customer": customer,
          "amount": round(random.uniform(5, 100), 2),
          "timestamp": int(time.time() * 1000)
        }

        key = customer
      else:
        key = None
        value = None
    expression: (key, value) if key else (None, None)
    resultType: (string, json)

  generate_bonus:
    type: generator
    globalCode: |
      import random
      import time
      bonus_counter = 0
      customers = ["alice", "bob", "charlie", "diana", "eve"]
    code: |
      global bonus_counter, customers

      # Generate bonuses rarely
      if random.random() < 0.1:  # 10% chance of bonus
        bonus_counter += 1
        customer = random.choice(customers)

        value = {
          "bonus_id": f"BON{bonus_counter:04d}",
          "customer": customer,
          "points": random.randint(10, 100),
          "timestamp": int(time.time() * 1000)
        }

        key = customer
      else:
        key = None
        value = None
    expression: (key, value) if key else (None, None)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_order
    interval: 2s
    to:
      topic: customer_orders
      keyType: string
      valueType: json

  refund_producer:
    generator: generate_refund
    interval: 3s
    to:
      topic: customer_refunds
      keyType: string
      valueType: json

  bonus_producer:
    generator: generate_bonus
    interval: 5s
    to:
      topic: customer_bonuses
      keyType: string
      valueType: json
Cogroup Processor (click to expand)
streams:
  orders:
    topic: customer_orders
    keyType: string
    valueType: json

  refunds:
    topic: customer_refunds
    keyType: string
    valueType: json

  bonuses:
    topic: customer_bonuses
    keyType: string
    valueType: json

  customer_totals:
    topic: customer_totals
    keyType: string
    valueType: json

functions:
  aggregate_order:
    type: aggregator
    code: |
      # Initialize if first time
      if aggregatedValue is None:
        aggregatedValue = {}

      # Add order amount to total
      if value is not None:
        aggregatedValue["total_orders"] = aggregatedValue.get("total_orders", 0) + value.get("amount", 0)
        aggregatedValue["order_count"] = aggregatedValue.get("order_count", 0) + 1
      result = aggregatedValue
    expression: result
    resultType: json

  aggregate_refund:
    type: aggregator
    code: |
      # Initialize if first time
      if aggregatedValue is None:
        aggregatedValue = {}

      # Add refund amount to total (refunds are subtracted later)
      if value is not None:
        aggregatedValue["total_refunds"] = aggregatedValue.get("total_refunds", 0) + value.get("amount", 0)
        aggregatedValue["refund_count"] = aggregatedValue.get("refund_count", 0) + 1
      result = aggregatedValue
    expression: result
    resultType: json

  aggregate_bonus:
    type: aggregator
    code: |
      # Initialize if first time
      if aggregatedValue is None:
        aggregatedValue = {}

      # Add bonus points to total
      if value is not None:
        aggregatedValue["total_bonus_points"] = aggregatedValue.get("total_bonus_points", 0) + value.get("points", 0)
        aggregatedValue["bonus_count"] = aggregatedValue.get("bonus_count", 0) + 1
      result = aggregatedValue
    expression: result
    resultType: json

  compute_net_value:
    type: aggregator
    code: |
      # Compute net value (orders - refunds + bonus points as dollars)
      orders = aggregatedValue.get("total_orders", 0)
      refunds = aggregatedValue.get("total_refunds", 0)
      bonus_dollars = aggregatedValue.get("total_bonus_points", 0) * 0.1  # 1 point = $0.10

      aggregatedValue["net_value"] = round(orders - refunds + bonus_dollars, 2)
      aggregatedValue["customer"] = key

      result = aggregatedValue
    expression: result
    resultType: json

pipelines:
  # First stream: group orders
  grouped_orders:
    from: orders
    via:
      - type: groupByKey
      - type: cogroup
        aggregator: aggregate_order
    as: orders_grouped

  # Second stream: group refunds and cogroup with orders
  grouped_refunds:
    from: refunds
    via:
      - type: groupByKey
      - type: cogroup
        with: orders_grouped
        aggregator: aggregate_refund
    as: refunds_grouped

  # Third stream: group bonuses and cogroup with previous
  customer_aggregates:
    from: bonuses
    via:
      - type: groupByKey
      - type: cogroup
        with: refunds_grouped
        aggregator: aggregate_bonus
      # Final aggregation to compute net value
      - type: aggregate
        initializer:
          expression: {}
          resultType: json
        aggregator: compute_net_value
        store:
          name: customer_totals_store
          type: keyValue
          caching: true
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("CUSTOMER TOTALS - Customer: {}, Orders: ${} ({}), Refunds: ${} ({}), Bonus Points: {}, Net Value: ${}", 
                     key,
                     value.get("total_orders", 0),
                     value.get("order_count", 0),
                     value.get("total_refunds", 0),
                     value.get("refund_count", 0),
                     value.get("total_bonus_points", 0),
                     value.get("net_value", 0))
    to: customer_totals

How Cogroup Works

The cogroup operation:

  1. Groups each stream independently by key
  2. Combines the grouped streams using cogroup operations
  3. Each stream contributes to the aggregate with its own aggregator function
  4. Final aggregate operation computes the combined result

Note: Cogroup is an advanced feature that requires careful coordination between multiple streams. Ensure all streams are properly grouped and that aggregator functions handle null values appropriately.
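
Stripped of the business logic, the chaining pattern reduces to naming each grouped-and-cogrouped stream with as and referencing it from the next pipeline with with. A condensed two-stream skeleton of the example above (pipeline names are placeholders; the function names are the ones defined in the example):

pipelines:
  grouped_orders:
    from: orders
    via:
      - type: groupByKey
      - type: cogroup
        aggregator: aggregate_order
    as: orders_grouped              # named so the next pipeline can reference it

  combined_totals:
    from: refunds
    via:
      - type: groupByKey
      - type: cogroup
        with: orders_grouped        # joins this stream into the existing cogroup
        aggregator: aggregate_refund
      - type: aggregate             # final aggregation over the combined streams
        initializer:
          expression: {}
          resultType: json
        aggregator: compute_net_value
      - type: toStream
    to: customer_totals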

Complex Example: Regional Sales Analytics

This example demonstrates rekeying (changing the grouping key) and windowed aggregation to calculate regional sales statistics:

What it does:

  1. Receives sales events keyed by product ID with region, amount, and quantity data
  2. Rekeys by region - changes the grouping from product to geographical region
  3. Windows by time - creates 1-minute tumbling windows for aggregation
  4. Aggregates metrics - tracks total sales, quantities, transaction counts, and per-product breakdowns
  5. Outputs regional statistics - writes windowed regional summaries to output topic

Key KSML concepts demonstrated:

  • map with keyValueMapper to change the message key (rekeying)
  • groupByKey after rekeying to group by the new key
  • windowByTime for time-based regional analytics
  • Complex aggregation state with nested data structures
  • Tracking multiple metrics in a single aggregation
Sales events producer (click to expand)
functions:
  generate_sales_event:
    type: generator
    globalCode: |
      import random
      import time
      product_counter = 0
      products = ["laptop", "phone", "tablet", "monitor", "keyboard", "mouse", "headphones", "webcam"]
      regions = ["North", "South", "East", "West", "Central"]
    code: |
      global product_counter, products, regions

      # Cycle through products
      product = products[product_counter % len(products)]
      product_counter += 1

      # Generate sale event
      value = {
        "product_id": product,
        "region": random.choice(regions),
        "amount": round(random.uniform(10, 1000), 2),
        "quantity": random.randint(1, 5),
        "timestamp": int(time.time() * 1000)
      }

      # Use product_id as key
      key = product
    expression: (key, value)
    resultType: (string, json)

producers:
  sales_producer:
    generator: generate_sales_event
    interval: 2s
    to:
      topic: retail_sales
      keyType: string
      valueType: json
Regional sales analytics processor (click to expand)
streams:
  sales_events:
    topic: retail_sales
    keyType: string  # Product ID
    valueType: json  # Sale details including region, amount, quantity

  sales_by_region:
    topic: sales_by_region
    keyType: string  # Region
    valueType: json  # Aggregated sales statistics

functions:
  extract_region:
    type: keyValueMapper
    code: |
      # Extract region from the sale event and use it as the new key
      region = value.get("region", "unknown")
    expression: (region, value)
    resultType: (string, json)

  initialize_sales_stats:
    type: initializer
    expression: {"total_sales": 0.0, "total_quantity": 0, "transaction_count": 0, "products": {}}
    resultType: json

  aggregate_sales:
    type: aggregator
    code: |
      # Initialize aggregatedValue if None (first aggregation)
      if aggregatedValue is None:
        aggregatedValue = {"total_sales": 0.0, "total_quantity": 0, "transaction_count": 0, "products": {}}

      # Extract data from the sale
      product_id = value.get("product_id", "unknown")
      amount = value.get("amount", 0)
      quantity = value.get("quantity", 0)

      # Update aggregated values
      aggregatedValue["total_sales"] += amount
      aggregatedValue["total_quantity"] += quantity
      aggregatedValue["transaction_count"] += 1

      # Track per-product statistics
      if product_id not in aggregatedValue["products"]:
        aggregatedValue["products"][product_id] = {"sales": 0.0, "quantity": 0}

      aggregatedValue["products"][product_id]["sales"] += amount
      aggregatedValue["products"][product_id]["quantity"] += quantity

      # Calculate average sale
      aggregatedValue["avg_sale"] = round(aggregatedValue["total_sales"] / aggregatedValue["transaction_count"], 2)

      return aggregatedValue
    resultType: json

  transform_window_key:
    type: keyValueMapper
    code: |
      # Extract window information from the WindowedString key
      region = key["key"]
      window_start = key["startTime"]
      window_end = key["endTime"]

      # Create a new string key with window info
      new_key = f"{region}_{window_start}_{window_end}"

      # Add window metadata to the value
      new_value = {
        "region": region,
        "window_start": window_start,
        "window_end": window_end,
        "stats": value
      }
    expression: (new_key, new_value)
    resultType: (string, json)

pipelines:
  regional_sales_analytics:
    from: sales_events
    via:
      # Group by region instead of product ID
      - type: map
        mapper: extract_region
      - type: groupByKey
      # Use tumbling window of 1 minute for demo
      - type: windowByTime
        windowType: tumbling
        duration: 1m
        grace: 10s
      # Aggregate sales data
      - type: aggregate
        store:
          type: window
          windowSize: 1m
          retention: 10m
        initializer: initialize_sales_stats
        aggregator: aggregate_sales
      - type: toStream
      - type: map
        mapper: transform_window_key
      - type: peek
        forEach:
          code: |
            region = value["region"]
            stats = value["stats"]
            log.info("Region {} sales (1-min window): ${} from {} transactions", 
                     region, stats["total_sales"], stats["transaction_count"])
    # Output to region-specific topic
    to: sales_by_region

Pipeline flow:

  1. Input: Sales events with product_id as key
  2. Rekey: Map operation changes key to region
  3. Group: Group by the new region key
  4. Window: Apply 1-minute tumbling windows
  5. Aggregate: Calculate regional statistics per window
  6. Output: Windowed regional sales summaries

Performance Considerations

State Store Types

RocksDB (default)

  • Persistent, can handle large state
  • Slower than in-memory
  • Survives restarts

In-Memory

  • Fast but limited by heap size
  • Lost on restart (rebuilt from changelog)
  • Good for small, temporary state
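
Based on the keyValue store options shown earlier, an in-memory store is selected by disabling persistence. A minimal sketch (placeholder store name; check the defaults of your KSML version):

store:
  name: my_inmemory_store
  type: keyValue
  persistent: false       # in-memory instead of RocksDB
  caching: true
  loggingDisabled: false  # keep the changelog so the store can be rebuilt after a restart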

Optimization Strategies

  1. Enable Caching

    store:
      caching: true  # Reduces downstream updates
    

  2. Tune Commit Intervals

    • Longer intervals = better throughput, higher latency
    • Shorter intervals = lower latency, more overhead

    To configure the commit interval, set the corresponding Kafka Streams property in your KSML runner configuration:

    # In your ksml-runner.yaml or application config
    kafka:
      commit.interval.ms: 30000  # 30 seconds for better throughput
      # or
      commit.interval.ms: 100    # 100ms for lower latency
    

  3. Pre-filter Data

    • Filter before grouping to reduce state size (see the sketch after this list)
    • Remove unnecessary fields early
  4. Choose Appropriate Window Sizes

    • Smaller windows = less memory
    • Consider business requirements vs resources
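
A hedged sketch of the pre-filtering strategy from point 3, reusing the payment_stream topic and the statistics functions from the aggregate example (the predicate and threshold are placeholders):

pipelines:
  filtered_statistics:
    from: payment_stream
    via:
      # Discard records before grouping so they never create or update state
      - type: filter
        if:
          expression: value is not None and value.get("amount", 0) > 0
      - type: groupByKey
      - type: aggregate
        initializer: init_stats     # functions defined in the JSON statistics processor above
        aggregator: update_stats
      - type: toStream
    to: payment_statistics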

Memory Management

Monitor state store sizes:

  • Each unique key requires memory
  • Windowed aggregations multiply by number of windows
  • Use retention policies to limit window history

Common Pitfalls and Solutions

Forgetting to Group

Problem: Aggregation operations require grouped streams

Solution: Always use groupByKey or groupBy before aggregating

Null Value Handling

Problem: Null values can cause aggregation failures

Solution: Check for nulls in aggregator functions:

code: |
  if value is None:
    return aggregatedValue
  # ... rest of logic

Type Mismatches

Problem: Result type doesn't match expression output

Solution: Ensure resultType matches what your expression returns
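
For example, an aggregator whose expression returns a dict must declare json as its result type; declaring string instead is a mismatch. A minimal sketch:

# Mismatch: the expression returns a dict, but the declared type is string
aggregator:
  expression: aggregatedValue
  resultType: string  # wrong

# Fixed: declare the type the expression actually returns
aggregator:
  expression: aggregatedValue
  resultType: json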

Window Size vs Retention

Problem: Confusion between window size and retention

Solution:

  • Window size = duration of each window
  • Retention = how long to keep old windows
  • Retention should be at least window size + grace period

Late Arriving Data

Problem: Data arrives after window closes

Solution: Configure appropriate grace periods:

grace: 5m  # Allow 5 minutes for late data

Conclusion

KSML aggregations enable powerful real-time analytics:

  • Count for frequency analysis
  • Reduce for simple combinations
  • Aggregate for complex statistics
  • Windowed operations for time-based analytics
  • Cogroup for multi-stream aggregations

Choose the appropriate aggregation type based on your use case, and always consider state management and performance implications.

Further Reading