
Custom State Stores in KSML

This tutorial explores how to implement and optimize custom state stores in KSML, allowing you to maintain and manage state in your stream processing applications with greater flexibility and control.

Introduction to State Stores

State stores are a critical component of stateful stream processing applications. They allow your application to:

  • Maintain data across multiple messages and events
  • Track historical information for context-aware processing
  • Implement stateful operations like aggregations and joins
  • Build sophisticated business logic that depends on previous events
  • Persist state for fault tolerance and recovery

KSML provides built-in state store capabilities that integrate seamlessly with Kafka Streams, offering exactly-once processing guarantees and automatic state management.
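
As a minimal sketch (the store and function names here are illustrative), a store is declared under a top-level stores: section and attached to a function through its stores: list, which makes it accessible by name inside the function's Python code:

stores:
  example_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  remember_last_value:
    type: valueTransformer
    stores:
      - example_store
    code: |
      # Read the previous value for this key, then store the current one
      previous = example_store.get(key)
      example_store.put(key, value)
    expression: previous
    resultType: string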

Prerequisites

Before starting this tutorial, make sure the Kafka topics used by the examples exist. You can create them with the commands below.

Topic creation commands:
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_session_stats && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic server_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic windowed_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_clicks && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic session_analytics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_alerts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_processing_results

Advanced Key-Value Store Patterns

Building on the basic key-value concepts from the State Stores Tutorial, this example demonstrates advanced user session tracking with complex business logic.

What it does:

  • Produces user activities: Creates events like login, page_view, click with user IDs, session IDs (changes every 10 events), browser/device info
  • Stores user profiles: Keeps running JSON data per user including total sessions, action counts, time spent, devices/browsers used
  • Detects session changes: When session_id changes from previous event, increments session counter and logs the transition
  • Tracks comprehensive stats: Updates action counters, adds new pages/devices/countries to lists, calculates total time across all sessions
  • Outputs session updates: Returns enriched user profile showing current session, lifetime statistics, and behavioral patterns whenever activity occurs
User Activity Producer:
# Producer for user session tracking - generates user activity events

functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      users = ["alice", "bob", "charlie", "diana", "eve"]
      actions = ["login", "page_view", "click", "search", "logout"]
    code: |
      global event_counter, users, actions

      event_counter += 1
      user_id = random.choice(users)
      action = random.choice(actions)

      # Generate structured JSON activity for better readability in Kowl UI
      activity = {
        "user_id": user_id,
        "action": action,
        "timestamp": int(time.time() * 1000),
        "session_id": f"session_{user_id}_{event_counter // 10}",  # Change session every 10 events
        "page": f"/page/{random.randint(1, 5)}",
        "duration_ms": random.randint(100, 5000),
        "event_id": f"evt_{event_counter:06d}",
        "browser": random.choice(["chrome", "firefox", "safari", "edge"]),
        "device_type": random.choice(["desktop", "tablet", "mobile"]),
        "location": {
          "country": random.choice(["US", "CA", "UK", "DE", "FR"]),
          "city": random.choice(["New York", "London", "Paris", "Berlin", "Toronto"])
        },
        "metadata": {
          "simulation": True,
          "session_tracking": True,
          "event_sequence": event_counter
        }
      }

    expression: (user_id, activity)
    resultType: (string, json)

producers:
  user_activity_producer:
    generator: generate_user_activity
    interval: 2s
    to:
      topic: user_activity
      keyType: string
      valueType: json
Basic Key-Value Store Processor:
# Processor demonstrating basic key-value store for session tracking

streams:
  user_activity:
    topic: user_activity
    keyType: string
    valueType: json

stores:
  user_session_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  track_user_sessions:
    type: valueTransformer
    stores:
      - user_session_store
    code: |
      import json
      import time

      # Extract fields from JSON activity
      if not value:
        return None

      action = value.get("action")
      session_id = value.get("session_id")
      duration = value.get("duration_ms")
      timestamp = value.get("timestamp")
      event_id = value.get("event_id")
      page = value.get("page")
      browser = value.get("browser")
      device_type = value.get("device_type")
      location = value.get("location", {})

      if not action or not session_id or duration is None:
        return None

      # Get existing session data
      session_data_str = user_session_store.get(key)
      if session_data_str:
        session_data = json.loads(session_data_str)
      else:
        session_data = {
          "user_id": key,
          "current_session": None,
          "total_sessions": 0,
          "total_time_ms": 0,
          "actions_count": {},
          "first_seen": timestamp,
          "last_activity": timestamp,
          "devices_used": set(),
          "browsers_used": set(),
          "countries_visited": set(),
          "pages_visited": set()
        }
        # Convert sets to lists for JSON serialization
        session_data["devices_used"] = []
        session_data["browsers_used"] = []
        session_data["countries_visited"] = []
        session_data["pages_visited"] = []

      # Track session changes
      session_ended = False
      if session_data["current_session"] != session_id:
        if session_data["current_session"] is not None:
          # Session changed
          session_data["total_sessions"] += 1
          session_ended = True
          log.info("Session ended for user {}: {}", key, session_data["current_session"])

        session_data["current_session"] = session_id
        log.info("New session started for user {}: {}", key, session_id)

      # Update activity tracking
      session_data["total_time_ms"] += duration
      session_data["last_activity"] = timestamp

      if action in session_data["actions_count"]:
        session_data["actions_count"][action] += 1
      else:
        session_data["actions_count"][action] = 1

      # Track device/browser/location usage
      if device_type and device_type not in session_data["devices_used"]:
        session_data["devices_used"].append(device_type)
      if browser and browser not in session_data["browsers_used"]:
        session_data["browsers_used"].append(browser)
      if location.get("country") and location["country"] not in session_data["countries_visited"]:
        session_data["countries_visited"].append(location["country"])
      if page and page not in session_data["pages_visited"]:
        session_data["pages_visited"].append(page)

      # Store updated session data
      user_session_store.put(key, json.dumps(session_data))

      # Generate structured session summary
      result = {
        "stats_type": "USER_SESSION_STATS",
        "user_id": key,
        "current_session": session_id,
        "session_ended": session_ended,
        "activity": {
          "event_id": event_id,
          "action": action,
          "page": page,
          "duration_ms": duration,
          "timestamp": timestamp
        },
        "session_totals": {
          "total_sessions": session_data["total_sessions"],
          "total_time_ms": session_data["total_time_ms"],
          "unique_actions": len(session_data["actions_count"]),
          "action_counts": session_data["actions_count"]
        },
        "user_profile": {
          "first_seen": session_data["first_seen"],
          "last_activity": session_data["last_activity"],
          "devices_used": session_data["devices_used"],
          "browsers_used": session_data["browsers_used"],
          "countries_visited": session_data["countries_visited"],
          "pages_visited": len(session_data["pages_visited"]),
          "most_visited_pages": session_data["pages_visited"][-5:] if len(session_data["pages_visited"]) > 5 else session_data["pages_visited"]
        },
        "current_context": {
          "device_type": device_type,
          "browser": browser,
          "location": location
        }
      }

      return result

    expression: result if result else None
    resultType: json

pipelines:
  session_tracking_pipeline:
    from: user_activity
    via:
      - type: mapValues
        mapper: track_user_sessions
      - type: filter
        if:
          expression: value is not None
    to:
      topic: user_session_stats
      keyType: string
      valueType: json

Key concepts demonstrated:

  • JSON serialization for complex state objects
  • Session boundary detection
  • Persistent state with caching enabled
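
Every update in this example boils down to the same read-modify-write cycle against the store. A minimal standalone sketch of that cycle, with a plain dict standing in for the KSML store's get/put interface:

import json

store = {}  # stands in for a KSML key-value store (get/put)

def track(key, duration_ms):
    # Read: fetch prior state, falling back to a fresh record
    raw = store.get(key)
    state = json.loads(raw) if raw else {"user_id": key, "total_time_ms": 0, "events": 0}
    # Modify: apply the incoming event
    state["total_time_ms"] += duration_ms
    state["events"] += 1
    # Write: persist back as a JSON string, as the processor above does
    store[key] = json.dumps(state)
    return state

track("alice", 1200)
print(track("alice", 800))  # {'user_id': 'alice', 'total_time_ms': 2000, 'events': 2}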

Window Store

Window stores organize data by time windows, enabling time-based aggregations and analytics. This example emulates window semantics on a key-value store by bucketing event timestamps into fixed five-minute windows and encoding the window start into the store key.

What it does:

  • Produces server metrics: Creates metrics like cpu_usage, memory_usage, disk_io with varying values (base + sine wave + noise) for different servers
  • Creates time windows: Divides timestamps into 5-minute buckets (300,000 ms) and builds window keys like "server_001:cpu_usage:1640995200000" (the bucketing arithmetic is sketched after this list)
  • Accumulates window stats: For each metric in a time window, stores a running count, sum, min, max, and a recent-sample list in the state store
  • Calculates aggregates: Computes the average from sum/count and the range from max - min, and tracks categorical data like datacenter/environment
  • Outputs window results: Emits the updated window statistics for every incoming metric, including alert status against the configured thresholds and processing metadata
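
The fixed-window bucketing above reduces to integer arithmetic on the event timestamp. A minimal standalone sketch:

# Assign an epoch-millisecond timestamp to a fixed 5-minute window
window_size_ms = 5 * 60 * 1000                 # 300,000 ms
timestamp = 1640995337123                      # example event time
window_start = (timestamp // window_size_ms) * window_size_ms
window_end = window_start + window_size_ms
window_key = f"server_001:cpu_usage:{window_start}"
print(window_key)   # server_001:cpu_usage:1640995200000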
Metrics Data Producer:
# Producer for window store demo - generates time-series metrics

functions:
  generate_metrics:
    type: generator
    globalCode: |
      import random
      import time
      import math
      metric_counter = 0
      metrics = ["cpu_usage", "memory_usage", "disk_io", "network_io"]
      servers = ["server_001", "server_002", "server_003"]
    code: |
      global metric_counter, metrics, servers

      metric_counter += 1
      server = random.choice(servers)
      metric_name = random.choice(metrics)

      # Generate realistic metric values
      base_value = {
        "cpu_usage": 30,
        "memory_usage": 60,
        "disk_io": 1000,
        "network_io": 500
      }[metric_name]

      # Add some variation and trends
      trend = math.sin(metric_counter / 20) * 10
      noise = random.uniform(-5, 5)
      value = max(0, base_value + trend + noise)

      current_timestamp = int(time.time() * 1000)

      # Create structured JSON metric for better readability in Kowl UI
      metric = {
        "server_id": server,
        "metric_name": metric_name,
        "value": round(value, 2),
        "timestamp": current_timestamp,
        "metric_id": f"metric_{metric_counter:06d}",
        "unit": {
          "cpu_usage": "percent",
          "memory_usage": "percent", 
          "disk_io": "MB/s",
          "network_io": "MB/s"
        }[metric_name],
        "datacenter": random.choice(["dc1", "dc2", "dc3"]),
        "environment": random.choice(["prod", "staging", "test"]),
        "service": random.choice(["web", "api", "database", "cache"]),
        "alerting": {
          "enabled": True,
          "threshold_high": base_value * 1.5,
          "threshold_critical": base_value * 2.0
        },
        "metadata": {
          "simulation": True,
          "window_aggregation": True,
          "baseline_value": base_value,
          "trend_component": round(trend, 2),
          "noise_component": round(noise, 2)
        }
      }

    expression: (server, metric)
    resultType: (string, json)

producers:
  metrics_producer:
    generator: generate_metrics
    interval: 1s
    to:
      topic: server_metrics
      keyType: string
      valueType: json
Window Store Processor:
# Processor demonstrating window store for time-based aggregations

streams:
  server_metrics:
    topic: server_metrics
    keyType: string
    valueType: json

stores:
  metrics_window_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  aggregate_metrics:
    type: valueTransformer
    stores:
      - metrics_window_store
    code: |
      import json
      import time

      # Extract fields from JSON metric
      if not value:
        return None

      metric_name = value.get("metric_name")
      metric_value = value.get("value")
      timestamp = value.get("timestamp")
      metric_id = value.get("metric_id")
      unit = value.get("unit")
      datacenter = value.get("datacenter")
      environment = value.get("environment")
      service = value.get("service")
      alerting = value.get("alerting", {})
      metadata = value.get("metadata", {})

      if not metric_name or metric_value is None or not timestamp:
        return None

      # Create window key (5-minute windows)
      window_size_ms = 5 * 60 * 1000  # 5 minutes
      window_start = (timestamp // window_size_ms) * window_size_ms
      window_key = f"{key}:{metric_name}:{window_start}"

      # Get existing window data
      window_data_str = metrics_window_store.get(window_key)
      if window_data_str:
        window_data = json.loads(window_data_str)
      else:
        window_data = {
          "server_id": key,
          "metric_name": metric_name,
          "unit": unit,
          "window_start": window_start,
          "window_end": window_start + window_size_ms,
          "window_size_ms": window_size_ms,
          "count": 0,
          "sum": 0,
          "min": metric_value,
          "max": metric_value,
          "first_timestamp": timestamp,
          "last_timestamp": timestamp,
          "values": [],
          "datacenters": set(),
          "environments": set(),
          "services": set()
        }
        # Convert sets to lists for JSON serialization
        window_data["datacenters"] = []
        window_data["environments"] = []
        window_data["services"] = []

      # Update window aggregates
      window_data["count"] += 1
      window_data["sum"] += metric_value
      window_data["min"] = min(window_data["min"], metric_value)
      window_data["max"] = max(window_data["max"], metric_value)
      window_data["last_timestamp"] = timestamp

      # Track categorical data
      if datacenter and datacenter not in window_data["datacenters"]:
        window_data["datacenters"].append(datacenter)
      if environment and environment not in window_data["environments"]:
        window_data["environments"].append(environment)
      if service and service not in window_data["services"]:
        window_data["services"].append(service)

      # Keep recent sample values for analysis (last 10 for memory efficiency)
      window_data["values"].append({
        "value": metric_value,
        "timestamp": timestamp,
        "metric_id": metric_id
      })
      if len(window_data["values"]) > 10:
        window_data["values"] = window_data["values"][-10:]

      # Calculate statistics
      avg = window_data["sum"] / window_data["count"]
      window_duration = window_data["last_timestamp"] - window_data["first_timestamp"]

      # Check against alerting thresholds
      alert_status = "normal"
      if alerting.get("enabled", False):
        if avg >= alerting.get("threshold_critical", float('inf')):
          alert_status = "critical"
        elif avg >= alerting.get("threshold_high", float('inf')):
          alert_status = "warning"

      # Store updated window
      metrics_window_store.put(window_key, json.dumps(window_data))

      # Generate comprehensive window aggregation result
      result = {
        "aggregation_type": "WINDOW_AGGREGATION",
        "server_id": key,
        "metric_name": metric_name,
        "unit": unit,
        "window": {
          "start_timestamp": window_start,
          "end_timestamp": window_start + window_size_ms,
          "duration_ms": window_size_ms,
          "actual_duration_ms": window_duration,
          "window_key": window_key
        },
        "statistics": {
          "count": window_data["count"],
          "average": round(avg, 2),
          "minimum": round(window_data["min"], 2),
          "maximum": round(window_data["max"], 2),
          "sum": round(window_data["sum"], 2),
          "range": round(window_data["max"] - window_data["min"], 2)
        },
        "alerting": {
          "status": alert_status,
          "thresholds": alerting,
          "current_average": round(avg, 2)
        },
        "distribution": {
          "datacenters": window_data["datacenters"],
          "environments": window_data["environments"],
          "services": window_data["services"]
        },
        "sampling": {
          "recent_values": window_data["values"][-3:],  # Show last 3 values
          "total_samples": window_data["count"]
        },
        "processing_info": {
          "window_updated": True,
          "latest_metric_id": metric_id,
          "processing_timestamp": int(time.time() * 1000)
        }
      }

      return result

    expression: result if result else None
    resultType: json

pipelines:
  window_aggregation_pipeline:
    from: server_metrics
    via:
      - type: mapValues
        mapper: aggregate_metrics
      - type: filter
        if:
          expression: value is not None
    to:
      topic: windowed_metrics
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Time window calculation and management
  • Running aggregations (min, max, sum, count)
  • Memory-efficient value storage

Session Store

Session stores organize data by session windows, automatically handling session boundaries based on inactivity gaps. This example implements session semantics on a key-value store by tracking each user's last-activity timestamp and starting a new session whenever the inactivity gap exceeds a 30-second timeout.

What it does:

  • Produces click events: Creates user page visits with timestamps; 20% of events carry a simulated 1-5 second gap hint in their metadata to mark intended session breaks
  • Tracks session timeouts: Uses a 30-second inactivity threshold - if the time since the user's last click exceeds 30s, starts a new session and increments the session counter (the boundary check is sketched after this list)
  • Stores session state: Keeps running data per user including current session ID, start time, page count, total duration, and devices used
  • Detects session boundaries: When the timeout is exceeded, logs the session end, resets the per-session counters, and starts a fresh session with a new session ID
  • Outputs session analytics: Returns comprehensive session data showing current session metrics, lifetime totals, device/page patterns, and conversion events
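
The session-boundary rule is a single comparison against the stored last-activity timestamp. A minimal standalone sketch:

# Decide whether an incoming click starts a new session
SESSION_TIMEOUT_MS = 30 * 1000  # 30 seconds of inactivity ends a session

def starts_new_session(current_session_id, last_activity_ms, event_ts_ms):
    # First event ever, or an inactivity gap longer than the timeout
    return current_session_id is None or (event_ts_ms - last_activity_ms) > SESSION_TIMEOUT_MS

print(starts_new_session(None, 0, 1_000))       # True  (no prior session)
print(starts_new_session("s1", 1_000, 25_000))  # False (24s gap, under the timeout)
print(starts_new_session("s1", 1_000, 40_000))  # True  (39s gap, over the timeout)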
User Clicks Producer:
# Producer for session store demo - generates user click events with gaps

functions:
  generate_click_events:
    type: generator
    globalCode: |
      import random
      import time
      click_counter = 0
      users = ["user_A", "user_B", "user_C"]
      pages = ["/home", "/products", "/cart", "/checkout", "/profile"]
    code: |
      global click_counter, users, pages

      click_counter += 1
      user_id = random.choice(users)
      page = random.choice(pages)

      # Simulate session gaps by occasionally pausing
      gap_probability = 0.2  # 20% chance of gap
      has_gap = False
      gap_duration = 0
      if random.random() < gap_probability:
        # Simulate longer gap between sessions
        gap_duration = random.randint(1, 5)  # 1-5 second gap
        has_gap = True
        log.info("Simulating {}s gap for user {}", gap_duration, user_id)

      current_timestamp = int(time.time() * 1000)
      duration_on_page = random.randint(1000, 30000)  # 1-30 seconds

      # Create structured JSON click event for better readability in Kowl UI
      click_event = {
        "user_id": user_id,
        "page": page,
        "timestamp": current_timestamp,
        "click_id": f"click_{click_counter:04d}",
        "duration_on_page": duration_on_page,
        "sequence_number": click_counter,
        "user_agent": random.choice(["Chrome/91.0", "Firefox/89.0", "Safari/14.1", "Edge/91.0"]),
        "device_info": {
          "type": random.choice(["desktop", "tablet", "mobile"]),
          "os": random.choice(["Windows", "MacOS", "Linux", "iOS", "Android"]),
          "screen_resolution": random.choice(["1920x1080", "1366x768", "1440x900", "375x812"])
        },
        "referrer": random.choice([None, "/home", "/products", "/search", "external"]),
        "session_info": {
          "has_simulated_gap": has_gap,
          "gap_duration_hint": gap_duration if has_gap else 0,
          "expected_session_timeout_ms": 30000
        },
        "page_metadata": {
          "category": {
            "/home": "landing",
            "/products": "catalog", 
            "/cart": "commerce",
            "/checkout": "commerce",
            "/profile": "account"
          }.get(page, "other"),
          "requires_auth": page in ["/cart", "/checkout", "/profile"],
          "is_conversion_page": page == "/checkout"
        },
        "interaction": {
          "click_type": random.choice(["navigation", "button", "link", "form"]),
          "coordinates": {
            "x": random.randint(0, 1920),
            "y": random.randint(0, 1080)
          }
        },
        "metadata": {
          "simulation": True,
          "session_tracking": True,
          "gap_simulation": has_gap
        }
      }

    expression: (user_id, click_event)
    resultType: (string, json)

producers:
  click_event_producer:
    generator: generate_click_events
    interval: 3s  # 3 second intervals to create natural session boundaries
    to:
      topic: user_clicks
      keyType: string
      valueType: json
Session Store Processor:
# Processor demonstrating session store for session-based analytics

streams:
  user_clicks:
    topic: user_clicks
    keyType: string
    valueType: json

stores:
  user_session_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  track_user_sessions:
    type: valueTransformer
    stores:
      - user_session_store
    code: |
      import json
      import time

      # Extract fields from JSON click event
      if not value:
        return None

      page = value.get("page")
      duration = value.get("duration_on_page")
      timestamp = value.get("timestamp")
      click_id = value.get("click_id")
      sequence_number = value.get("sequence_number")
      user_agent = value.get("user_agent")
      device_info = value.get("device_info", {})
      referrer = value.get("referrer")
      session_info = value.get("session_info", {})
      page_metadata = value.get("page_metadata", {})
      interaction = value.get("interaction", {})

      if not page or duration is None or not timestamp:
        return None

      # Session timeout: 30 seconds of inactivity
      session_timeout_ms = 30 * 1000

      # Get existing session data
      session_data_str = user_session_store.get(key)
      if session_data_str:
        session_data = json.loads(session_data_str)
      else:
        session_data = {
          "user_id": key,
          "current_session_id": None,
          "session_start": None,
          "last_activity": 0,
          "session_page_count": 0,
          "session_total_duration": 0,
          "total_sessions": 0,
          "pages_visited": [],
          "devices_used": [],
          "user_agents": [],
          "page_categories": [],
          "conversion_events": 0,
          "referrer_sources": []
        }

      # Check if this starts a new session
      last_activity = session_data.get("last_activity", 0)
      time_since_last = timestamp - last_activity
      session_ended = False

      if (session_data["current_session_id"] is None or 
          time_since_last > session_timeout_ms):

        # End previous session if exists
        if session_data["current_session_id"] is not None:
          session_duration = last_activity - session_data["session_start"]
          log.info("Session ended for {}: duration={}ms, pages={}", 
                   key, session_duration, session_data["session_page_count"])
          session_data["total_sessions"] += 1
          session_ended = True

        # Start new session
        session_data["current_session_id"] = f"session_{key}_{timestamp}"
        session_data["session_start"] = timestamp
        session_data["session_page_count"] = 0
        session_data["session_total_duration"] = 0
        session_data["pages_visited"] = []

        log.info("New session started for {}: {}", key, session_data["current_session_id"])

      # Update current session
      session_data["last_activity"] = timestamp
      session_data["session_page_count"] += 1
      session_data["session_total_duration"] += duration
      session_data["pages_visited"].append(page)

      # Track additional session metadata
      device_type = device_info.get("type")
      if device_type and device_type not in session_data["devices_used"]:
        session_data["devices_used"].append(device_type)

      if user_agent and user_agent not in session_data["user_agents"]:
        session_data["user_agents"].append(user_agent)

      page_category = page_metadata.get("category")
      if page_category and page_category not in session_data["page_categories"]:
        session_data["page_categories"].append(page_category)

      if page_metadata.get("is_conversion_page", False):
        session_data["conversion_events"] += 1

      if referrer and referrer not in session_data["referrer_sources"]:
        session_data["referrer_sources"].append(referrer)

      # Store updated session
      user_session_store.put(key, json.dumps(session_data))

      # Calculate session metrics
      session_duration_so_far = timestamp - session_data["session_start"]

      # Generate comprehensive session analytics result
      result = {
        "analytics_type": "SESSION_ANALYTICS",
        "user_id": key,
        "session_id": session_data["current_session_id"],
        "session_status": "ended" if session_ended else "active",
        "current_event": {
          "click_id": click_id,
          "page": page,
          "page_category": page_category,
          "duration_on_page": duration,
          "timestamp": timestamp,
          "sequence_number": sequence_number,
          "interaction_type": interaction.get("click_type"),
          "requires_auth": page_metadata.get("requires_auth", False),
          "is_conversion": page_metadata.get("is_conversion_page", False)
        },
        "session_metrics": {
          "pages_visited_count": session_data["session_page_count"],
          "total_time_ms": session_data["session_total_duration"],
          "session_duration_ms": session_duration_so_far,
          "pages_visited": session_data["pages_visited"][-5:],  # Last 5 pages
          "unique_categories": session_data["page_categories"],
          "conversion_events": session_data["conversion_events"],
          "session_start": session_data["session_start"],
          "last_activity": session_data["last_activity"]
        },
        "user_profile": {
          "total_sessions": session_data["total_sessions"],
          "devices_used": session_data["devices_used"],
          "user_agents": session_data["user_agents"],
          "referrer_sources": session_data["referrer_sources"]
        },
        "device_context": {
          "current_device": device_info,
          "current_user_agent": user_agent,
          "current_referrer": referrer
        },
        "session_insights": {
          "time_since_last_activity": time_since_last,
          "session_timeout_ms": session_timeout_ms,
          "has_simulated_gap": session_info.get("has_simulated_gap", False),
          "avg_time_per_page": session_data["session_total_duration"] / session_data["session_page_count"] if session_data["session_page_count"] > 0 else 0
        }
      }

      return result

    expression: result if result else None
    resultType: json

pipelines:
  session_analytics_pipeline:
    from: user_clicks
    via:
      - type: mapValues
        mapper: track_user_sessions
      - type: filter
        if:
          expression: value is not None
    to:
      topic: session_analytics
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Session timeout handling
  • Automatic session boundary detection
  • Session lifecycle management

Optimized Store Configuration

For high-volume scenarios, proper store configuration is crucial for performance.
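
Before the example, a hedged sketch of the tuning knobs involved: caching: true batches writes in memory so rapid updates to the same key are coalesced before being flushed. Whether a changelog topic is maintained is typically governed by a logging option; treat that option name as an assumption to verify against the KSML reference for your version:

stores:
  device_state_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true   # RocksDB-backed; state survives restarts
    caching: true      # coalesce rapid per-key updates before flushing
    logging: true      # assumption: keeps a changelog topic for recovery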

What it does:

  • Produces device events: Creates high-frequency events (sensor_reading, status_update, error, heartbeat) for multiple devices with facility/zone info
  • Stores compact state: Keeps minimal JSON per device with just current status, last_temp, error_count, heartbeat_count, location info
  • Processes selectively: Updates state for every event, but only emits output when specific conditions are met (temperature above 75°C, errors, status changes, every 10th heartbeat)
  • Optimizes for volume: Uses compact JSON state, keeps processing lightweight, and emits only critical alerts to reduce downstream message volume
  • Tracks device health: Monitors temperature trends, error accumulation, heartbeat patterns, status transitions with location context
High Volume Events Producer:
# Producer for optimized store demo - generates high-volume events

functions:
  generate_high_volume_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      device_ids = [f"device_{i:03d}" for i in range(1, 21)]  # 20 devices
      event_types = ["sensor_reading", "status_update", "error", "heartbeat"]
    code: |
      global event_counter, device_ids, event_types

      event_counter += 1
      device_id = random.choice(device_ids)
      event_type = random.choice(event_types)

      # Generate JSON event data for better readability in Kowl UI
      current_timestamp = int(time.time() * 1000)

      if event_type == "sensor_reading":
        value_data = round(random.uniform(20.0, 80.0), 2)  # Temperature
        unit = "celsius"
      elif event_type == "status_update":
        value_data = random.choice(["online", "offline", "maintenance"])
        unit = None
      elif event_type == "error":
        value_data = f"error_code_{random.randint(100, 999)}"
        unit = None
      else:  # heartbeat
        value_data = "ok"
        unit = None

      # Create structured JSON event for better readability in Kowl UI
      device_event = {
        "device_id": device_id,
        "event_type": event_type,
        "value": value_data,
        "timestamp": current_timestamp,
        "event_id": f"evt_{event_counter:06d}",
        "unit": unit,
        "facility": random.choice(["factory_a", "factory_b", "warehouse_c"]),
        "zone": random.choice(["zone_1", "zone_2", "zone_3", "zone_4"]),
        "metadata": {
          "simulation": True,
          "high_volume": True,
          "optimized_processing": True,
          "sequence": event_counter
        }
      }

    expression: (device_id, device_event)
    resultType: (string, json)

producers:
  high_volume_producer:
    generator: generate_high_volume_events
    interval: 1s  # High frequency
    to:
      topic: device_events
      keyType: string
      valueType: json
Optimized Store Processor:
# Processor demonstrating optimized state store configuration

streams:
  device_events:
    topic: device_events
    keyType: string
    valueType: json

stores:
  device_state_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true
    # Optimized for high-volume scenarios

functions:
  process_device_events:
    type: valueTransformer
    stores:
      - device_state_store
    code: |
      import json
      import time

      # Extract fields from JSON event
      if not value:
        return None

      event_type = value.get("event_type")
      event_value = value.get("value")
      timestamp = value.get("timestamp")
      event_id = value.get("event_id")
      facility = value.get("facility")
      zone = value.get("zone")

      if not event_type or event_value is None or not timestamp:
        return None

      # Get existing device state
      state_str = device_state_store.get(key)
      if state_str:
        device_state = json.loads(state_str)
      else:
        device_state = {
          "device_id": key,
          "status": "unknown", 
          "last_temp": None, 
          "error_count": 0, 
          "last_seen": timestamp,
          "heartbeat_count": 0,
          "facility": facility,
          "zone": zone
        }

      # Process event efficiently with JSON output
      result_event = None

      if event_type == "sensor_reading":
        device_state["last_temp"] = float(event_value)
        device_state["status"] = "active"

        # Check for temperature alerts
        if device_state["last_temp"] > 75:
          result_event = {
            "alert_type": "TEMPERATURE_ALERT",
            "device_id": key,
            "temperature": device_state["last_temp"],
            "threshold": 75,
            "facility": facility,
            "zone": zone,
            "event_id": event_id,
            "timestamp": timestamp
          }
          log.warn("Temperature alert for device {}: {:.1f}C", key, device_state["last_temp"])

      elif event_type == "status_update":
        device_state["status"] = event_value
        result_event = {
          "alert_type": "STATUS_UPDATE",
          "device_id": key,
          "status": event_value,
          "facility": facility,
          "zone": zone,
          "event_id": event_id,
          "timestamp": timestamp
        }

      elif event_type == "error":
        device_state["error_count"] += 1
        device_state["status"] = "error"
        result_event = {
          "alert_type": "ERROR",
          "device_id": key,
          "error_code": event_value,
          "error_count": device_state["error_count"],
          "facility": facility,
          "zone": zone,
          "event_id": event_id,
          "timestamp": timestamp
        }
        log.error("Error on device {}: {} (total errors: {})", key, event_value, device_state["error_count"])

      elif event_type == "heartbeat":
        device_state["status"] = "online"
        device_state["heartbeat_count"] = device_state.get("heartbeat_count", 0) + 1
        # Only emit heartbeat summary every 10th heartbeat to reduce output volume
        if device_state["heartbeat_count"] % 10 == 0:
          result_event = {
            "alert_type": "HEARTBEAT_SUMMARY",
            "device_id": key,
            "status": "online",
            "heartbeat_count": device_state["heartbeat_count"],
            "facility": facility,
            "zone": zone,
            "timestamp": timestamp
          }

      # Update last seen and store state
      device_state["last_seen"] = timestamp
      device_state_store.put(key, json.dumps(device_state))

      return result_event

    expression: result_event if result_event else None
    resultType: json

pipelines:
  optimized_processing_pipeline:
    from: device_events
    via:
      - type: mapValues
        mapper: process_device_events
      - type: filter
        if:
          expression: value is not None
    to:
      topic: device_alerts
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Compact state representation for performance
  • Selective event emission
  • Error counting and alerting
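
The heartbeat branch above uses count-based throttling to keep output volume down, a pattern that generalizes to any chatty event class. A minimal standalone sketch:

def should_emit(counter, every=10):
    # Emit only every Nth occurrence to throttle downstream volume
    return counter % every == 0

for n in range(1, 31):
    if should_emit(n):
        print(f"heartbeat summary at count {n}")  # prints at 10, 20, 30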

Multi-Store Pattern

Complex applications often require multiple state stores working together to manage different aspects of state.

What it does:

  • Produces order events: Creates order status updates (created, paid, shipped, delivered, cancelled) with product IDs, quantities, and prices as colon-delimited strings
  • Uses three state stores: Updates order_state_store (current order status), customer_metrics_store (totals per customer), and product_inventory_store (stock levels)
  • Coordinates updates: For each order event, updates all three stores in a single function - order status, customer spending totals, inventory levels (a minimal parse of the event format is sketched after this list)
  • Tracks relationships: Derives a customer ID from each order, maintains order history lists, and tracks inventory changes per product
  • Outputs comprehensive results: Returns a formatted string combining data from all three stores showing order details, customer analytics, and inventory impact
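
The event encoding is a plain colon-delimited string, so parsing reduces to a split plus a length check. A minimal standalone sketch:

# Parse "status:product_id:quantity:price:timestamp" into typed fields
def parse_order_event(value):
    parts = value.split(":")
    if len(parts) != 5:
        return None  # malformed event, skip it
    return {
        "status": parts[0],
        "product_id": parts[1],
        "quantity": int(parts[2]),
        "price": float(parts[3]),
        "timestamp": int(parts[4]),
    }

print(parse_order_event("created:prod_A:2:49.99:1640995200000"))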
Order Events Producer:
# Producer for multi-store demo - generates order processing events as colon-delimited strings

functions:
  generate_order_events:
    type: generator
    globalCode: |
      import random
      import time
      order_counter = 0
      products = ["prod_A", "prod_B", "prod_C", "prod_D"]
      statuses = ["created", "paid", "shipped", "delivered", "cancelled"]
    code: |
      global order_counter, products, statuses

      order_counter += 1

      # Generate order event as colon-delimited string: "status:product_id:quantity:price:timestamp"
      status = random.choice(statuses)
      product_id = random.choice(products)
      quantity = random.randint(1, 5)
      price = round(random.uniform(10.0, 100.0), 2)
      timestamp = int(time.time() * 1000)

      # Create the colon-delimited string format expected by the processor
      order_value = f"{status}:{product_id}:{quantity}:{price}:{timestamp}"
      order_key = f"order_{order_counter:04d}"

      log.info("Generating order: key={}, value={}", order_key, order_value)

    expression: (order_key, order_value)
    resultType: (string, string)

producers:
  order_event_producer:
    generator: generate_order_events
    interval: 2s
    to:
      topic: order_events
      keyType: string
      valueType: string
Multi-Store Processor:
# Processor demonstrating multi-store pattern for complex order processing

streams:
  order_events:
    topic: order_events
    keyType: string
    valueType: string

stores:
  # Store for current order state
  order_state_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

  # Store for customer metrics
  customer_metrics_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

  # Store for product inventory tracking
  product_inventory_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  process_order_with_multiple_stores:
    type: valueTransformer
    stores:
      - order_state_store
      - customer_metrics_store
      - product_inventory_store
    code: |
      import json

      # Parse order event: "status:product_id:quantity:price:timestamp"
      parts = value.split(":")
      if len(parts) != 5:
        return None

      status = parts[0]
      product_id = parts[1]
      quantity = int(parts[2])
      price = float(parts[3])
      timestamp = int(parts[4])

      order_id = key

      # 1. Update order state store
      order_state = {
        "order_id": order_id,
        "status": status,
        "product_id": product_id,
        "quantity": quantity,
        "price": price,
        "last_updated": timestamp
      }
      order_state_store.put(order_id, json.dumps(order_state))

      # 2. Update customer metrics (derive a stable customer id from the order id's
      #    numeric suffix; Python's hash() is randomized per process, so it would not
      #    give a consistent mapping across restarts)
      customer_id = f"cust_{int(order_id.split('_')[1]) % 4 + 1:03d}"  # order_0001 -> cust_002

      customer_data_str = customer_metrics_store.get(customer_id)
      if customer_data_str:
        customer_data = json.loads(customer_data_str)
      else:
        customer_data = {
          "customer_id": customer_id,
          "total_orders": 0,
          "total_spent": 0,
          "order_history": []
        }

      # Update customer metrics based on status
      if status == "created":
        customer_data["total_orders"] += 1
        customer_data["order_history"].append(order_id)
        # Keep only last 10 orders for memory efficiency
        if len(customer_data["order_history"]) > 10:
          customer_data["order_history"] = customer_data["order_history"][-10:]

      elif status == "paid":
        customer_data["total_spent"] += price * quantity

      customer_metrics_store.put(customer_id, json.dumps(customer_data))

      # 3. Update product inventory
      inventory_data_str = product_inventory_store.get(product_id)
      if inventory_data_str:
        inventory_data = json.loads(inventory_data_str)
      else:
        inventory_data = {
          "product_id": product_id,
          "reserved_quantity": 0,
          "sold_quantity": 0,
          "available_stock": 100  # Default stock
        }

      # Update inventory based on order status
      if status == "created":
        inventory_data["reserved_quantity"] += quantity
        inventory_data["available_stock"] -= quantity
      elif status == "shipped":
        inventory_data["reserved_quantity"] -= quantity
        inventory_data["sold_quantity"] += quantity
      elif status == "cancelled":
        inventory_data["reserved_quantity"] -= quantity
        inventory_data["available_stock"] += quantity

      product_inventory_store.put(product_id, json.dumps(inventory_data))

      # Generate a compact order summary combining data from all three stores
      result = f"ORDER_PROCESSED:{order_id}:status={status}:customer_orders={customer_data['total_orders']}:stock_left={inventory_data['available_stock']}"

      log.info("Processed order {} with status {}: customer has {} orders, product {} has {} stock left", 
               order_id, status, customer_data["total_orders"], product_id, inventory_data["available_stock"])

      return result

    expression: result if result else None
    resultType: string

pipelines:
  multi_store_order_processing:
    from: order_events
    via:
      - type: mapValues
        mapper: process_order_with_multiple_stores
      - type: filter
        if:
          expression: value is not None
    to:
      topic: order_processing_results
      keyType: string
      valueType: string

Key concepts demonstrated:

  • Multiple state stores in single function
  • Coordinated state updates
  • Cross-store data correlation

State Store Types Summary

| Store Type | Use Case                         | Key Features                |
|------------|----------------------------------|-----------------------------|
| Key-Value  | General state, caching, counters | Simple key-to-value mapping |
| Window     | Time-based aggregations          | Automatic time partitioning |
| Session    | User sessions, activity tracking | Inactivity-based boundaries |

Conclusion

Custom state stores in KSML provide powerful capabilities for building stateful stream processing applications. By understanding the different store types, configuration options, and optimization techniques, you can build efficient and scalable applications that maintain state effectively across events.

For foundational concepts and basic configuration patterns, refer back to the State Stores Tutorial.