Custom State Stores in KSML
This tutorial explores how to implement and optimize custom state stores in KSML, allowing you to maintain and manage state in your stream processing applications with greater flexibility and control.
Introduction to State Stores
State stores are a critical component of stateful stream processing applications. They allow your application to:
- Maintain data across multiple messages and events
- Track historical information for context-aware processing
- Implement stateful operations like aggregations and joins
- Build sophisticated business logic that depends on previous events
- Persist state for fault tolerance and recovery
KSML provides built-in state store capabilities that integrate seamlessly with Kafka Streams, offering exactly-once processing guarantees and automatic state management.
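The get-update-put cycle behind all of these capabilities can be sketched in a few lines of plain Python. This is only an illustration: the dict and the `count_events` helper are hypothetical stand-ins for a KSML key-value store, which in real definitions is declared in YAML and accessed via `store.get(key)` / `store.put(key, value)` inside functions.

```python
# A dict stands in for a KSML key-value store (hypothetical stand-in).
store = {}

def count_events(key):
    """Increment and return a per-key event count, as a stateful aggregation would."""
    count = store.get(key, 0) + 1   # read previous state (default for first event)
    store[key] = count              # write updated state back
    return count

count_events("alice")
count_events("alice")
count_events("bob")
```

Every stateful example in this tutorial follows this same read-modify-write pattern, with Kafka Streams handling persistence and fault tolerance underneath.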
Prerequisites
Before starting this tutorial:
- Complete the State Stores Tutorial - This tutorial builds on fundamental state store concepts, configuration methods, and basic patterns covered in the intermediate tutorial
- Have Docker Compose KSML environment setup running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_session_stats && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic server_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic windowed_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_clicks && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic session_analytics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_alerts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_processing_results
Advanced Key-Value Store Patterns
Building on the basic key-value concepts from the State Stores Tutorial, this example demonstrates advanced user session tracking with complex business logic.
What it does:
- Produces user activities: Creates events like login, page_view, click with user IDs, session IDs that change every 10 events, and browser/device info
- Stores user profiles: Keeps running JSON data per user including total sessions, action counts, time spent, devices/browsers used
- Detects session changes: When session_id changes from previous event, increments session counter and logs the transition
- Tracks comprehensive stats: Updates action counters, adds new pages/devices/countries to lists, calculates total time across all sessions
- Outputs session updates: Returns enriched user profile showing current session, lifetime statistics, and behavioral patterns whenever activity occurs
User Activity Producer - click to expand
# Producer for user session tracking - generates user activity events
functions:
generate_user_activity:
type: generator
globalCode: |
import random
import time
event_counter = 0
users = ["alice", "bob", "charlie", "diana", "eve"]
actions = ["login", "page_view", "click", "search", "logout"]
code: |
global event_counter, users, actions
event_counter += 1
user_id = random.choice(users)
action = random.choice(actions)
# Generate structured JSON activity for better readability in Kowl UI
activity = {
"user_id": user_id,
"action": action,
"timestamp": int(time.time() * 1000),
"session_id": f"session_{user_id}_{event_counter // 10}", # Change session every 10 events
"page": f"/page/{random.randint(1, 5)}",
"duration_ms": random.randint(100, 5000),
"event_id": f"evt_{event_counter:06d}",
"browser": random.choice(["chrome", "firefox", "safari", "edge"]),
"device_type": random.choice(["desktop", "tablet", "mobile"]),
"location": {
"country": random.choice(["US", "CA", "UK", "DE", "FR"]),
"city": random.choice(["New York", "London", "Paris", "Berlin", "Toronto"])
},
"metadata": {
"simulation": True,
"session_tracking": True,
"event_sequence": event_counter
}
}
expression: (user_id, activity)
resultType: (string, json)
producers:
user_activity_producer:
generator: generate_user_activity
interval: 2s
to:
topic: user_activity
keyType: string
valueType: json
Basic Key-Value Store Processor - click to expand
# Processor demonstrating basic key-value store for session tracking
streams:
user_activity:
topic: user_activity
keyType: string
valueType: json
stores:
user_session_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
track_user_sessions:
type: valueTransformer
stores:
- user_session_store
code: |
import json
import time
# Extract fields from JSON activity
if not value:
return None
action = value.get("action")
session_id = value.get("session_id")
duration = value.get("duration_ms")
timestamp = value.get("timestamp")
event_id = value.get("event_id")
page = value.get("page")
browser = value.get("browser")
device_type = value.get("device_type")
location = value.get("location", {})
if not action or not session_id or duration is None:
return None
# Get existing session data
session_data_str = user_session_store.get(key)
if session_data_str:
session_data = json.loads(session_data_str)
else:
session_data = {
"user_id": key,
"current_session": None,
"total_sessions": 0,
"total_time_ms": 0,
"actions_count": {},
"first_seen": timestamp,
"last_activity": timestamp,
"devices_used": set(),
"browsers_used": set(),
"countries_visited": set(),
"pages_visited": set()
}
# Convert sets to lists for JSON serialization
session_data["devices_used"] = []
session_data["browsers_used"] = []
session_data["countries_visited"] = []
session_data["pages_visited"] = []
# Track session changes
session_ended = False
if session_data["current_session"] != session_id:
if session_data["current_session"] is not None:
# Session changed
session_data["total_sessions"] += 1
session_ended = True
log.info("Session ended for user {}: {}", key, session_data["current_session"])
session_data["current_session"] = session_id
log.info("New session started for user {}: {}", key, session_id)
# Update activity tracking
session_data["total_time_ms"] += duration
session_data["last_activity"] = timestamp
if action in session_data["actions_count"]:
session_data["actions_count"][action] += 1
else:
session_data["actions_count"][action] = 1
# Track device/browser/location usage
if device_type and device_type not in session_data["devices_used"]:
session_data["devices_used"].append(device_type)
if browser and browser not in session_data["browsers_used"]:
session_data["browsers_used"].append(browser)
if location.get("country") and location["country"] not in session_data["countries_visited"]:
session_data["countries_visited"].append(location["country"])
if page and page not in session_data["pages_visited"]:
session_data["pages_visited"].append(page)
# Store updated session data
user_session_store.put(key, json.dumps(session_data))
# Generate structured session summary
result = {
"stats_type": "USER_SESSION_STATS",
"user_id": key,
"current_session": session_id,
"session_ended": session_ended,
"activity": {
"event_id": event_id,
"action": action,
"page": page,
"duration_ms": duration,
"timestamp": timestamp
},
"session_totals": {
"total_sessions": session_data["total_sessions"],
"total_time_ms": session_data["total_time_ms"],
"unique_actions": len(session_data["actions_count"]),
"action_counts": session_data["actions_count"]
},
"user_profile": {
"first_seen": session_data["first_seen"],
"last_activity": session_data["last_activity"],
"devices_used": session_data["devices_used"],
"browsers_used": session_data["browsers_used"],
"countries_visited": session_data["countries_visited"],
"pages_visited": len(session_data["pages_visited"]),
"most_visited_pages": session_data["pages_visited"][-5:] if len(session_data["pages_visited"]) > 5 else session_data["pages_visited"]
},
"current_context": {
"device_type": device_type,
"browser": browser,
"location": location
}
}
return result
expression: result if result else None
resultType: json
pipelines:
session_tracking_pipeline:
from: user_activity
via:
- type: mapValues
mapper: track_user_sessions
- type: filter
if:
expression: value is not None
to:
topic: user_session_stats
keyType: string
valueType: json
Key concepts demonstrated:
- JSON serialization for complex state objects
- Session boundary detection
- Persistent state with caching enabled
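The session-boundary detection and JSON serialization patterns above can be distilled into a short sketch. A plain dict stands in for `user_session_store` (hypothetical stand-in); the state object is serialized to JSON on every write, just as the processor does.

```python
import json

# A dict stands in for the KSML key-value store (hypothetical stand-in).
store = {}

def track(key, session_id):
    """Detect session changes for a user; returns (session_ended, total_sessions)."""
    raw = store.get(key)
    data = json.loads(raw) if raw else {"current_session": None, "total_sessions": 0}
    ended = False
    if data["current_session"] != session_id:
        if data["current_session"] is not None:
            data["total_sessions"] += 1   # previous session closed
            ended = True
        data["current_session"] = session_id
    store[key] = json.dumps(data)         # complex state serialized as JSON
    return ended, data["total_sessions"]

track("alice", "s1")
track("alice", "s1")                      # same session: no boundary
ended, total = track("alice", "s2")       # session change detected here
```

Because key-value stores hold strings here, any structure more complex than a scalar must round-trip through `json.dumps` / `json.loads` on each update.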
Window Store
Window stores organize data by time windows, enabling time-based aggregations and analytics. This example emulates windowing on top of a key-value store by deriving a window key from each event's timestamp.
What it does:
- Produces server metrics: Creates metrics like cpu_usage, memory_usage, disk_io with varying values (base + sine wave + noise) for different servers
- Creates time windows: Divides timestamps into 5-minute buckets (300,000 ms) and creates unique window keys like "server_001:cpu_usage:1640995200000"
- Accumulates window stats: For each metric in a time window, stores running count, sum, min, max, and recent sample list in state store
- Calculates aggregates: When outputting, computes average from sum/count, range from max-min, tracks categorical data like datacenter/environment
- Outputs window results: Returns complete window statistics only when window has enough samples, showing aggregated metrics with alerting thresholds and metadata
Metrics Data Producer - click to expand
# Producer for window store demo - generates time-series metrics
functions:
generate_metrics:
type: generator
globalCode: |
import random
import time
import math
metric_counter = 0
metrics = ["cpu_usage", "memory_usage", "disk_io", "network_io"]
servers = ["server_001", "server_002", "server_003"]
code: |
global metric_counter, metrics, servers
metric_counter += 1
server = random.choice(servers)
metric_name = random.choice(metrics)
# Generate realistic metric values
base_value = {
"cpu_usage": 30,
"memory_usage": 60,
"disk_io": 1000,
"network_io": 500
}[metric_name]
# Add some variation and trends
trend = math.sin(metric_counter / 20) * 10
noise = random.uniform(-5, 5)
value = max(0, base_value + trend + noise)
current_timestamp = int(time.time() * 1000)
# Create structured JSON metric for better readability in Kowl UI
metric = {
"server_id": server,
"metric_name": metric_name,
"value": round(value, 2),
"timestamp": current_timestamp,
"metric_id": f"metric_{metric_counter:06d}",
"unit": {
"cpu_usage": "percent",
"memory_usage": "percent",
"disk_io": "MB/s",
"network_io": "MB/s"
}[metric_name],
"datacenter": random.choice(["dc1", "dc2", "dc3"]),
"environment": random.choice(["prod", "staging", "test"]),
"service": random.choice(["web", "api", "database", "cache"]),
"alerting": {
"enabled": True,
"threshold_high": base_value * 1.5,
"threshold_critical": base_value * 2.0
},
"metadata": {
"simulation": True,
"window_aggregation": True,
"baseline_value": base_value,
"trend_component": round(trend, 2),
"noise_component": round(noise, 2)
}
}
expression: (server, metric)
resultType: (string, json)
producers:
metrics_producer:
generator: generate_metrics
interval: 1s
to:
topic: server_metrics
keyType: string
valueType: json
Window Store Processor - click to expand
# Processor demonstrating window store for time-based aggregations
streams:
server_metrics:
topic: server_metrics
keyType: string
valueType: json
stores:
metrics_window_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
aggregate_metrics:
type: valueTransformer
stores:
- metrics_window_store
code: |
import json
import time
# Extract fields from JSON metric
if not value:
return None
metric_name = value.get("metric_name")
metric_value = value.get("value")
timestamp = value.get("timestamp")
metric_id = value.get("metric_id")
unit = value.get("unit")
datacenter = value.get("datacenter")
environment = value.get("environment")
service = value.get("service")
alerting = value.get("alerting", {})
metadata = value.get("metadata", {})
if not metric_name or metric_value is None or not timestamp:
return None
# Create window key (5-minute windows)
window_size_ms = 5 * 60 * 1000 # 5 minutes
window_start = (timestamp // window_size_ms) * window_size_ms
window_key = f"{key}:{metric_name}:{window_start}"
# Get existing window data
window_data_str = metrics_window_store.get(window_key)
if window_data_str:
window_data = json.loads(window_data_str)
else:
window_data = {
"server_id": key,
"metric_name": metric_name,
"unit": unit,
"window_start": window_start,
"window_end": window_start + window_size_ms,
"window_size_ms": window_size_ms,
"count": 0,
"sum": 0,
"min": metric_value,
"max": metric_value,
"first_timestamp": timestamp,
"last_timestamp": timestamp,
"values": [],
"datacenters": set(),
"environments": set(),
"services": set()
}
# Convert sets to lists for JSON serialization
window_data["datacenters"] = []
window_data["environments"] = []
window_data["services"] = []
# Update window aggregates
window_data["count"] += 1
window_data["sum"] += metric_value
window_data["min"] = min(window_data["min"], metric_value)
window_data["max"] = max(window_data["max"], metric_value)
window_data["last_timestamp"] = timestamp
# Track categorical data
if datacenter and datacenter not in window_data["datacenters"]:
window_data["datacenters"].append(datacenter)
if environment and environment not in window_data["environments"]:
window_data["environments"].append(environment)
if service and service not in window_data["services"]:
window_data["services"].append(service)
# Keep recent sample values for analysis (last 10 for memory efficiency)
window_data["values"].append({
"value": metric_value,
"timestamp": timestamp,
"metric_id": metric_id
})
if len(window_data["values"]) > 10:
window_data["values"] = window_data["values"][-10:]
# Calculate statistics
avg = window_data["sum"] / window_data["count"]
window_duration = window_data["last_timestamp"] - window_data["first_timestamp"]
# Check against alerting thresholds
alert_status = "normal"
if alerting.get("enabled", False):
if avg >= alerting.get("threshold_critical", float('inf')):
alert_status = "critical"
elif avg >= alerting.get("threshold_high", float('inf')):
alert_status = "warning"
# Store updated window
metrics_window_store.put(window_key, json.dumps(window_data))
# Generate comprehensive window aggregation result
result = {
"aggregation_type": "WINDOW_AGGREGATION",
"server_id": key,
"metric_name": metric_name,
"unit": unit,
"window": {
"start_timestamp": window_start,
"end_timestamp": window_start + window_size_ms,
"duration_ms": window_size_ms,
"actual_duration_ms": window_duration,
"window_key": window_key
},
"statistics": {
"count": window_data["count"],
"average": round(avg, 2),
"minimum": round(window_data["min"], 2),
"maximum": round(window_data["max"], 2),
"sum": round(window_data["sum"], 2),
"range": round(window_data["max"] - window_data["min"], 2)
},
"alerting": {
"status": alert_status,
"thresholds": alerting,
"current_average": round(avg, 2)
},
"distribution": {
"datacenters": window_data["datacenters"],
"environments": window_data["environments"],
"services": window_data["services"]
},
"sampling": {
"recent_values": window_data["values"][-3:], # Show last 3 values
"total_samples": window_data["count"]
},
"processing_info": {
"window_updated": True,
"latest_metric_id": metric_id,
"processing_timestamp": int(time.time() * 1000)
}
}
return result
expression: result if result else None
resultType: json
pipelines:
window_aggregation_pipeline:
from: server_metrics
via:
- type: mapValues
mapper: aggregate_metrics
- type: filter
if:
expression: value is not None
to:
topic: windowed_metrics
keyType: string
valueType: json
Key concepts demonstrated:
- Time window calculation and management
- Running aggregations (min, max, sum, count)
- Memory-efficient value storage
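The window-key derivation and running aggregates above reduce to a small sketch. A dict stands in for `metrics_window_store` (hypothetical stand-in); the bucketing arithmetic is the same integer floor-division used in the processor.

```python
# 5-minute windows, as in the processor above.
WINDOW_MS = 5 * 60 * 1000

def window_start(ts_ms):
    """Map a timestamp to the start of its window via integer floor-division."""
    return (ts_ms // WINDOW_MS) * WINDOW_MS

windows = {}  # stand-in for metrics_window_store

def record(server, metric, ts_ms, value):
    """Accumulate count/sum/min/max under a window key; return (key, running avg)."""
    wkey = f"{server}:{metric}:{window_start(ts_ms)}"
    w = windows.setdefault(wkey, {"count": 0, "sum": 0.0, "min": value, "max": value})
    w["count"] += 1
    w["sum"] += value
    w["min"] = min(w["min"], value)
    w["max"] = max(w["max"], value)
    return wkey, w["sum"] / w["count"]

record("server_001", "cpu_usage", 1_640_995_212_345, 30.0)
key, avg = record("server_001", "cpu_usage", 1_640_995_230_000, 50.0)  # same bucket
```

Storing only count, sum, min, and max keeps the state size constant per window while still allowing average and range to be derived on output.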
Session Store
Session stores organize data by session windows, automatically handling session boundaries based on inactivity gaps. This example emulates that behavior with a key-value store and an explicit inactivity timeout.
What it does:
- Produces click events: Creates user page visits with timestamps, occasionally adding 1-5 second gaps (20% chance) to simulate session breaks
- Tracks session timeouts: Uses 30-second inactivity threshold - if time since last click > 30s, starts new session and increments session counter
- Stores session state: Keeps running data per user including current session ID, start time, page count, total duration, devices used
- Detects session boundaries: When timeout exceeded, logs session end, resets counters, starts fresh session with new session ID
- Outputs session analytics: Returns comprehensive session data showing current session metrics, lifetime totals, device/page patterns, and conversion events
User Clicks Producer - click to expand
# Producer for session store demo - generates user click events with gaps
functions:
generate_click_events:
type: generator
globalCode: |
import random
import time
click_counter = 0
users = ["user_A", "user_B", "user_C"]
pages = ["/home", "/products", "/cart", "/checkout", "/profile"]
code: |
global click_counter, users, pages
click_counter += 1
user_id = random.choice(users)
page = random.choice(pages)
# Occasionally mark a simulated session gap (recorded as a hint in the event; the producer does not actually pause)
gap_probability = 0.2 # 20% chance of gap
has_gap = False
gap_duration = 0
if random.random() < gap_probability:
# Simulate longer gap between sessions
gap_duration = random.randint(1, 5) # 1-5 second gap
has_gap = True
log.info("Simulating {}s gap for user {}", gap_duration, user_id)
current_timestamp = int(time.time() * 1000)
duration_on_page = random.randint(1000, 30000) # 1-30 seconds
# Create structured JSON click event for better readability in Kowl UI
click_event = {
"user_id": user_id,
"page": page,
"timestamp": current_timestamp,
"click_id": f"click_{click_counter:04d}",
"duration_on_page": duration_on_page,
"sequence_number": click_counter,
"user_agent": random.choice(["Chrome/91.0", "Firefox/89.0", "Safari/14.1", "Edge/91.0"]),
"device_info": {
"type": random.choice(["desktop", "tablet", "mobile"]),
"os": random.choice(["Windows", "MacOS", "Linux", "iOS", "Android"]),
"screen_resolution": random.choice(["1920x1080", "1366x768", "1440x900", "375x812"])
},
"referrer": random.choice([None, "/home", "/products", "/search", "external"]),
"session_info": {
"has_simulated_gap": has_gap,
"gap_duration_hint": gap_duration if has_gap else 0,
"expected_session_timeout_ms": 30000
},
"page_metadata": {
"category": {
"/home": "landing",
"/products": "catalog",
"/cart": "commerce",
"/checkout": "commerce",
"/profile": "account"
}.get(page, "other"),
"requires_auth": page in ["/cart", "/checkout", "/profile"],
"is_conversion_page": page == "/checkout"
},
"interaction": {
"click_type": random.choice(["navigation", "button", "link", "form"]),
"coordinates": {
"x": random.randint(0, 1920),
"y": random.randint(0, 1080)
}
},
"metadata": {
"simulation": True,
"session_tracking": True,
"gap_simulation": has_gap
}
}
expression: (user_id, click_event)
resultType: (string, json)
producers:
click_event_producer:
generator: generate_click_events
interval: 3s # 3 second intervals to create natural session boundaries
to:
topic: user_clicks
keyType: string
valueType: json
Session Store Processor - click to expand
# Processor demonstrating session store for session-based analytics
streams:
user_clicks:
topic: user_clicks
keyType: string
valueType: json
stores:
user_session_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
track_user_sessions:
type: valueTransformer
stores:
- user_session_store
code: |
import json
import time
# Extract fields from JSON click event
if not value:
return None
page = value.get("page")
duration = value.get("duration_on_page")
timestamp = value.get("timestamp")
click_id = value.get("click_id")
sequence_number = value.get("sequence_number")
user_agent = value.get("user_agent")
device_info = value.get("device_info", {})
referrer = value.get("referrer")
session_info = value.get("session_info", {})
page_metadata = value.get("page_metadata", {})
interaction = value.get("interaction", {})
if not page or duration is None or not timestamp:
return None
# Session timeout: 30 seconds of inactivity
session_timeout_ms = 30 * 1000
current_time = int(time.time() * 1000)
# Get existing session data
session_data_str = user_session_store.get(key)
if session_data_str:
session_data = json.loads(session_data_str)
else:
session_data = {
"user_id": key,
"current_session_id": None,
"session_start": None,
"last_activity": 0,
"session_page_count": 0,
"session_total_duration": 0,
"total_sessions": 0,
"pages_visited": [],
"devices_used": [],
"user_agents": [],
"page_categories": [],
"conversion_events": 0,
"referrer_sources": []
}
# Check if this starts a new session
last_activity = session_data.get("last_activity", 0)
time_since_last = timestamp - last_activity
session_ended = False
if (session_data["current_session_id"] is None or
time_since_last > session_timeout_ms):
# End previous session if exists
if session_data["current_session_id"] is not None:
session_duration = last_activity - session_data["session_start"]
log.info("Session ended for {}: duration={}ms, pages={}",
key, session_duration, session_data["session_page_count"])
session_data["total_sessions"] += 1
session_ended = True
# Start new session
session_data["current_session_id"] = f"session_{key}_{timestamp}"
session_data["session_start"] = timestamp
session_data["session_page_count"] = 0
session_data["session_total_duration"] = 0
session_data["pages_visited"] = []
log.info("New session started for {}: {}", key, session_data["current_session_id"])
# Update current session
session_data["last_activity"] = timestamp
session_data["session_page_count"] += 1
session_data["session_total_duration"] += duration
session_data["pages_visited"].append(page)
# Track additional session metadata
device_type = device_info.get("type")
if device_type and device_type not in session_data["devices_used"]:
session_data["devices_used"].append(device_type)
if user_agent and user_agent not in session_data["user_agents"]:
session_data["user_agents"].append(user_agent)
page_category = page_metadata.get("category")
if page_category and page_category not in session_data["page_categories"]:
session_data["page_categories"].append(page_category)
if page_metadata.get("is_conversion_page", False):
session_data["conversion_events"] += 1
if referrer and referrer != "None" and referrer not in session_data["referrer_sources"]:
session_data["referrer_sources"].append(referrer)
# Store updated session
user_session_store.put(key, json.dumps(session_data))
# Calculate session metrics
session_duration_so_far = timestamp - session_data["session_start"]
# Generate comprehensive session analytics result
result = {
"analytics_type": "SESSION_ANALYTICS",
"user_id": key,
"session_id": session_data["current_session_id"],
"session_status": "ended" if session_ended else "active",
"current_event": {
"click_id": click_id,
"page": page,
"page_category": page_category,
"duration_on_page": duration,
"timestamp": timestamp,
"sequence_number": sequence_number,
"interaction_type": interaction.get("click_type"),
"requires_auth": page_metadata.get("requires_auth", False),
"is_conversion": page_metadata.get("is_conversion_page", False)
},
"session_metrics": {
"pages_visited_count": session_data["session_page_count"],
"total_time_ms": session_data["session_total_duration"],
"session_duration_ms": session_duration_so_far,
"pages_visited": session_data["pages_visited"][-5:], # Last 5 pages
"unique_categories": session_data["page_categories"],
"conversion_events": session_data["conversion_events"],
"session_start": session_data["session_start"],
"last_activity": session_data["last_activity"]
},
"user_profile": {
"total_sessions": session_data["total_sessions"],
"devices_used": session_data["devices_used"],
"user_agents": session_data["user_agents"],
"referrer_sources": session_data["referrer_sources"]
},
"device_context": {
"current_device": device_info,
"current_user_agent": user_agent,
"current_referrer": referrer
},
"session_insights": {
"time_since_last_activity": time_since_last,
"session_timeout_ms": session_timeout_ms,
"has_simulated_gap": session_info.get("has_simulated_gap", False),
"avg_time_per_page": session_data["session_total_duration"] / session_data["session_page_count"] if session_data["session_page_count"] > 0 else 0
}
}
return result
expression: result if result else None
resultType: json
pipelines:
session_analytics_pipeline:
from: user_clicks
via:
- type: mapValues
mapper: track_user_sessions
- type: filter
if:
expression: value is not None
to:
topic: session_analytics
keyType: string
valueType: json
Key concepts demonstrated:
- Session timeout handling
- Automatic session boundary detection
- Session lifecycle management
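The inactivity-timeout rule at the heart of this processor can be sketched on its own. The `next_session` helper is hypothetical; it carries only the minimal state (session ID, last activity timestamp) needed to decide whether a gap closes the current session.

```python
# 30-second inactivity timeout, as in the processor above.
TIMEOUT_MS = 30_000

def next_session(state, ts_ms):
    """state is (session_id, last_activity_ms) or None; returns updated state."""
    if state is None or ts_ms - state[1] > TIMEOUT_MS:
        return (f"session_{ts_ms}", ts_ms)   # gap exceeded: start a new session
    return (state[0], ts_ms)                 # same session, refresh last activity

s = next_session(None, 1_000)
s = next_session(s, 20_000)      # 19s gap: same session continues
s2 = next_session(s, 60_000)     # 40s gap: new session starts
```

Note that the boundary is detected lazily, on the next event after the gap; nothing fires at the moment the timeout elapses, which is why the processor reports "session ended" together with the first event of the new session.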
Optimized Store Configuration
For high-volume scenarios, proper store configuration is crucial for performance.
What it does:
- Produces device events: Creates high-frequency events (sensor_reading, status_update, error, heartbeat) for multiple devices with facility/zone info
- Stores compact state: Keeps minimal JSON per device with just current status, last_temp, error_count, heartbeat_count, location info
- Processes selectively: Updates state for all events, but only outputs alerts when specific conditions met (temp >75°C, errors, status changes)
- Optimizes for volume: Uses efficient JSON storage, processes fast, emits only critical alerts to reduce downstream message volume
- Tracks device health: Monitors temperature trends, error accumulation, heartbeat patterns, status transitions with location context
High Volume Events Producer - click to expand
# Producer for optimized store demo - generates high-volume events
functions:
generate_high_volume_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
device_ids = [f"device_{i:03d}" for i in range(1, 21)] # 20 devices
event_types = ["sensor_reading", "status_update", "error", "heartbeat"]
code: |
global event_counter, device_ids, event_types
event_counter += 1
device_id = random.choice(device_ids)
event_type = random.choice(event_types)
# Generate JSON event data for better readability in Kowl UI
current_timestamp = int(time.time() * 1000)
if event_type == "sensor_reading":
value_data = round(random.uniform(20.0, 80.0), 2) # Temperature
unit = "celsius"
elif event_type == "status_update":
value_data = random.choice(["online", "offline", "maintenance"])
unit = None
elif event_type == "error":
value_data = f"error_code_{random.randint(100, 999)}"
unit = None
else: # heartbeat
value_data = "ok"
unit = None
# Create structured JSON event for better readability in Kowl UI
device_event = {
"device_id": device_id,
"event_type": event_type,
"value": value_data,
"timestamp": current_timestamp,
"event_id": f"evt_{event_counter:06d}",
"unit": unit,
"facility": random.choice(["factory_a", "factory_b", "warehouse_c"]),
"zone": random.choice(["zone_1", "zone_2", "zone_3", "zone_4"]),
"metadata": {
"simulation": True,
"high_volume": True,
"optimized_processing": True,
"sequence": event_counter
}
}
expression: (device_id, device_event)
resultType: (string, json)
producers:
high_volume_producer:
generator: generate_high_volume_events
interval: 1s # High frequency
to:
topic: device_events
keyType: string
valueType: json
Optimized Store Processor - click to expand
# Processor demonstrating optimized state store configuration
streams:
device_events:
topic: device_events
keyType: string
valueType: json
stores:
device_state_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
# Optimized for high-volume scenarios
functions:
process_device_events:
type: valueTransformer
stores:
- device_state_store
code: |
import json
import time
# Extract fields from JSON event
if not value:
return None
event_type = value.get("event_type")
event_value = value.get("value")
timestamp = value.get("timestamp")
event_id = value.get("event_id")
facility = value.get("facility")
zone = value.get("zone")
if not event_type or event_value is None or not timestamp:
return None
# Get existing device state
state_str = device_state_store.get(key)
if state_str:
device_state = json.loads(state_str)
else:
device_state = {
"device_id": key,
"status": "unknown",
"last_temp": None,
"error_count": 0,
"last_seen": timestamp,
"heartbeat_count": 0,
"facility": facility,
"zone": zone
}
# Process event efficiently with JSON output
result_event = None
if event_type == "sensor_reading":
device_state["last_temp"] = float(event_value)
device_state["status"] = "active"
# Check for temperature alerts
if device_state["last_temp"] > 75:
result_event = {
"alert_type": "TEMPERATURE_ALERT",
"device_id": key,
"temperature": device_state["last_temp"],
"threshold": 75,
"facility": facility,
"zone": zone,
"event_id": event_id,
"timestamp": timestamp
}
log.warn("Temperature alert for device {}: {:.1f}C", key, device_state["last_temp"])
elif event_type == "status_update":
device_state["status"] = event_value
result_event = {
"alert_type": "STATUS_UPDATE",
"device_id": key,
"status": event_value,
"facility": facility,
"zone": zone,
"event_id": event_id,
"timestamp": timestamp
}
elif event_type == "error":
device_state["error_count"] += 1
device_state["status"] = "error"
result_event = {
"alert_type": "ERROR",
"device_id": key,
"error_code": event_value,
"error_count": device_state["error_count"],
"facility": facility,
"zone": zone,
"event_id": event_id,
"timestamp": timestamp
}
log.error("Error on device {}: {} (total errors: {})", key, event_value, device_state["error_count"])
elif event_type == "heartbeat":
device_state["status"] = "online"
device_state["heartbeat_count"] = device_state.get("heartbeat_count", 0) + 1
# Only emit heartbeat summary every 10th heartbeat to reduce output volume
if device_state["heartbeat_count"] % 10 == 0:
result_event = {
"alert_type": "HEARTBEAT_SUMMARY",
"device_id": key,
"status": "online",
"heartbeat_count": device_state["heartbeat_count"],
"facility": facility,
"zone": zone,
"timestamp": timestamp
}
# Update last seen and store state
device_state["last_seen"] = timestamp
device_state_store.put(key, json.dumps(device_state))
return result_event
expression: result_event if result_event else None
resultType: json
pipelines:
optimized_processing_pipeline:
from: device_events
via:
- type: mapValues
mapper: process_device_events
- type: filter
if:
expression: value is not None
to:
topic: device_alerts
keyType: string
valueType: json
Key concepts demonstrated:
- Compact state representation for performance
- Selective event emission
- Error counting and alerting
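The selective-emission idea above fits in a short sketch: state is updated for every event, but a downstream record is produced only when an alert condition is met. The `process` helper and threshold constant are hypothetical stand-ins for the processor logic.

```python
# Emit only when a condition is met; update compact state on every event.
TEMP_THRESHOLD = 75.0

def process(state, event_type, value):
    """Update per-device state in place; return an alert dict or None."""
    if event_type == "sensor_reading":
        state["last_temp"] = value
        if value > TEMP_THRESHOLD:
            return {"alert_type": "TEMPERATURE_ALERT", "temperature": value}
    elif event_type == "error":
        state["error_count"] = state.get("error_count", 0) + 1
        return {"alert_type": "ERROR", "error_count": state["error_count"]}
    return None   # normal readings produce no downstream message

state = {}
a1 = process(state, "sensor_reading", 60.0)      # below threshold: no alert
a2 = process(state, "sensor_reading", 80.0)      # alert emitted
a3 = process(state, "error", "error_code_123")   # errors always alert
```

Returning None for uninteresting events, combined with the `filter` step in the pipeline, is what keeps downstream volume low in high-throughput scenarios.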
Multi-Store Pattern
Complex applications often require multiple state stores working together to manage different aspects of state.
What it does:
- Produces order events: Creates order status updates (created, paid, shipped, delivered, cancelled) with product IDs, quantities, and prices as colon-delimited strings
- Uses three state stores: Updates order_state_store (current order status), customer_metrics_store (totals per customer), product_inventory_store (stock levels)
- Coordinates updates: For each order event, atomically updates all three stores - order status, customer spending totals, inventory levels
- Tracks relationships: Maps orders to customers via hash function, maintains order history lists, tracks inventory changes per product
- Outputs comprehensive results: Returns formatted string combining data from all three stores showing order details, customer analytics, and inventory impact
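Since the order events travel as plain strings in the "status:product_id:quantity:price:timestamp" format, each store update starts by parsing that format. A minimal sketch (the `parse_order` helper is hypothetical):

```python
def parse_order(raw):
    """Parse a colon-delimited order event into typed fields."""
    status, product_id, quantity, price, ts = raw.split(":")
    return {
        "status": status,
        "product_id": product_id,
        "quantity": int(quantity),   # numeric fields arrive as strings
        "price": float(price),
        "timestamp": int(ts),
    }

order = parse_order("shipped:prod_B:3:42.5:1640995200000")
```

Parsing into typed fields up front keeps the per-store update logic simple, since all three stores can then work from the same dict.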
Order Events Producer - click to expand
# Producer for multi-store demo - generates order processing events as colon-delimited strings
functions:
generate_order_events:
type: generator
globalCode: |
import random
import time
order_counter = 0
products = ["prod_A", "prod_B", "prod_C", "prod_D"]
statuses = ["created", "paid", "shipped", "delivered", "cancelled"]
code: |
global order_counter, products, statuses
order_counter += 1
# Generate order event as colon-delimited string: "status:product_id:quantity:price:timestamp"
status = random.choice(statuses)
product_id = random.choice(products)
quantity = random.randint(1, 5)
price = round(random.uniform(10.0, 100.0), 2)
timestamp = int(time.time() * 1000)
# Create colon-delimited string format expected by processor
order_value = f"{status}:{product_id}:{quantity}:{price}:{timestamp}"
order_key = f"order_{order_counter:04d}"
log.info("Generating order: key={}, value={}", order_key, order_value)
expression: (order_key, order_value)
resultType: (string, string)
producers:
order_event_producer:
generator: generate_order_events
interval: 2s
to:
topic: order_events
keyType: string
valueType: string
Multi-Store Processor - click to expand
# Processor demonstrating multi-store pattern for complex order processing
streams:
order_events:
topic: order_events
keyType: string
valueType: string
stores:
# Store for current order state
order_state_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
# Store for customer metrics
customer_metrics_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
# Store for product inventory tracking
product_inventory_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
process_order_with_multiple_stores:
type: valueTransformer
stores:
- order_state_store
- customer_metrics_store
- product_inventory_store
code: |
import json
# Parse order event: "status:product_id:quantity:price:timestamp"
parts = value.split(":")
if len(parts) != 5:
return None
status = parts[0]
product_id = parts[1]
quantity = int(parts[2])
price = float(parts[3])
timestamp = int(parts[4])
order_id = key
# 1. Update order state store
order_state = {
"order_id": order_id,
"status": status,
"product_id": product_id,
"quantity": quantity,
"price": price,
"last_updated": timestamp
}
order_state_store.put(order_id, json.dumps(order_state))
# 2. Update customer metrics (extract customer from order_id or use a lookup)
customer_id = f"cust_{hash(order_id) % 4 + 1:03d}"  # Demo-only mapping; note that hash() is not stable across restarts
customer_data_str = customer_metrics_store.get(customer_id)
if customer_data_str:
customer_data = json.loads(customer_data_str)
else:
customer_data = {
"customer_id": customer_id,
"total_orders": 0,
"total_spent": 0,
"order_history": []
}
# Update customer metrics based on status
if status == "created":
customer_data["total_orders"] += 1
customer_data["order_history"].append(order_id)
# Keep only last 10 orders for memory efficiency
if len(customer_data["order_history"]) > 10:
customer_data["order_history"] = customer_data["order_history"][-10:]
elif status == "paid":
customer_data["total_spent"] += price * quantity
customer_metrics_store.put(customer_id, json.dumps(customer_data))
# 3. Update product inventory
inventory_data_str = product_inventory_store.get(product_id)
if inventory_data_str:
inventory_data = json.loads(inventory_data_str)
else:
inventory_data = {
"product_id": product_id,
"reserved_quantity": 0,
"sold_quantity": 0,
"available_stock": 100 # Default stock
}
# Update inventory based on order status
if status == "created":
inventory_data["reserved_quantity"] += quantity
inventory_data["available_stock"] -= quantity
elif status == "shipped":
inventory_data["reserved_quantity"] -= quantity
inventory_data["sold_quantity"] += quantity
elif status == "cancelled":
inventory_data["reserved_quantity"] -= quantity
inventory_data["available_stock"] += quantity
product_inventory_store.put(product_id, json.dumps(inventory_data))
# Build a compact result string combining data from all three stores
result = f"ORDER_PROCESSED:{order_id}:status={status}:customer_orders={customer_data['total_orders']}:stock_left={inventory_data['available_stock']}"
log.info("Processed order {} with status {}: customer has {} orders, product {} has {} stock left",
order_id, status, customer_data["total_orders"], product_id, inventory_data["available_stock"])
return result
expression: result if result else None
resultType: string
pipelines:
multi_store_order_processing:
from: order_events
via:
- type: mapValues
mapper: process_order_with_multiple_stores
- type: filter
if:
expression: value is not None
to:
topic: order_processing_results
keyType: string
valueType: string
Key concepts demonstrated:
- Multiple state stores in single function
- Coordinated state updates
- Cross-store data correlation
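Stripped of the YAML wrapping, the coordination logic reduces to deriving one key per store from a single incoming event and updating each store in turn. A minimal dict-backed sketch (store and customer names are illustrative, not KSML API):

```python
import json

# Dicts stand in for the three KSML key-value stores (illustrative only)
order_state_store, customer_metrics_store, product_inventory_store = {}, {}, {}

def process_order(order_id, value):
    """Parse 'status:product_id:quantity:price:timestamp' and update all three stores."""
    status, product_id, quantity, price, timestamp = value.split(":")
    quantity, price = int(quantity), float(price)
    # 1. Order store: keyed by order_id
    order_state_store[order_id] = json.dumps({"status": status, "product_id": product_id})
    # 2. Customer store: keyed by a customer id derived from the order
    customer_id = "cust_001"  # real code would derive or look this up deterministically
    customer = json.loads(customer_metrics_store.get(
        customer_id, '{"total_orders": 0, "total_spent": 0}'))
    if status == "created":
        customer["total_orders"] += 1
    elif status == "paid":
        customer["total_spent"] += price * quantity
    customer_metrics_store[customer_id] = json.dumps(customer)
    # 3. Inventory store: keyed by product_id
    inv = json.loads(product_inventory_store.get(product_id, '{"available_stock": 100}'))
    if status == "created":
        inv["available_stock"] -= quantity
    product_inventory_store[product_id] = json.dumps(inv)
    return f"ORDER_PROCESSED:{order_id}:status={status}:stock_left={inv['available_stock']}"

result = process_order("order_0001", "created:prod_A:2:19.99:1700000000000")
```

Because all three updates run inside one function invocation for one message, they succeed or fail together from the stream processor's point of view.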
State Store Types Summary
| Store Type | Use Case | Key Features |
|---|---|---|
| Key-Value | General state, caching, counters | Simple key-to-value mapping |
| Window | Time-based aggregations | Automatic time partitioning |
| Session | User sessions, activity tracking | Inactivity-based boundaries |
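The practical difference between the store types is mostly in how entries are keyed. A rough sketch of the window-store case, under deliberately simplified semantics (this is not how KSML or Kafka Streams implement stores internally):

```python
# Simplified keying per store type (not KSML internals):
#   key-value: key -> value
#   window:    (key, window_start) -> value
#   session:   (key, session_start, session_end) -> value, merged on activity
windowed = {}
WINDOW_MS = 60_000

def window_put(key, timestamp, value):
    """A window store partitions values by aligned time window."""
    window_start = timestamp - (timestamp % WINDOW_MS)
    windowed[(key, window_start)] = windowed.get((key, window_start), 0) + value

# Two events in the same minute share a window; a third a minute later does not
window_put("sensor_1", 1_000, 5)
window_put("sensor_1", 30_000, 7)
window_put("sensor_1", 61_000, 2)
```

The same key can therefore hold many independent values, one per time window, which is what makes per-window aggregations and automatic retention possible.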
Conclusion
Custom state stores in KSML provide powerful capabilities for building stateful stream processing applications. By understanding the different store types, configuration options, and optimization techniques, you can build efficient and scalable applications that maintain state effectively across events.
For foundational concepts and basic configuration patterns, refer back to the State Stores Tutorial.