
Real-Time Analytics with KSML

This tutorial demonstrates how to build a real-time analytics application using KSML. You'll learn how to process streaming data, calculate metrics in real-time, and visualize the results.

Introduction

Real-time analytics is one of the most common use cases for stream processing. By analyzing data as it arrives, you can:

  • Detect trends and patterns as they emerge
  • Respond quickly to changing conditions
  • Make data-driven decisions with minimal latency
  • Provide up-to-date dashboards and visualizations

In this tutorial, we'll build a real-time analytics pipeline that processes a stream of e-commerce transactions, calculates various metrics, and makes the results available for visualization.

Prerequisites

Before starting this tutorial, you should:

  • Have completed the earlier tutorials, or at least have the Docker Compose environment from them available
  • Be familiar with basic KSML concepts such as streams, functions, and pipelines
  • Have Docker and Docker Compose installed

The Use Case

Imagine you're running an e-commerce platform and want to analyze transaction data in real-time. You want to track:

  • Total sales by product category
  • Average order value
  • Transaction volume by region
  • Conversion rates from different marketing channels

Define the topics for the use case

In earlier tutorials, you created a Docker Compose file with all the necessary containers. This use case needs a few additional topics. To have them created, open the docker-compose.yml in the examples directory and find the definition of the kafka-setup container, which creates the topics.
Change the definition so that the startup command for the setup container (the command section) looks like the following:

Command section for the kafka-setup container
command: "bash -c 'echo Creating topics... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ecommerce_transactions && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sales_by_category && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic avg_order_value && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transactions_by_region'"
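
After recreating the setup container (for example with docker-compose up -d), you can check that all four topics exist. One quick way, assuming the broker service is named broker and ships the Kafka command-line tools, as the setup command above suggests:

docker-compose exec broker kafka-topics.sh --list --bootstrap-server broker:9093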

Defining the Data Model

Our transaction data will have the following structure:

{
  "transaction_id": "12345",
  "timestamp": 1625097600000,
  "customer_id": "cust-789",
  "product_id": "prod-456",
  "product_category": "electronics",
  "quantity": 1,
  "price": 499.99,
  "region": "north_america",
  "marketing_channel": "social_media"
}
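
If you want to push a record of this shape onto the input topic by hand (before using the producer definition shown later), the Kafka console producer is sufficient. This is a sketch that assumes the broker container is named broker, uses the broker:9093 listener from the setup command, and has kafka-console-producer.sh on its path:

docker-compose exec broker kafka-console-producer.sh \
    --bootstrap-server broker:9093 --topic ecommerce_transactions \
    --property parse.key=true --property key.separator='|'

Each line you type is sent as one record; the text before the | separator becomes the key and the rest becomes the JSON value, for example:

12345|{"transaction_id": "12345", "timestamp": 1625097600000, "customer_id": "cust-789", "product_id": "prod-456", "product_category": "electronics", "quantity": 1, "price": 499.99, "region": "north_america", "marketing_channel": "social_media"}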

Creating the KSML Definition

Now, let's create our KSML definition file:

Real-time analytics processor
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  transactions:
    topic: ecommerce_transactions
    keyType: string  # transaction_id
    valueType: json  # transaction data

  sales_by_category:
    topic: sales_by_category
    keyType: string  # product_category
    valueType: json  # aggregated sales data

  avg_order_value:
    topic: avg_order_value
    keyType: json:windowed(string)  # serialized time window key
    valueType: json  # average order value

  transactions_by_region:
    topic: transactions_by_region
    keyType: string  # region
    valueType: string  # transaction count

functions:
  calculate_total:
    type: aggregator
    resultType: json
    code: |
      if aggregatedValue is None:
        return {"total_sales": value.get("price") * value.get("quantity"), "count": 1}
      else:
        return {
          "total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")),
          "count": aggregatedValue.get("count") + 1
        }

  calculate_window_total:
    type: aggregator
    resultType: json
    code: |
      if aggregatedValue is None:
        return {"total_sales": value.get("price") * value.get("quantity"), "count": 1}
      else:
        return {
          "total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")),
          "count": aggregatedValue.get("count") + 1
        }

pipelines:
  # Pipeline for sales by category
  sales_by_category_pipeline:
    from: transactions
    via:
      - type: selectKey
        mapper:
          expression: value.get("product_category")
      - type: groupByKey
      - type: aggregate
        initializer:
          expression: '{"total_sales": 0, "count": 0}'
          resultType: json
        aggregator: calculate_total
        store:
          name: category_store
          type: keyValue
          keyType: string
          valueType: json
      - type: toStream
    to: sales_by_category

  # Pipeline for average order value (windowed)
  avg_order_value_pipeline:
    from: transactions
    via:
      - type: groupBy
        mapper:
          expression: '"all"'  # Use a constant key to aggregate all transactions
          resultType: string
      - type: windowByTime
        windowType: tumbling
        duration: 10s
      - type: aggregate
        store:
          name: avg_order_store_10s
          type: window
          keyType: string
          valueType: json
          windowSize: 10s
          retention: 30s
        initializer:
          expression: '{"total_sales": 0.0, "count": 0}'
          resultType: json
        aggregator:
          expression: '{"total_sales": aggregatedValue.get("total_sales") + (value.get("price") * value.get("quantity")), "count": aggregatedValue.get("count") + 1}'
          resultType: json
      - type: suppress
        until: windowCloses
      - type: toStream
      - type: convertKey
        into: json:windowed(string)
      - type: mapValues
        mapper:
          expression: '{"avg_order_value": value.get("total_sales") / value.get("count") if value and value.get("count") and value.get("count") > 0 else 0}'
          resultType: json
    to: avg_order_value

  #   Pipeline for transactions by region
  transactions_by_region_pipeline:
    from: transactions
    via:
      - type: selectKey
        mapper:
          expression: value.get("region")
      - type: groupByKey
      - type: count
      - type: toStream
      - type: convertValue
        into: string
    to: transactions_by_region
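
Before running anything, it helps to know roughly what to expect. With the four sample transactions produced in the next section, the sales_by_category topic ends up with one running aggregate per category. Floating-point rounding noise aside, the final records would look like this (intermediate updates with lower counts appear first):

electronics   {"total_sales": 729.98, "count": 2}
gardening     {"total_sales": 242.98, "count": 2}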

Running the Application

You can use the following producer definition as a starting point for writing simulated sales data; it writes four hard-coded sales records to the input stream and then stops.

Transaction data producer
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

streams:
  transactions:
    topic: ecommerce_transactions
    keyType: string  # transaction_id
    valueType: json  # transaction data

# set up global data and a function that will loop over it once
functions:
  generate_transactions:
    type: generic
    resultType: boolean
    expression: False
    globalCode: |
      global count
      count = 0
      global done
      done = False
      global orders
      orders = [
        {
          "transaction_id": "12345",
          "timestamp": 1625097600000,
          "customer_id": "cust-789",
          "product_id": "prod-456",
          "product_category": "electronics",
          "quantity": 1,
          "price": 499.99,
          "region": "north_america",
          "marketing_channel": "social_media"
        },
        {
          "transaction_id": "12346",
          "timestamp": 1625097606000,
          "customer_id": "cust-788",
          "product_id": "prod-457",
          "product_category": "gardening",
          "quantity": 1,
          "price": 229.99,
          "region": "europe",
          "marketing_channel": "social_media"
        },
        {
          "transaction_id": "12349",
          "timestamp": 1625097606000,
          "customer_id": "cust-788",
          "product_id": "prod-457",
          "product_category": "electronics",
          "quantity": 1,
          "price": 229.99,
          "region": "europe",
          "marketing_channel": "social_media"
        },
        {
          "transaction_id": "12347",
          "timestamp": 1625097612000,
          "customer_id": "cust-789",
          "product_id": "prod-458",
          "product_category": "gardening",
          "quantity": 1,
          "price": 12.99,
          "region": "north_america",
          "marketing_channel": "social_media"
        }
      ]
      nr_orders = len(orders)

      def nextOrder():
         global count
         global done
         result = orders[count]
         count = count + 1
         if count >= len(orders):
           count = 0
           done = True    # loop once over the data
         return result

producers:
  produce_order:
    to: transactions
    interval: 7000  # Emit every 7 seconds to ensure windows close
    until:
      expression:
        done
    generator:
      code: |
        order = nextOrder()
      expression: '(order.get("transaction_id"), order)'  # key each record by its transaction_id
      resultType: (string, json)

To run the application:

  1. Save the KSML definition to definitions/analytics.yaml
  2. Save the transaction data producer above (for example as definitions/example-data-producer.yaml) if desired, or create test data in some other way
  3. Create a configuration file at config/application.properties with your Kafka connection details
  4. Start the Docker Compose environment: docker-compose up -d
  5. Monitor the output topics to see the real-time analytics results (an example consumer command follows below)
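
To follow along on an output topic, a console consumer is the simplest option. This assumes, as before, that the broker container is named broker and has the Kafka command-line tools available:

docker-compose exec broker kafka-console-consumer.sh \
    --bootstrap-server broker:9093 --topic sales_by_category \
    --from-beginning --property print.key=true

Swap in avg_order_value or transactions_by_region to watch the other pipelines.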

Visualizing the Results

Setting up visualizations is outside the scope of this tutorial. You can connect the output topics to visualization tools like:

  • Grafana
  • Kibana
  • Custom dashboards using web frameworks

For example, to create a simple dashboard with Grafana:

  1. Set up Grafana to connect to your Kafka topics
  2. Create dashboards for each metric:
       • Bar chart for sales by category
       • Line chart for average order value over time
       • Map visualization for transactions by region
       • Gauge charts for conversion rates

Extending the Application

You can extend this application in several ways:

  • Add anomaly detection to identify unusual patterns
  • Implement trend analysis to detect changing consumer behavior
  • Create alerts for specific conditions (e.g., sales dropping below a threshold; see the sketch after this list)
  • Enrich the data with additional information from reference data sources
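
As an illustration of the alerting idea, here is a minimal sketch that flags categories whose average sale value falls below a threshold. It assumes you create an additional sales_alerts topic, and the threshold values are placeholders; a production-grade "sales dropping" alert would typically compare windowed aggregates rather than filter individual updates.

Low-sales alert sketch
streams:
  sales_totals:
    topic: sales_by_category
    keyType: string  # product_category
    valueType: json  # aggregated sales data

  sales_alerts:
    topic: sales_alerts  # hypothetical topic; create it alongside the others
    keyType: string
    valueType: json

pipelines:
  low_sales_alert_pipeline:
    from: sales_totals
    via:
      # keep only categories with enough volume and a low average sale value
      - type: filter
        if:
          expression: value.get("count") >= 10 and (value.get("total_sales") / value.get("count")) < 50
    to: sales_alerts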

Conclusion

In this tutorial, you've learned how to:

  • Process streaming transaction data in real-time
  • Calculate various business metrics using KSML
  • Structure a real-time analytics application
  • Prepare the results for visualization

Real-time analytics is a powerful use case for KSML, allowing you to gain immediate insights from your streaming data without complex coding.

Next Steps