
State Store Reference

State stores enable stateful stream processing in KSML by maintaining data across multiple messages. This reference covers state store configuration, usage patterns, and best practices.

For hands-on tutorials, please check out:

What are State Stores?

State stores allow KSML applications to:

  • Maintain state between messages for aggregations, counts, and joins
  • Track historical information for context-aware processing
  • Build complex business logic that depends on previous events

State Store Types

KSML supports three types of state stores, each optimized for specific use cases:

| Type | Description | Use Cases | Example |
|---|---|---|---|
| keyValue | Simple key-value storage | General lookups, non-windowed aggregations, manual state management | keyValue type state store |
| window | Time-windowed storage with automatic expiry | Time-based aggregations, windowed joins, temporal analytics | window type state store |
| session | Session-based storage with activity gaps | User session tracking, activity-based grouping | session type state store |
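To make the difference between the three types concrete, here is a rough Python model of how each one keys its data. The class names are hypothetical and the expiry logic is deliberately simplified; this is not how KSML or Kafka Streams implement the stores.

```python
# Rough, hypothetical model of how each store type keys its data.
# Not the KSML or Kafka Streams API; expiry is deliberately simplified.


class KeyValueModel:
    """keyValue: one current value per key."""

    def __init__(self):
        self.data = {}

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)  # None if the key is absent


class WindowModel:
    """window: one value per (key, window start); old windows expire."""

    def __init__(self, window_size_ms, retention_ms):
        self.window_size_ms = window_size_ms
        self.retention_ms = retention_ms
        self.data = {}

    def put(self, key, timestamp_ms, value):
        # Align the timestamp to the start of its tumbling window
        window_start = timestamp_ms - (timestamp_ms % self.window_size_ms)
        self.data[(key, window_start)] = value

    def expire(self, now_ms):
        # Simplified expiry: drop windows that start before now - retention
        cutoff = now_ms - self.retention_ms
        self.data = {k: v for k, v in self.data.items() if k[1] >= cutoff}


class SessionModel:
    """session: one value per (key, session start, session end)."""

    def __init__(self):
        self.data = {}

    def put(self, key, start_ms, end_ms, value):
        self.data[(key, start_ms, end_ms)] = value


windows = WindowModel(window_size_ms=300_000, retention_ms=2_100_000)
windows.put("user_001", 310_000, 1)  # stored under window start 300000
print(windows.data)
```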

Configuration Methods

1. Pre-defined State Stores

Define once in the stores section, reuse across operations:

stores:
  owner_count_store:
    name: owner_count_store
    type: keyValue
    keyType: string
    valueType: long
    retention: 5m
    caching: false
    persistent: true
    logging: true

Full example:

2. Inline State Stores

Define directly within operations for custom configurations:

      - type: aggregate
        store:
          name: sensor_type_aggregates
          type: keyValue
          keyType: string
          valueType: json
          retention: 3m
          caching: true
          persistent: false
          logging: false

Full example:

Configuration Parameters

Common Parameters (All Store Types)

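| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | String | No | Operation name | Unique identifier for the state store |
| type | String | Yes | - | Store type: keyValue, window, or session |
| keyType | String | Yes | - | Data type for keys (see Data Types Reference) |
| valueType | String | Yes | - | Data type for values |
| persistent | Boolean | No | false | If true, keeps the store on local disk so it survives application restarts; otherwise the store lives in memory |
| caching | Boolean | No | false | If true, buffers updates in memory, reducing store writes and downstream updates at the cost of delayed emission |
| logging | Boolean | No | false | If true, backs the store with a changelog topic so its state can be restored after a failure |
| timestamped | Boolean | No | false | If true, stores a timestamp with each entry |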

Window Store Specific Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| windowSize | Duration | Yes | - | Size of the time window (must match the operation's window; for stream-stream joins, 2 × timeDifference) |
| retention | Duration | No | - | How long to retain window data (should be at least windowSize + grace period) |
| retainDuplicates | Boolean | No | false | Whether to keep duplicate entries in windows |

KeyValue Store Specific Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| versioned | Boolean | No | false | If true, maintains version history of values |
| historyRetention | Duration | No (Yes if versioned) | - | How long to keep old versions |
| segmentInterval | Duration | No | - | Segment size for versioned stores |

Important: Versioned stores (versioned: true) cannot have caching enabled (caching: false is required).
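The versioned option is easiest to picture as "keep every value together with its timestamp". The hypothetical VersionedStoreModel below sketches that idea in Python: a late, out-of-order write does not replace the latest value, and reads can ask for the value as of an earlier time. It does not model historyRetention or segmentInterval and is not the real store API.

```python
import bisect


class VersionedStoreModel:
    """Hypothetical model of versioned keyValue semantics (not the real API).

    historyRetention and segmentInterval are not modeled.
    """

    def __init__(self):
        self.timestamps = {}  # key -> sorted list of version timestamps
        self.values = {}      # key -> values aligned with self.timestamps[key]

    def put(self, key, value, timestamp_ms):
        ts_list = self.timestamps.setdefault(key, [])
        values = self.values.setdefault(key, [])
        i = bisect.bisect_right(ts_list, timestamp_ms)  # keep versions time-ordered
        ts_list.insert(i, timestamp_ms)
        values.insert(i, value)

    def get(self, key, as_of_ms=None):
        ts_list = self.timestamps.get(key)
        if not ts_list:
            return None
        if as_of_ms is None:
            return self.values[key][-1]  # latest version by timestamp
        i = bisect.bisect_right(ts_list, as_of_ms)
        return self.values[key][i - 1] if i > 0 else None


store = VersionedStoreModel()
store.put("sensor_1", 10, timestamp_ms=1000)
store.put("sensor_1", 20, timestamp_ms=3000)
store.put("sensor_1", 15, timestamp_ms=2000)  # late, out-of-order write
print(store.get("sensor_1"))                  # 20: the late write did not replace it
print(store.get("sensor_1", as_of_ms=2500))   # 15
```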

Session Store Specific Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| retention | Duration | No | - | How long to retain session data |

Usage in Operations

With Aggregations

Aggregation operations use a state store automatically; add a store block to configure it:

pipelines:
  calculate_statistics:
    from: payment_stream
    via:
      - type: groupByKey
      - type: aggregate
        store:
          type: keyValue
          retention: 1h
          caching: false
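Conceptually, an aggregation treats its keyValue store as a read-modify-write record of the current aggregate per key, following Kafka Streams' initializer/aggregator model. The Python sketch below uses a plain dict as the store; the amount field and the statistics layout are hypothetical and not taken from the full example.

```python
# Read-modify-write sketch of an aggregate step backed by a keyValue store.
# A plain dict stands in for the store; the "amount" field is hypothetical.


def initialize():
    # Value used the first time a key is seen
    return {"count": 0, "total": 0.0, "min": None, "max": None}


def aggregate(key, value, current):
    amount = value["amount"]
    current["count"] += 1
    current["total"] += amount
    current["min"] = amount if current["min"] is None else min(current["min"], amount)
    current["max"] = amount if current["max"] is None else max(current["max"], amount)
    return current


def run(records, store):
    for key, value in records:
        current = store.get(key)
        if current is None:
            current = initialize()
        store[key] = aggregate(key, value, current)  # write the new aggregate back
    return store


payments = [("acct1", {"amount": 10.0}), ("acct1", {"amount": 30.0})]
print(run(payments, {}))
```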

Full example:

With Manual State Access

Access state stores directly in functions for custom caching, enrichment, and state management:

stores:
  user_profile_cache:
    name: user_profile_cache
    type: keyValue
    keyType: string
    valueType: json
    persistent: true
    logging: true
    retention: 1h
functions:
  enrich_and_cache:
    type: valueTransformer
    code: |
      # Get cached data for this user
      cached_data = user_profile_cache.get(key)

Full example:

Producer definition (generates user events):
# User events producer for manual state store access example
# Generates user activity events to demonstrate cache enrichment

streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

functions:
  generate_user_events:
    type: generator
    globalCode: |
      import random
      import time

      users = ["user_001", "user_002", "user_003", "user_004", "user_005"]
      actions = ["login", "view_product", "add_to_cart", "checkout", "logout"]
      products = ["laptop", "phone", "tablet", "monitor", "keyboard"]
      event_counter = 0
    code: |
      global event_counter
      event_counter += 1

      user_id = random.choice(users)
      action = random.choice(actions)

      event = {
        "event_id": f"evt_{event_counter}",
        "user_id": user_id,
        "action": action,
        "timestamp": int(time.time() * 1000)
      }

      # Add product info for relevant actions
      if action in ["view_product", "add_to_cart"]:
        event["product"] = random.choice(products)
        event["price"] = round(random.uniform(50, 2000), 2)

      return (user_id, event)
    resultType: (string, json)

producers:
  user_event_generator:
    generator: generate_user_events
    interval: 2s
    to: user_events

Processor definition (manual state access):
# Manual state store access example
# Demonstrates how to directly access state stores in functions for enrichment and caching

streams:
  user_events_input:
    topic: user_events
    keyType: string
    valueType: json
    offsetResetPolicy: latest

  enriched_events:
    topic: enriched_user_events
    keyType: string
    valueType: json

stores:
  user_profile_cache:
    name: user_profile_cache
    type: keyValue
    keyType: string
    valueType: json
    persistent: true
    logging: true
    retention: 1h

functions:
  enrich_and_cache:
    type: valueTransformer
    code: |
      # Get cached data for this user
      cached_data = user_profile_cache.get(key)

      # Log the state access
      if cached_data is not None:
        log.info("Found cached data for user: {}", key)
      else:
        log.info("No cached data found for user: {}", key)

      # Store the current event data in cache
      user_profile_cache.put(key, value)

      # Create enriched result
      result = {
        "event_id": value.get("event_id"),
        "user_id": value.get("user_id"),
        "action": value.get("action"),
        "timestamp": value.get("timestamp"),
        "has_cached_data": cached_data is not None
      }

      # Copy product fields if present
      if value.get("product") is not None:
        result["product"] = value.get("product")
        result["price"] = value.get("price")
    expression: result
    resultType: json
    stores:
      - user_profile_cache

pipelines:
  enrich_user_events:
    from: user_events_input
    via:
      - type: transformValue
        mapper: enrich_and_cache

      - type: peek
        forEach:
          code: |
            log.info("ENRICHED EVENT - User: {}, Action: {}, Has Cached Data: {}", 
                     key, 
                     value.get("action"),
                     value.get("has_cached_data"))

    to: enriched_events

This example demonstrates:

  • Store declaration: List the store names in the function's stores parameter so the function code can access them
  • State retrieval: Use store_name.get(key) to read cached data
  • State storage: Use store_name.put(key, value) to update the cache
  • Null handling: get returns None for a user with no cached data, so checking cached_data is not None separates returning users from first-time ones

Key functions:

  • store.get(key) - Retrieve value from state store (returns None if not found)
  • store.put(key, value) - Store key-value pair in state store
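Because the function body is ordinary Python, the get/put pattern can be tried outside KSML with a stand-in store. In the sketch below, InMemoryStore is a hypothetical dict-backed substitute that models only get and put; enrich_and_cache mirrors the logic of the example function without the logging.

```python
class InMemoryStore:
    """Hypothetical dict-backed stand-in for a KSML keyValue store (get/put only)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)  # None when the key is absent

    def put(self, key, value):
        self._data[key] = value


def enrich_and_cache(key, value, user_profile_cache):
    # Look up what was stored for this user before the current event
    cached_data = user_profile_cache.get(key)

    # Store the current event so the next event for this user finds it
    user_profile_cache.put(key, value)

    result = {
        "event_id": value.get("event_id"),
        "user_id": value.get("user_id"),
        "action": value.get("action"),
        "timestamp": value.get("timestamp"),
        "has_cached_data": cached_data is not None,
    }
    # Copy product fields only when the event carries them
    if value.get("product") is not None:
        result["product"] = value.get("product")
        result["price"] = value.get("price")
    return result


cache = InMemoryStore()
first = enrich_and_cache("user_001", {"event_id": "evt_1", "user_id": "user_001", "action": "login"}, cache)
second = enrich_and_cache("user_001", {"event_id": "evt_2", "user_id": "user_001",
                                       "action": "view_product", "product": "laptop", "price": 999.0}, cache)
print(first["has_cached_data"], second["has_cached_data"])  # False True
```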

With Session Operations

Session stores hold per-session aggregates; a session closes when no new event arrives for the configured inactivity gap:

pipelines:
  user_session_analysis:
    from: user_clicks
    via:
      - type: groupByKey
      - type: windowBySession
        inactivityGap: 2m  # Close session after 2 minutes of inactivity
        grace: 30s
      - type: count
        store:
          name: user_sessions
          type: session
          retention: 1h
          caching: false

Full example:

Session store patterns:

  • Activity tracking: Monitor user engagement and session duration
  • Timeout detection: Identify inactive sessions for cleanup
  • State aggregation: Accumulate metrics within session boundaries
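How an inactivity gap splits activity into sessions can be sketched in a few lines of Python. The sketch assumes the timestamps belong to a single key, and it treats a gap exactly equal to the inactivity gap as the same session; Kafka Streams also merges sessions when records arrive out of order, which this sketch approximates only by sorting first. The function name is hypothetical.

```python
def sessionize(timestamps_ms, inactivity_gap_ms):
    """Group one key's event timestamps into sessions.

    Returns (session_start, session_end, event_count) tuples. A new session
    starts when the time since the previous event exceeds inactivity_gap_ms.
    """
    sessions = []
    for ts in sorted(timestamps_ms):  # sorting stands in for out-of-order merging
        if sessions and ts - sessions[-1][1] <= inactivity_gap_ms:
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts, count + 1)  # extend the current session
        else:
            sessions.append((ts, ts, 1))  # gap exceeded: open a new session
    return sessions


# 2-minute inactivity gap, as in the example above
print(sessionize([0, 60_000, 170_000, 400_000, 410_000], 120_000))
# [(0, 170000, 3), (400000, 410000, 2)]
```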

With Windowed Operations

Window stores require specific configuration that varies by operation type:

For Aggregations (Count, Aggregate, etc.)

pipelines:
  count_clicks_5min:
    from: user_clicks
    via:
      - type: groupByKey
      - type: windowByTime
        windowType: tumbling
        duration: 5m
        grace: 30s
      - type: count
        store:
          name: click_counts_5min
          type: window
          windowSize: 5m          # Must match window duration
          retention: 35m          # windowSize + grace + buffer
          caching: false
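Window placement and the effect of grace can be sketched in Python. The code assigns each record to a 5-minute tumbling window and drops a record once the highest timestamp seen so far (stream time) has reached the window end plus grace, which mirrors how Kafka Streams decides that a window is closed. Caching, retention, and suppression are not modeled, and the function name is hypothetical.

```python
WINDOW_MS = 5 * 60 * 1000  # duration: 5m
GRACE_MS = 30 * 1000       # grace: 30s


def count_tumbling(events, window_ms=WINDOW_MS, grace_ms=GRACE_MS):
    """Count (key, timestamp_ms) events per tumbling window, dropping late records."""
    counts = {}
    dropped = []
    stream_time = None  # highest timestamp observed so far
    for key, ts in events:
        stream_time = ts if stream_time is None else max(stream_time, ts)
        window_start = ts - (ts % window_ms)
        window_end = window_start + window_ms
        if stream_time >= window_end + grace_ms:
            dropped.append((key, ts))  # window already closed
            continue
        counts[(key, window_start)] = counts.get((key, window_start), 0) + 1
    return counts, dropped


events = [("u1", 10_000), ("u1", 200_000), ("u1", 310_000),
          ("u1", 250_000), ("u1", 700_000), ("u1", 100_000)]
counts, dropped = count_tumbling(events)
print(counts)   # {('u1', 0): 3, ('u1', 300000): 1, ('u1', 600000): 1}
print(dropped)  # [('u1', 100000)]
```

The record at 250000 arrives after stream time has moved into the next window, but it is still counted because stream time (310000) has not reached the first window's end plus grace (330000). The record at 100000 arrives after stream time reached 700000, so it is dropped.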

For Stream-Stream Joins

pipelines:
  join_streams:
    from: stream1
    via:
      - type: join
        stream: stream2
        timeDifference: 15m
        grace: 5m
        thisStore:
          type: window
          windowSize: 30m         # Must be 2 × timeDifference
          retention: 35m          # 2 × timeDifference + grace
          retainDuplicates: true
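The 2 × timeDifference rule follows from the join condition: a record must match records up to timeDifference before or after it, so each side's store has to cover a span of twice that. The naive Python sketch below shows only the matching condition; the function name is hypothetical, and grace and store retention are not modeled.

```python
TIME_DIFFERENCE_MS = 15 * 60 * 1000  # timeDifference: 15m from the example


def join_streams(left, right, time_difference_ms=TIME_DIFFERENCE_MS):
    """Naive symmetric window join on (key, timestamp_ms, value) records."""
    joined = []
    for l_key, l_ts, l_val in left:
        for r_key, r_ts, r_val in right:
            # Match if the other record is at most time_difference_ms before or after
            if l_key == r_key and abs(l_ts - r_ts) <= time_difference_ms:
                joined.append((l_key, l_val, r_val))
    return joined


left = [("k", 0, "a"), ("k", 1_000_000, "b")]
right = [("k", 900_000, "x"), ("j", 0, "y")]
print(join_streams(left, right))  # [('k', 'a', 'x'), ('k', 'b', 'x')]
```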

Retention Guidelines

For Window Aggregations:

retention = windowSize + gracePeriod + processingBuffer

For Stream-Stream Joins:

windowSize = 2 × timeDifference
retention = 2 × timeDifference + gracePeriod
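The two guidelines can be checked against the example values with Python's timedelta. The function names are hypothetical, and the 29m30s buffer is simply the value implied by the 35m retention used in the count example (35m - 5m - 30s).

```python
from datetime import timedelta


def window_aggregation_retention(window_size, grace, processing_buffer=timedelta(0)):
    # retention = windowSize + gracePeriod + processingBuffer
    return window_size + grace + processing_buffer


def join_store_settings(time_difference, grace):
    # windowSize = 2 x timeDifference; retention = 2 x timeDifference + gracePeriod
    window_size = 2 * time_difference
    return window_size, window_size + grace


# Values from the count example: 5m window, 30s grace, 35m retention
print(window_aggregation_retention(timedelta(minutes=5), timedelta(seconds=30),
                                   timedelta(minutes=29, seconds=30)))
# Values from the join example: 15m timeDifference, 5m grace
print(join_store_settings(timedelta(minutes=15), timedelta(minutes=5)))
```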

Full examples:

Performance Considerations

Caching Impact

| Setting | Behavior | Use When |
|---|---|---|
| caching: true | Batches updates, reduces downstream load | High throughput with acceptable latency |
| caching: false | Immediate emission of all updates | Real-time requirements, debugging |
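The difference between the two caching settings can be simulated by looking at what is forwarded downstream before a single cache flush. This is a simplified, hypothetical model: real flushes happen on commit or when the cache fills up, and flush order is not modeled.

```python
def forwarded_updates(updates, caching):
    """Hypothetical model of what reaches downstream before one cache flush.

    updates: (key, value) pairs in arrival order.
    """
    if not caching:
        # caching: false -> every update is forwarded immediately
        return list(updates)
    latest = {}
    for key, value in updates:
        latest[key] = value  # a newer update overwrites the cached one
    # caching: true -> only the latest value per key is forwarded at flush time
    return list(latest.items())


updates = [("a", 1), ("a", 2), ("b", 1), ("a", 3)]
print(forwarded_updates(updates, caching=False))  # all 4 updates
print(forwarded_updates(updates, caching=True))   # [('a', 3), ('b', 1)]
```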

Persistence Trade-offs

| Setting | Pros | Cons | Use When |
|---|---|---|---|
| persistent: true | Survives restarts, enables recovery | Slower, uses disk space | Production, critical state |
| persistent: false | Fast, memory-only | Lost on restart unless rebuilt from the changelog (logging: true) | Temporary state, caching |
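The role of logging: true in recovery can be sketched as replaying the changelog into an empty store: the latest value per key wins, and a None value (a tombstone) removes the key. This hypothetical function illustrates the idea only; it is not how KSML restores stores internally.

```python
def restore_from_changelog(changelog):
    """Hypothetical sketch: rebuild a keyValue store by replaying its changelog.

    changelog: (key, value) records in offset order; value None is a tombstone.
    """
    store = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)  # tombstone: the key was deleted
        else:
            store[key] = value    # later records overwrite earlier ones
    return store


print(restore_from_changelog([("u1", 1), ("u2", 5), ("u1", 2), ("u2", None)]))
# {'u1': 2}
```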