Event-Driven Applications with KSML
This tutorial demonstrates how to build event-driven applications using KSML. You'll learn how to detect specific events in your data streams and trigger appropriate actions in response.
Introduction
Event-driven architecture is a powerful paradigm for building responsive, real-time applications. In this approach:
- Systems react to events as they occur
- Components communicate through events rather than direct calls
- Business logic is triggered by changes in state
- Applications can scale and evolve independently
KSML is particularly well-suited for event-driven applications because it allows you to:
- Process streams of events in real-time
- Detect complex patterns and conditions
- Transform events into actionable insights
- Trigger downstream processes automatically
Prerequisites
Before starting this tutorial, you should:
- Understand basic KSML concepts (streams, functions, pipelines)
- Have completed the KSML Basics Tutorial
- Be familiar with Filtering and Transforming
- Have a basic understanding of Complex Event Processing
The Use Case
In this tutorial, we'll build an event-driven inventory management system for an e-commerce platform. The system will:
- Monitor product inventory levels in real-time
- Detect when items are running low
- Generate reorder events for the procurement system
- Alert warehouse staff about critical inventory situations
- Update inventory dashboards in real-time
Defining the Data Models
Inventory Update Events
{
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "category": "electronics",
  "current_stock": 15,
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000,
  "unit_price": 79.99
}
Order Events
{
  "order_id": "order-456",
  "customer_id": "cust-789",
  "items": [
    {
      "product_id": "prod-123",
      "quantity": 2,
      "unit_price": 79.99
    }
  ],
  "order_total": 159.98,
  "timestamp": 1625097600000
}
Reorder Events (Output)
{
  "event_id": "reorder-789",
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "current_stock": 5,
  "reorder_quantity": 50,
  "priority": "normal",
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000
}
Alert Events (Output)
{
  "alert_id": "alert-123",
  "alert_type": "critical_inventory",
  "product_id": "prod-123",
  "product_name": "Wireless Headphones",
  "current_stock": 2,
  "threshold": 5,
  "warehouse_id": "wh-east-1",
  "timestamp": 1625097600000,
  "message": "Critical inventory level: Wireless Headphones (2 units remaining)"
}
Creating the KSML Definition
Now, let's create our KSML definition file:
Inventory event processor:
streams:
  order_events:
    topic: order_events
    keyType: string # order_id
    valueType: json # order data
  inventory_updates:
    topic: inventory_updates
    keyType: string # product_id
    valueType: json # inventory data
  reorder_events:
    topic: reorder_events
    keyType: string # event_id
    valueType: json # reorder data
  inventory_alerts:
    topic: inventory_alerts
    keyType: string # alert_id
    valueType: json # alert data

tables:
  product_catalog:
    topic: product_catalog
    keyType: string # product_id
    valueType: json # product details including thresholds

stores:
  inventory_store:
    type: keyValue
    keyType: string
    valueType: json
pipelines:
  # Pipeline for updating inventory based on orders
  order_pipeline:
    from: order_events
    via:
      - type: transformKeyValueToKeyValueList
        mapper:
          code: |
            # For each item in the order, emit a key-value pair with product_id as key and the ordered item as value
            result = []
            for item in value.get("items", []):
                product_id = item.get("product_id")
                if product_id:
                    result.append((product_id, {**item, "order_id": key}))
            return result
          resultType: list(tuple(string, struct))
      - type: peek
        forEach:
          code: |
            log.info("Ordered item: {}", value)
      - type: transformValue
        mapper:
          code: |
            # Look up the current inventory for this product, starting at zero if unknown
            inventory = inventory_store.get(key)
            if not inventory:
                inventory = {
                    "product_id": key,
                    "current_stock": 0
                }
            return {
                "order_item": value,
                "inventory": inventory
            }
          resultType: struct
          stores:
            - inventory_store
      - type: peek
        forEach:
          code: |
            log.info("Looked up inventory: {}", value)
      - type: mapValues
        mapper:
          code: |
            import time
            # Get the ordered item from the value
            order_item = value.get("order_item")
            if order_item is None:
                return None
            # Get the inventory that was looked up in the previous step
            inventory = value.get("inventory")
            # Get the ordered amount, defaulting to zero if absent
            ordered_quantity = order_item.get("quantity", 0)
            if ordered_quantity == 0:
                return None
            # Update inventory with the new stock level
            current_stock = inventory.get("current_stock", 0)
            new_stock = max(0, current_stock - ordered_quantity)
            # Update the inventory in the store
            value = {
                **inventory,
                "current_stock": new_stock,
                "last_order_id": order_item.get("order_id"),
                "last_updated": int(time.time() * 1000)
            }
            inventory_store.put(key, value)
            return value
          resultType: struct
          stores:
            - inventory_store
      # Only propagate valid inventory updates to the topic
      - type: filter
        if:
          expression: value is not None
      - type: peek
        forEach:
          code: |
            log.info("Updating inventory: {}", value)
    to: inventory_updates
  # Pipeline for detecting low inventory and generating reorder events
  reorder_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) <= product.get("reorder_threshold", 0)
      - type: transformKeyValue
        mapper:
          globalCode: |
            import uuid
            import time
          code: |
            # Get the reorder quantity from product reference data
            product = value.get("product")
            reorder_quantity = product.get("reorder_quantity", 50)
            # Determine priority based on current stock
            inventory = value.get("inventory")
            current_stock = inventory.get("current_stock", 0)
            priority = "urgent" if current_stock <= 5 else "normal"
            # Generate event ID
            event_id = f"reorder-{uuid.uuid4().hex[:8]}"
            # Create reorder event
            value = {
                "event_id": event_id,
                "product_id": product.get("id"),
                "product_name": product.get("name"),
                "current_stock": current_stock,
                "reorder_quantity": reorder_quantity,
                "priority": priority,
                "timestamp": int(time.time() * 1000)
            }
          expression: (value.get("event_id"), value)
          resultType: (string, struct)
    to: reorder_events
  # Pipeline for detecting critical inventory levels and generating alerts
  alert_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) <= product.get("critical_threshold", 0)
      - type: mapValues
        mapper:
          code: |
            import uuid
            import time
            # Get critical threshold from product reference data
            product = value.get("product")
            critical_threshold = product.get("critical_threshold", 5)
            product_name = product.get("name", "Unknown")
            # Get current stock
            inventory = value.get("inventory")
            current_stock = inventory.get("current_stock", 0)
            # Generate alert ID
            alert_id = f"alert-{uuid.uuid4().hex[:8]}"
            # Create alert message
            message = f"Critical inventory level: {product_name} ({current_stock} units remaining)"
            # Create alert event
            return {
                "alert_id": alert_id,
                "alert_type": "critical_inventory",
                "product_id": product.get("id"),
                "product_name": product_name,
                "current_stock": current_stock,
                "threshold": critical_threshold,
                "timestamp": int(time.time() * 1000),
                "message": message
            }
          resultType: struct
    to: inventory_alerts
  # Pipeline for processing reorder events and restocking inventory
  restock_pipeline:
    from: reorder_events
    via:
      - type: transformKey
        mapper:
          expression: value.get("product_id")
          resultType: string
      - type: transformValue
        mapper:
          code: |
            # Look up the current inventory for this product, starting at zero if unknown
            inventory = inventory_store.get(key)
            if not inventory:
                inventory = {
                    "product_id": key,
                    "current_stock": 0
                }
            return {
                "reorder": value,
                "inventory": inventory
            }
          resultType: struct
          stores:
            - inventory_store
      - type: transformValue
        mapper:
          code: |
            import time
            # Add the reordered quantity to the stock level and record the restock
            reorder = value.get("reorder")
            inventory = value.get("inventory")
            value = {
                **inventory,
                "current_stock": inventory.get("current_stock", 0) + reorder.get("reorder_quantity", 0),
                "last_reorder_id": reorder.get("event_id"),
                "last_updated": int(time.time() * 1000)
            }
            inventory_store.put(key, value)
            return value
          resultType: struct
          stores:
            - inventory_store
      - type: peek
        forEach:
          code: |
            log.info("Restocked product: {}", value)
    to: inventory_updates

  # Pipeline for logging all inventory updates
  inventory_monitor:
    from: inventory_updates
    forEach:
      code: |
        log.info("Inventory updated: {}", value)
Setting up the producers for test data
To test out the topology above, we create a test data producer definition.
The definition consists of two producers. The first is a single-shot producer that generates three records for the product_catalog topic. The second produces a message every second to the order_events topic, using a randomly generated product order:
Product and order event producer:
name: "Product and Order Event Producer"
version: "1.0"
description: |
This producer is part of the Event-Driven Applications Use Case in KSML documentation. It produces a fixed series of
records to the product_catalog topic. Next to that, every second a random product order is generated and sent to the
order_events topic.
functions:
global_function:
globalCode: |
# Global product catalog
products = [
{"id": "prod-123", "name": "Wireless Headphones", "category": "electronics", "price": 79.99, "reorder_threshold": 10, "reorder_quantity": 50, "critical_threshold": 5},
{"id": "prod-456", "name": "Laptop Charger", "category": "electronics", "price": 55.99, "reorder_threshold": 5, "reorder_quantity": 25, "critical_threshold": 3},
{"id": "prod-789", "name": "Phone Cover", "category": "accessories", "price": 12.99, "reorder_threshold": 20, "reorder_quantity": 100, "critical_threshold": 10}
]
product_catalog_generator:
globalCode: |
import time
count = 0
code: |
global count
count = (count + 1) % len(products)
value = products[count]
key = value.get("id")
return (key, value)
resultType: (string, struct)
order_event_generator:
globalCode: |
import random
code: |
items = []
total_price = 0
for item in range(random.randrange(1,4)):
product = random.choice(products)
quantity = random.randrange(1,10)
price = product.get("price") * quantity
total_price += price
items += [{
"product_id": product.get("id"),
"quantity": quantity,
"unit_price": product.get("price")
}]
# Return order event
value = {
"order_id": "order-"+str(random.randrange(999999)),
"customer_id": "cust-"+str(random.randrange(999999)),
"items": items,
"order_total": total_price,
"timestamp": int(time.time() * 1000)
}
expression: (value.get("order_id"), value)
resultType: (string, struct)
producers:
product_catalog_producer:
generator: product_catalog_generator
interval: 1
count: 3
to:
topic: product_catalog
keyType: string # Product id
valueType: json # Product information including reorder info
order_event_producer:
generator: order_event_generator
interval: 1s
to:
topic: order_events
keyType: string # stock key (warehouse id + product id)
valueType: json # JSON inventory data
Running the Application
To run the application:
- Save the processor definition to inventory-event-processors.yaml.
- Save the producer definition to product-and-order-event-producer.yaml.
- Set up your ksml-runner.yaml configuration, pointing to your Kafka installation.
KSML runner configuration:
ksml:
  enableProducers: true # Set to true to allow producer definitions to be parsed in the KSML definitions and be executed.
  enablePipelines: true # Set to true to allow pipeline definitions to be parsed in the KSML definitions and be executed.
  definitions:
    event_producer: product-and-order-event-producer.yaml
    event_processor: inventory-event-processors.yaml

kafka:
  bootstrap.servers: broker:9093
  application.id: your.app.id
- Start the event_producer to load the three catalog records into product_catalog and to begin generating a random order on order_events every second.
- Start the event_processor to run the inventory pipelines.
- Monitor the inventory_updates, reorder_events, and inventory_alerts topics to see the generated events.
Extending the Event-Driven System
Integration with External Systems
To make this event-driven system truly useful, you can integrate it with external systems:
- Procurement System: Connect the reorder events to your procurement system to automatically create purchase orders (a routing sketch follows this list)
- Notification Service: Send the alerts to a notification service that can email or text warehouse staff
- Analytics Platform: Stream all events to an analytics platform for business intelligence
- Dashboard: Connect to a real-time dashboard for inventory visualization
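The first two integrations can often be expressed directly in KSML by routing events onto dedicated topics that the external systems consume. The following is a minimal sketch, not part of the tutorial's topology: the procurement_orders and staff_notifications topics are hypothetical, and the branch predicates simply split reorder events by priority.

streams:
  reorder_events:
    topic: reorder_events
    keyType: string
    valueType: json
  procurement_orders:
    topic: procurement_orders # hypothetical topic consumed by the procurement system
    keyType: string
    valueType: json
  staff_notifications:
    topic: staff_notifications # hypothetical topic consumed by a notification service
    keyType: string
    valueType: json

pipelines:
  route_reorder_events:
    from: reorder_events
    branch:
      # Urgent reorders are routed to warehouse staff for immediate attention
      - if:
          expression: value.get("priority") == "urgent"
        to: staff_notifications
      # All other reorders flow to the procurement system
      - if:
          expression: value.get("priority") != "urgent"
        to: procurement_orders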
Adding More Event Types
You can extend the system with additional event types:
- Price Change Events: Automatically adjust prices based on inventory levels or competitor data
- Promotion Events: Trigger promotions for overstocked items (sketched after this list)
- Fraud Detection Events: Flag suspicious order patterns
- Shipping Delay Events: Notify customers about potential delays due to inventory issues
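A promotion pipeline for overstocked items, for example, could reuse the join-and-filter pattern from the pipelines above. The sketch below assumes the inventory_updates stream and product_catalog table from the main definition; the overstock_threshold field and the promotion_events topic are hypothetical additions to the tutorial's data model.

streams:
  promotion_events:
    topic: promotion_events # hypothetical output topic
    keyType: string
    valueType: json

pipelines:
  promotion_pipeline:
    from: inventory_updates
    via:
      - type: join
        table: product_catalog
        valueJoiner:
          expression: |
            {
              "product": value2,
              "inventory": value1
            }
          resultType: struct
      - type: filter
        if:
          code: |
            # Trigger a promotion when stock exceeds the (hypothetical) overstock threshold
            product = value.get("product")
            inventory = value.get("inventory")
            return inventory.get("current_stock", 0) >= product.get("overstock_threshold", 999)
      - type: mapValues
        mapper:
          code: |
            import time
            # Build a promotion candidate event from the joined product and inventory data
            product = value.get("product")
            inventory = value.get("inventory")
            return {
                "event_type": "promotion_candidate",
                "product_id": product.get("id"),
                "product_name": product.get("name"),
                "current_stock": inventory.get("current_stock", 0),
                "timestamp": int(time.time() * 1000)
            }
          resultType: struct
    to: promotion_events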
Best Practices for Event-Driven Applications
When building event-driven applications with KSML, consider these best practices:
- Event Schema Design: Design your events to be self-contained and include all necessary context
- Idempotent Processing: Ensure your event handlers can process the same event multiple times without side effects
- Event Versioning: Include version information in your events to handle schema evolution (see the sketch after this list)
- Monitoring and Observability: Add logging and metrics to track event flow and processing
- Error Handling: Implement proper error handling and dead-letter queues for failed events
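To make the versioning and error-handling practices concrete, here is a minimal sketch. It assumes events carry a schema_version field and that a reorder_events_dlq dead-letter topic exists; both are hypothetical and not part of the tutorial's topology. Events with a supported version proceed, while everything else is parked for inspection instead of being dropped.

streams:
  reorder_events:
    topic: reorder_events
    keyType: string
    valueType: json
  reorder_events_dlq:
    topic: reorder_events_dlq # hypothetical dead-letter topic
    keyType: string
    valueType: json

pipelines:
  version_guard:
    from: reorder_events
    branch:
      # Events with a supported schema version continue to normal processing
      - if:
          expression: value is not None and value.get("schema_version", 1) <= 2
        forEach:
          code: |
            log.info("Processing reorder event: {}", value)
      # Anything else is parked on the dead-letter topic for inspection
      - if:
          expression: value is None or value.get("schema_version", 1) > 2
        to: reorder_events_dlq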
Conclusion
In this tutorial, you've learned how to:
- Build an event-driven application using KSML
- Detect specific conditions in your data streams
- Generate events in response to those conditions
- Process events to update state and trigger further actions
- Design an end-to-end event-driven architecture
Event-driven applications are a powerful use case for KSML, allowing you to build responsive, real-time systems that react automatically to changing conditions.
Next Steps
- Learn about Real-Time Analytics to analyze your event data
- Explore Data Transformation for more complex event processing
- Check out External Integration for connecting your events to external systems