Event-Driven Applications with KSML

This tutorial demonstrates how to build event-driven applications using KSML. You'll learn how to detect specific events in your data streams and trigger appropriate actions in response.

Introduction

Event-driven architecture is a powerful paradigm for building responsive, real-time applications. In this approach:

  • Systems react to events as they occur
  • Components communicate through events rather than direct calls
  • Business logic is triggered by changes in state
  • Applications can scale and evolve independently

KSML is particularly well-suited for event-driven applications because it allows you to:

  • Process streams of events in real-time
  • Detect complex patterns and conditions
  • Transform events into actionable insights
  • Trigger downstream processes automatically

Prerequisites

Before starting this tutorial, you should be familiar with the basics of KSML (streams, pipelines, functions and state stores) and have a working Kafka and KSML runner environment available.

The Use Case

In this tutorial, we'll build an event-driven inventory management system for an e-commerce platform. The system will:

  1. Monitor product inventory levels in real-time
  2. Detect when items are running low
  3. Generate reorder events for the procurement system
  4. Alert warehouse staff about critical inventory situations
  5. Update inventory dashboards in real-time

Defining the Data Models

Inventory Update Events

{
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "category": "electronics",
  "current_stock": 15,
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000,
  "unit_price": 79.99
}

Order Events

{
  "order_id": "order-456",
  "customer_id": "cust-789",
  "items": [
    {
      "product_id": "prod-123",
      "quantity": 2,
      "unit_price": 79.99
    }
  ],
  "order_total": 159.98,
  "timestamp": 1625097600000
}

Reorder Events (Output)

{
  "event_id": "reorder-789",
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "current_stock": 5,
  "reorder_quantity": 50,
  "priority": "normal",
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000
}

Alert Events (Output)

{
  "alert_id": "alert-123",
  "alert_type": "critical_inventory",
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "current_stock": 2,
  "threshold": 5,
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000,
  "message": "Critical inventory level: Wireless Headphones (2 units remaining)"
}

Creating the KSML Definition

Now, let's create our KSML definition file:

Inventory event processor (click to expand)
streams:
  order_events:
    topic: order_events
    keyType: string  # order_id
    valueType: json  # order data

  inventory_updates:
    topic: inventory_updates
    keyType: string  # product_id
    valueType: json  # inventory data

  reorder_events:
    topic: reorder_events
    keyType: string  # event_id
    valueType: json  # reorder data

  inventory_alerts:
    topic: inventory_alerts
    keyType: string  # alert_id
    valueType: json  # alert data

tables:
  product_catalog:
    topic: product_catalog
    keyType: string  # product_id
    valueType: json  # product details including thresholds

stores:
  inventory_store:
    type: keyValue
    keyType: string
    valueType: json

pipelines:
  # Pipeline for updating inventory based on orders
  order_pipeline:
    from: order_events
    via:
      - type: transformKeyValueToKeyValueList
        mapper:
          code: |
            # For each item in the order, emit a key-value pair with product_id as key and ordered item as value
            result = []
            for item in value.get("items", []):
              product_id = item.get("product_id")
              if product_id:
                result.append((product_id, {**item, "order_id":key}))
            return result
          resultType: list(tuple(string, struct))
      - type: peek
        forEach:
          code: |
            log.info("Ordered item: {}", value)
      - type: transformValue
        mapper:
          code: |
            inventory = inventory_store.get(key)
            if not inventory:
              inventory = {
                "product_id": key,
                "current_stock": 0
              }
            return {
              "order_item": value,
              "inventory": inventory
            }
          resultType: struct
          stores:
            - inventory_store
      - type: peek
        forEach:
          code: |
            log.info("Looked up inventory: {}", value)
      - type: mapValues
        mapper:
          code: |
            import time

            # Get the product_id from the key
            product_id = key

            # Get the ordered item from the value
            order_item = value.get("order_item")
            if order_item is None:
              return None

            # Get the inventory from the value (the previous step guarantees a default if none was stored)
            inventory = value.get("inventory")

            # Get the ordered amount
            ordered_quantity = order_item.get("quantity", 0)
            if not ordered_quantity:
              return None

            # Update inventory with new stock level
            current_stock = inventory.get("current_stock", 0)
            new_stock = max(0, current_stock - ordered_quantity)

            # Update the inventory in the store
            value = {
              **inventory,
              "current_stock": new_stock,
              "last_order_id": order_item.get("order_id"),
              "last_updated": int(time.time() * 1000)
            }

            inventory_store.put(key, value)
            return value
          resultType: struct
          stores:
            - inventory_store
      # Only propagate valid inventory updates to the topic
      - type: filter
        if:
          expression: value is not None
      - type: peek
        forEach:
          code: |
            log.info("Updating inventory: {}", value)
    to: inventory_updates

  # Pipeline for detecting low inventory and generating reorder events
  reorder_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) <= product.get("reorder_threshold", 0)
      - type: transformKeyValue
        mapper:
          globalCode: |
            import uuid
            import time
          code: |
            # Get reorder threshold and quantity from product reference data
            product = value.get("product")
            reorder_threshold = product.get("reorder_threshold", 10)
            reorder_quantity = product.get("reorder_quantity", 50)

            # Determine priority based on current stock
            inventory = value.get("inventory")
            current_stock = inventory.get("current_stock", 0)
            priority = "urgent" if current_stock <= 5 else "normal"

            # Generate event ID
            event_id = f"reorder-{uuid.uuid4().hex[:8]}"

            # Create reorder event
            value = {
              "event_id": event_id,
              "product_id": product.get("id"),
              "product_name": product.get("name"),
              "current_stock": current_stock,
              "reorder_quantity": reorder_quantity,
              "priority": priority,
              "timestamp": int(time.time() * 1000)
            }
          expression: (value.get("event_id"), value)
          resultType: (string, struct)
    to: reorder_events

  # Pipeline for detecting critical inventory levels and generating alerts
  alert_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) <= product.get("critical_threshold", 0)
      - type: mapValues
        mapper:
          code: |
            import uuid
            import time

            # Get critical threshold from product reference data
            product = value.get("product")
            critical_threshold = product.get("critical_threshold", 5)
            product_name = product.get("name", "Unknown")

            # Get current stock
            inventory = value.get("inventory")
            current_stock = inventory.get("current_stock", 0)

            # Generate alert ID
            alert_id = f"alert-{uuid.uuid4().hex[:8]}"

            # Create alert message
            message = f"Critical inventory level: {product_name} ({current_stock} units remaining)"

            # Create alert event
            return {
              "alert_id": alert_id,
              "alert_type": "critical_inventory",
              "product_id": product.get("id"),
              "product_name": product_name,
              "current_stock": current_stock,
              "threshold": critical_threshold,
              "timestamp": int(time.time() * 1000),
              "message": message
            }
          resultType: struct
    to: inventory_alerts

  restock_pipeline:
    from: reorder_events
    via:
      - type: transformKey
        mapper:
          expression: value.get("product_id")
          resultType: string
      - type: transformValue
        mapper:
          code: |
            inventory = inventory_store.get(key)
            if not inventory:
              inventory = {
                "product_id": key,
                "current_stock": 0
              }
            return {
              "reorder": value,
              "inventory": inventory
            }
          resultType: struct
          stores:
            - inventory_store
      - type: transformValue
        mapper:
          code: |
            import time

            reorder = value.get("reorder")
            inventory = value.get("inventory")
            value = {
              **inventory,
              "current_stock": inventory.get("current_stock", 0) + reorder.get("reorder_quantity", 0),
              "last_reorder_id": reorder.get("event_id"),
              "last_updated": int(time.time() * 1000)
            }

            inventory_store.put(key, value)

            return value
          resultType: struct
          stores:
            - inventory_store
      - type: peek
        forEach:
          code: |
            log.info("Restocked product: {}", value)
    to: inventory_updates

  inventory_monitor:
    from: inventory_updates
    forEach:
      code: |
        log.info("Inventory updated: {}", value)

Setting up the producers for test data

To test the topology above, we create a test data producer definition.

The definition consists of two producers. The first producer writes the three product_catalog records and then stops. The second producer sends a message to the order_events topic every second, containing a randomly generated product order:

Product and order event producer (click to expand)
name: "Product and Order Event Producer"
version: "1.0"
description: |
  This producer is part of the Event-Driven Applications Use Case in KSML documentation. It produces a fixed series of
  records to the product_catalog topic. Next to that, every second a random product order is generated and sent to the
  order_events topic.

functions:
  global_function:
    globalCode: |
      # Global product catalog
      products = [
        {"id": "prod-123", "name": "Wireless Headphones", "category": "electronics", "price": 79.99, "reorder_threshold": 10, "reorder_quantity": 50, "critical_threshold": 5},
        {"id": "prod-456", "name": "Laptop Charger", "category": "electronics", "price": 55.99, "reorder_threshold": 5, "reorder_quantity": 25, "critical_threshold": 3},
        {"id": "prod-789", "name": "Phone Cover", "category": "accessories", "price": 12.99, "reorder_threshold": 20, "reorder_quantity": 100, "critical_threshold": 10}
      ]

  product_catalog_generator:
    globalCode: |
      import time
      count = 0
    code: |
      global count
      count = (count + 1) % len(products)
      value = products[count]
      key = value.get("id")
      return (key, value)
    resultType: (string, struct)

  order_event_generator:
    globalCode: |
      import random
    code: |
      items = []
      total_price = 0
      for item in range(random.randrange(1,4)):
        product = random.choice(products)
        quantity = random.randrange(1,10)
        price = product.get("price") * quantity
        total_price += price
        items += [{
          "product_id": product.get("id"),
          "quantity": quantity,
          "unit_price": product.get("price")
        }]

      # Return order event
      value = {
        "order_id": "order-"+str(random.randrange(999999)),
        "customer_id": "cust-"+str(random.randrange(999999)),
        "items": items,
        "order_total": total_price,
        "timestamp": int(time.time() * 1000)
      }
    expression: (value.get("order_id"), value)
    resultType: (string, struct)

producers:
  product_catalog_producer:
    generator: product_catalog_generator
    interval: 1
    count: 3
    to:
      topic: product_catalog
      keyType: string  # Product id
      valueType: json  # Product information including reorder info

  order_event_producer:
    generator: order_event_generator
    interval: 1s
    to:
      topic: order_events
      keyType: string  # order id
      valueType: json  # JSON order data

Running the Application

To run the application:

  1. Save the processor definition to inventory-event-processors.yaml.
  2. Save the producer definition to product-and-order-event-producer.yaml.
  3. Set up your ksml-runner.yaml configuration, pointing to your Kafka installation.
KSML runner configuration (click to expand)
ksml:
  enableProducers: true              # Set to true to allow producer definitions to be parsed in the KSML definitions and be executed.
  enablePipelines: true              # Set to true to allow pipeline definitions to be parsed in the KSML definitions and be executed.
  definitions:
    event_producer: product-and-order-event-producer.yaml
    inventory_processor: inventory-event-processors.yaml
kafka:
  bootstrap.servers: broker:9093
  application.id: your.app.id
  4. Start the KSML runner, which will run both the test data producers and the inventory pipelines.
  5. Monitor the inventory_updates, reorder_events and inventory_alerts topics to see the generated events.

Extending the Event-Driven System

Integration with External Systems

To make this event-driven system truly useful, you can integrate it with external systems:

  1. Procurement System: Connect the reorder events to your procurement system to automatically create purchase orders
  2. Notification Service: Send the alerts to a notification service that can email or text warehouse staff (see the sketch after this list)
  3. Analytics Platform: Stream all events to an analytics platform for business intelligence
  4. Dashboard: Connect to a real-time dashboard for inventory visualization
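
As an illustration of the notification integration, the sketch below escalates alerts for products that are completely out of stock to a hypothetical staff_notifications topic, which a notification service (or a Kafka Connect sink) could consume. The staff_notifications stream and the urgent_alert_forwarder pipeline are assumptions for this example; they would be added to the processor definition alongside the existing streams and pipelines.

streams:
  staff_notifications:
    topic: staff_notifications  # hypothetical topic consumed by the notification service
    keyType: string  # alert_id
    valueType: json  # alert data

pipelines:
  urgent_alert_forwarder:
    from: inventory_alerts
    via:
      # Escalate only alerts for products that are completely out of stock
      - type: filter
        if:
          expression: value.get("current_stock", 0) == 0
      - type: peek
        forEach:
          code: |
            log.info("Escalating out-of-stock alert: {}", value)
    to: staff_notifications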

Adding More Event Types

You can extend the system with additional event types:

  • Price Change Events: Automatically adjust prices based on inventory levels or competitor data
  • Promotion Events: Trigger promotions for overstocked items (a sketch follows this list)
  • Fraud Detection Events: Flag suspicious order patterns
  • Shipping Delay Events: Notify customers about potential delays due to inventory issues
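
To give a flavour of such an extension, the following sketch derives promotion candidates for overstocked items. It assumes a hypothetical overstock_threshold field in the product catalog and a hypothetical promotion_events output stream (declared like the other streams), and reuses the join-and-filter pattern from the reorder and alert pipelines above.

pipelines:
  promotion_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            # overstock_threshold is a hypothetical catalog field used only in this sketch
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) >= product.get("overstock_threshold", 999999)
      - type: mapValues
        mapper:
          code: |
            product = value.get("product")
            inventory = value.get("inventory")
            return {
              "event_type": "promotion_candidate",
              "product_id": product.get("id"),
              "product_name": product.get("name"),
              "current_stock": inventory.get("current_stock", 0)
            }
          resultType: struct
    to: promotion_events  # hypothetical output stream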

Best Practices for Event-Driven Applications

When building event-driven applications with KSML, consider these best practices:

  1. Event Schema Design: Design your events to be self-contained and include all necessary context
  2. Idempotent Processing: Ensure your event handlers can process the same event multiple times without side effects (see the sketch after this list)
  3. Event Versioning: Include version information in your events to handle schema evolution
  4. Monitoring and Observability: Add logging and metrics to track event flow and processing
  5. Error Handling: Implement proper error handling and dead-letter queues for failed events
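
For the idempotent-processing practice, one way to express it in this tutorial's style is to track processed event IDs in a state store and drop duplicates before they reach the business logic. The processed_events_store below is an assumed additional store, declared next to inventory_store:

stores:
  processed_events_store:
    type: keyValue
    keyType: string
    valueType: json

The following steps could then be placed at the front of a consuming pipeline's via section (for example restock_pipeline), mirroring the transformValue-plus-filter pattern used in the order pipeline:

      - type: transformValue
        mapper:
          code: |
            # Drop events whose event_id has already been processed
            event_id = value.get("event_id")
            if event_id and processed_events_store.get(event_id) is not None:
              return None
            if event_id:
              processed_events_store.put(event_id, {"processed": True})
            return value
          resultType: struct
          stores:
            - processed_events_store
      - type: filter
        if:
          expression: value is not None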

Conclusion

In this tutorial, you've learned how to:

  • Build an event-driven application using KSML
  • Detect specific conditions in your data streams
  • Generate events in response to those conditions
  • Process events to update state and trigger further actions
  • Design an end-to-end event-driven architecture

Event-driven applications are a powerful use case for KSML, allowing you to build responsive, real-time systems that react automatically to changing conditions.

Next Steps