State Store Reference
State stores enable stateful stream processing in KSML by maintaining data across multiple messages. This reference covers state store configuration, usage patterns, and best practices.
For hands-on tutorials, please check out:
What are State Stores?
State stores allow KSML applications to:
- Maintain state between messages for aggregations, counts, and joins
- Track historical information for context-aware processing
- Build complex business logic that depends on previous events
State Store Types
KSML supports three types of state stores, each optimized for specific use cases:
| Type | Description | Use Cases | Examples |
|---|---|---|---|
| `keyValue` | Simple key-value storage | General lookups, non-windowed aggregations, manual state management | keyValue type state store |
| `window` | Time-windowed storage with automatic expiry | Time-based aggregations, windowed joins, temporal analytics | window type state store |
| `session` | Session-based storage with activity gaps | User session tracking, activity-based grouping | session type state store |
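As a rough mental model, the three types differ mainly in what they use as the storage key. The Python sketch below is conceptual only: `WindowKey`, `SessionKey`, and the sample values are hypothetical and are not KSML or Kafka Streams classes.

```python
from dataclasses import dataclass


# Hypothetical key types for illustration only (not KSML classes).
@dataclass(frozen=True)
class WindowKey:
    key: str
    window_start_ms: int  # start of a fixed-size time window


@dataclass(frozen=True)
class SessionKey:
    key: str
    session_start_ms: int  # timestamp of the first event in the session
    session_end_ms: int    # timestamp of the last event in the session


# keyValue: one current value per record key
key_value_store = {"alice": 3}

# window: one value per record key per time window (here 5-minute windows)
window_store = {
    WindowKey("alice", 0): 2,
    WindowKey("alice", 300_000): 1,
}

# session: one value per record key per session; the bounds depend on activity
session_store = {
    SessionKey("alice", 10_000, 70_000): 4,
}
```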
Configuration Methods
1. Pre-defined State Stores
Define once in the `stores` section, then reuse across operations:
```yaml
stores:
  owner_count_store:
    name: owner_count_store
    type: keyValue
    keyType: string
    valueType: long
    retention: 5m
    caching: false
    persistent: true
    logging: true
```
2. Inline State Stores
Define directly within operations for custom configurations:
```yaml
- type: aggregate
  store:
    name: sensor_type_aggregates
    type: keyValue
    keyType: string
    valueType: json
    retention: 3m
    caching: true
    persistent: false
    logging: false
```
Configuration Parameters
Common Parameters (All Store Types)
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | String | No | Operation name | Unique identifier for the state store |
| `type` | String | Yes | - | Store type: `keyValue`, `window`, or `session` |
| `keyType` | String | Yes | - | Data type for keys (see Data Types Reference) |
| `valueType` | String | Yes | - | Data type for values |
| `persistent` | Boolean | No | `false` | If `true`, the store survives application restarts |
| `caching` | Boolean | No | `false` | If `true`, buffers updates in memory, which improves read performance and reduces downstream load but delays the forwarding of updates |
| `logging` | Boolean | No | `false` | If `true`, creates a changelog topic for recovery |
| `timestamped` | Boolean | No | `false` | If `true`, stores a timestamp with each entry |
Window Store Specific Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `windowSize` | Duration | Yes | - | Size of the time window (must match the operation's window duration; for stream-stream joins it must be `2 × timeDifference`) |
| `retention` | Duration | No | - | How long to retain window data (must be at least `windowSize` + grace period) |
| `retainDuplicates` | Boolean | No | `false` | Whether to keep duplicate entries in windows |
KeyValue Store Specific Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `versioned` | Boolean | No | `false` | If `true`, maintains a version history of values |
| `historyRetention` | Duration | No (Yes if versioned) | - | How long to keep old versions |
| `segmentInterval` | Duration | No | - | Segment size for versioned stores |
Important: Versioned stores (`versioned: true`) cannot have caching enabled; `caching: false` is required.
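A versioned store keeps older values so you can ask what a key's value was at a given point in time. The Python class below is a hypothetical, simplified model of that idea, meant only to show what `historyRetention` controls: it keeps every version and simply refuses lookups that reach further back than the retention. It is not KSML's implementation.

```python
class VersionedStoreSketch:
    """Hypothetical in-memory model of a versioned keyValue store.

    Keeps every (timestamp, value) version per key and answers
    "what was the value at time t?". Lookups further back than
    history_retention_ms from the newest timestamp seen return None.
    """

    def __init__(self, history_retention_ms):
        self.history_retention_ms = history_retention_ms
        self.versions = {}       # key -> list of (timestamp, value), sorted by timestamp
        self.stream_time_ms = 0  # newest timestamp observed so far

    def put(self, key, value, ts):
        self.stream_time_ms = max(self.stream_time_ms, ts)
        history = self.versions.setdefault(key, [])
        history.append((ts, value))
        history.sort(key=lambda version: version[0])

    def get(self, key, as_of_ms):
        # Outside the history retention window: no answer
        if as_of_ms < self.stream_time_ms - self.history_retention_ms:
            return None
        result = None
        for ts, value in self.versions.get(key, []):
            if ts > as_of_ms:
                break
            result = value  # latest version at or before as_of_ms so far
        return result


store = VersionedStoreSketch(history_retention_ms=60_000)
store.put("sensor-1", 20.5, ts=1_000)
store.put("sensor-1", 22.0, ts=5_000)
print(store.get("sensor-1", as_of_ms=3_000))  # 20.5
```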
Session Store Specific Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `retention` | Duration | No | - | How long to retain session data |
Usage in Operations
With Aggregations
State stores are automatically used by aggregation operations:
```yaml
pipelines:
  calculate_statistics:
    from: payment_stream
    via:
      - type: groupByKey
      - type: aggregate
        store:
          type: keyValue
          retention: 1h
          caching: false
```
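Conceptually, an aggregate step reads the current aggregate for the record's key from its keyValue store, applies the aggregator, and writes the result back. The plain-Python sketch below models that loop with a dict standing in for the store. The `initializer`/`aggregator` names follow the usual aggregate pattern, while the `amount` field and the account keys are hypothetical sample data.

```python
# Hypothetical sketch of the read-update-write loop behind an aggregate step.
# A dict stands in for the keyValue store; field names are sample data.

def initializer():
    # Starting aggregate for a key that has no stored state yet
    return {"count": 0, "total": 0.0}


def aggregator(key, value, aggregate):
    # Fold one payment into the running aggregate for its key
    return {
        "count": aggregate["count"] + 1,
        "total": aggregate["total"] + value["amount"],
    }


def run_aggregate(records, store):
    results = []
    for key, value in records:
        current = store.get(key)
        if current is None:
            current = initializer()
        updated = aggregator(key, value, current)
        store[key] = updated            # write the new aggregate back to the store
        results.append((key, updated))  # the update that is forwarded downstream
    return results


store = {}
payments = [
    ("acct-1", {"amount": 10.0}),
    ("acct-2", {"amount": 5.0}),
    ("acct-1", {"amount": 2.5}),
]
results = run_aggregate(payments, store)
print(store["acct-1"])  # {'count': 2, 'total': 12.5}
```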
With Manual State Access
Access state stores directly in functions for custom caching, enrichment, and state management:
```yaml
stores:
  user_profile_cache:
    name: user_profile_cache
    type: keyValue
    keyType: string
    valueType: json
    persistent: true
    logging: true
    retention: 1h

functions:
  enrich_and_cache:
    type: valueTransformer
    code: |
      # Get cached data for this user
      cached_data = user_profile_cache.get(key)
      # ... (see the full example below)
    stores:
      - user_profile_cache
```
Full example: State Store with Manual State Access

Producer (generates user activity events):
```yaml
# User events producer for manual state store access example
# Generates user activity events to demonstrate cache enrichment
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

functions:
  generate_user_events:
    type: generator
    globalCode: |
      import random
      import time
      users = ["user_001", "user_002", "user_003", "user_004", "user_005"]
      actions = ["login", "view_product", "add_to_cart", "checkout", "logout"]
      products = ["laptop", "phone", "tablet", "monitor", "keyboard"]
      event_counter = 0
    code: |
      global event_counter
      event_counter += 1
      user_id = random.choice(users)
      action = random.choice(actions)
      event = {
        "event_id": f"evt_{event_counter}",
        "user_id": user_id,
        "action": action,
        "timestamp": int(time.time() * 1000)
      }
      # Add product info for relevant actions
      if action in ["view_product", "add_to_cart"]:
        event["product"] = random.choice(products)
        event["price"] = round(random.uniform(50, 2000), 2)
      return (user_id, event)
    resultType: (string, json)

producers:
  user_event_generator:
    generator: generate_user_events
    interval: 2s
    to: user_events
```
Processor (enriches events using the state store):
```yaml
# Manual state store access example
# Demonstrates how to directly access state stores in functions for enrichment and caching
streams:
  user_events_input:
    topic: user_events
    keyType: string
    valueType: json
    offsetResetPolicy: latest
  enriched_events:
    topic: enriched_user_events
    keyType: string
    valueType: json

stores:
  user_profile_cache:
    name: user_profile_cache
    type: keyValue
    keyType: string
    valueType: json
    persistent: true
    logging: true
    retention: 1h

functions:
  enrich_and_cache:
    type: valueTransformer
    code: |
      # Get cached data for this user
      cached_data = user_profile_cache.get(key)

      # Log the state access
      if cached_data is not None:
        log.info("Found cached data for user: {}", key)
      else:
        log.info("No cached data found for user: {}", key)

      # Store the current event data in cache
      user_profile_cache.put(key, value)

      # Create enriched result
      result = {
        "event_id": value.get("event_id"),
        "user_id": value.get("user_id"),
        "action": value.get("action"),
        "timestamp": value.get("timestamp"),
        "has_cached_data": cached_data is not None
      }

      # Copy product fields if present
      if value.get("product") is not None:
        result["product"] = value.get("product")
        result["price"] = value.get("price")
    expression: result
    resultType: json
    stores:
      - user_profile_cache

pipelines:
  enrich_user_events:
    from: user_events_input
    via:
      - type: transformValue
        mapper: enrich_and_cache
      - type: peek
        forEach:
          code: |
            log.info("ENRICHED EVENT - User: {}, Action: {}, Has Cached Data: {}",
                     key,
                     value.get("action"),
                     value.get("has_cached_data"))
    to: enriched_events
```
This example demonstrates:
- Store declaration: List store names in the function's `stores` parameter
- State retrieval: Use `store_name.get(key)` to read cached data
- State storage: Use `store_name.put(key, value)` to update the cache
- Null handling: Check `cached_data is not None` to detect first-time users
Key functions:
- `store.get(key)` - Retrieve a value from the state store (returns `None` if not found)
- `store.put(key, value)` - Store a key-value pair in the state store
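Because the transformer body is plain Python, its logic can be exercised outside KSML by passing in a stand-in store. The sketch below is an assumption-based port for local testing: `InMemoryStore` is a hypothetical dict-backed class that mimics the `get`/`put` behavior described above, and the store is passed as a parameter instead of being available as a global name as it is inside KSML.

```python
class InMemoryStore:
    """Hypothetical stand-in for a keyValue store (not a KSML class)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)  # None when the key is unknown

    def put(self, key, value):
        self._data[key] = value


def enrich_and_cache(key, value, user_profile_cache):
    # Same steps as the KSML function body above, minus logging
    cached_data = user_profile_cache.get(key)
    user_profile_cache.put(key, value)
    result = {
        "event_id": value.get("event_id"),
        "user_id": value.get("user_id"),
        "action": value.get("action"),
        "timestamp": value.get("timestamp"),
        "has_cached_data": cached_data is not None,
    }
    if value.get("product") is not None:
        result["product"] = value.get("product")
        result["price"] = value.get("price")
    return result


cache = InMemoryStore()
first = enrich_and_cache(
    "user_001",
    {"event_id": "evt_1", "user_id": "user_001", "action": "login", "timestamp": 0},
    cache,
)
second = enrich_and_cache(
    "user_001",
    {"event_id": "evt_2", "user_id": "user_001", "action": "view_product",
     "timestamp": 1, "product": "laptop", "price": 999.0},
    cache,
)
print(first["has_cached_data"], second["has_cached_data"])  # False True
```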
With Session Operations
Session stores track user activity in sessions that close automatically after a period of inactivity (the inactivity gap):
```yaml
pipelines:
  user_session_analysis:
    from: user_clicks
    via:
      - type: groupByKey
      - type: windowBySession
        inactivityGap: 2m  # Close session after 2 minutes of inactivity
        grace: 30s
      - type: count
        store:
          name: user_sessions
          type: session
          retention: 1h
          caching: false
```
Session store patterns:
- Activity tracking: Monitor user engagement and session duration
- Timeout detection: Identify inactive sessions for cleanup
- State aggregation: Accumulate metrics within session boundaries
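To see how an inactivity gap splits activity into sessions, here is a small Python sketch of the grouping rule for a single key. It assumes that an event within the gap of the previous event (boundary inclusive) extends the current session; `sessionize` is a hypothetical helper, and it ignores grace periods and out-of-order merging that a real session store handles.

```python
def sessionize(timestamps_ms, inactivity_gap_ms):
    """Group event timestamps for one key into sessions.

    Returns (session_start, session_end, event_count) tuples. An event that
    arrives more than inactivity_gap_ms after the previous one starts a new
    session. Simplification: events are sorted first, and grace periods and
    late arrivals are ignored.
    """
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][1] <= inactivity_gap_ms:
            sessions[-1][1] = ts   # extend the current session
            sessions[-1][2] += 1
        else:
            sessions.append([ts, ts, 1])  # start a new session
    return [tuple(session) for session in sessions]


# 2-minute inactivity gap, as in the pipeline above
print(sessionize([0, 60_000, 150_000, 400_000, 450_000], inactivity_gap_ms=120_000))
# [(0, 150000, 3), (400000, 450000, 2)]
```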
With Windowed Operations
Window stores require specific configuration that varies by operation type:
For Aggregations (Count, Aggregate, etc.)
```yaml
pipelines:
  count_clicks_5min:
    from: user_clicks
    via:
      - type: groupByKey
      - type: windowByTime
        windowType: tumbling
        duration: 5m
        grace: 30s
      - type: count
        store:
          name: click_counts_5min
          type: window
          windowSize: 5m   # Must match window duration
          retention: 35m   # windowSize + grace + buffer
          caching: false
```
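The interplay of `duration` and `grace` can be sketched in a few lines of Python. This assumes the common Kafka Streams rule that a record is still counted while stream time (the highest timestamp seen so far) is before its window's end plus grace; `tumbling_window_start` and `record_accepted` are hypothetical helpers, not KSML functions.

```python
def tumbling_window_start(ts_ms, size_ms):
    # Tumbling windows are aligned to multiples of the window size
    return ts_ms - (ts_ms % size_ms)


def record_accepted(ts_ms, stream_time_ms, size_ms, grace_ms):
    """Assumed rule: a record is still counted while stream time
    is before its window's end + grace."""
    window_end_ms = tumbling_window_start(ts_ms, size_ms) + size_ms
    return stream_time_ms < window_end_ms + grace_ms


FIVE_MIN, GRACE = 300_000, 30_000
print(tumbling_window_start(420_000, FIVE_MIN))            # 300000
print(record_accepted(299_000, 320_000, FIVE_MIN, GRACE))  # True
print(record_accepted(299_000, 330_000, FIVE_MIN, GRACE))  # False
```

The store's `retention` (35m here) must cover at least the window plus grace so that a window is still present whenever a late record can be accepted into it.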
For Stream-Stream Joins
```yaml
pipelines:
  join_streams:
    from: stream1
    via:
      - type: join
        stream: stream2
        timeDifference: 15m
        grace: 5m
        thisStore:
          type: window
          windowSize: 30m   # Must be 2 × timeDifference
          retention: 35m    # 2 × timeDifference + grace
          retainDuplicates: true
```
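The reason for `2 × timeDifference` is that a record can match partners up to `timeDifference` before or after it. The Python sketch below shows both the assumed matching rule (inclusive bounds) and the resulting store settings; `joins` and `join_store_settings` are hypothetical helpers for illustration.

```python
def joins(left_ts_ms, right_ts_ms, time_difference_ms):
    # Records with the same key join when their timestamps are at most
    # timeDifference apart, in either direction
    return abs(left_ts_ms - right_ts_ms) <= time_difference_ms


def join_store_settings(time_difference_ms, grace_ms):
    # The window must cover timeDifference before and after each record
    window_size_ms = 2 * time_difference_ms
    retention_ms = window_size_ms + grace_ms
    return window_size_ms, retention_ms


FIFTEEN_MIN, FIVE_MIN = 900_000, 300_000
print(join_store_settings(FIFTEEN_MIN, FIVE_MIN))  # (1800000, 2100000), i.e. 30m and 35m
print(joins(0, 900_000, FIFTEEN_MIN))              # True
print(joins(0, 900_001, FIFTEEN_MIN))              # False
```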
Retention Guidelines
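For Window Aggregations: set `windowSize` equal to the window duration and `retention` to at least `windowSize + grace`, optionally plus a buffer (the example above uses `35m` for a `5m` window with `30s` grace).
For Stream-Stream Joins: set `windowSize` to `2 × timeDifference` and `retention` to `2 × timeDifference + grace`.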
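The window-aggregation guideline can be turned into a quick sanity check for store definitions. This is a sketch under stated assumptions: `parse_duration_ms` and `check_window_store` are hypothetical helpers, and the accepted suffixes (`ms`, `s`, `m`, `h`, `d`) are an assumption for this example rather than the full KSML duration syntax.

```python
import re

# Assumed duration suffixes for this sketch; check the KSML docs for the exact syntax.
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def parse_duration_ms(text):
    """Parse strings such as '30s', '5m' or '1h' into milliseconds (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)(ms|s|m|h|d)", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_MS[unit]


def check_window_store(window_duration, grace, window_size, retention):
    """Return a list of problems with a window-aggregation store config (empty if OK)."""
    problems = []
    size_ms = parse_duration_ms(window_size)
    if size_ms != parse_duration_ms(window_duration):
        problems.append("windowSize must match the window duration")
    minimum_ms = size_ms + parse_duration_ms(grace)
    if parse_duration_ms(retention) < minimum_ms:
        problems.append(f"retention must be at least {minimum_ms} ms (windowSize + grace)")
    return problems


print(check_window_store("5m", "30s", "5m", "35m"))  # []
print(check_window_store("5m", "30s", "5m", "5m"))   # retention too short
```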
Performance Considerations
Caching Impact
| Setting | Behavior | Use When |
|---|---|---|
| `caching: true` | Batches updates, reduces downstream load | High throughput where some added latency is acceptable |
| `caching: false` | Emits every update immediately | Real-time requirements, debugging |
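The effect of caching on what reaches downstream can be approximated as follows. Both functions are hypothetical and deliberately simplified: in Kafka Streams, on which KSML is built, the cache is flushed on commit or when it fills up, whereas this sketch collapses everything into a single flush that keeps only the latest value per key.

```python
def forward_without_cache(updates):
    """caching: false - every update to the store is forwarded downstream."""
    return list(updates)


def forward_with_cache(updates):
    """caching: true (simplified) - updates are buffered until the cache is
    flushed, and only the latest value per key is forwarded."""
    latest = {}
    for key, value in updates:
        latest[key] = value  # a newer update replaces the buffered one for that key
    return list(latest.items())


updates = [("a", 1), ("a", 2), ("b", 1), ("a", 3)]
print(forward_without_cache(updates))  # [('a', 1), ('a', 2), ('b', 1), ('a', 3)]
print(forward_with_cache(updates))     # [('a', 3), ('b', 1)]
```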
Persistence Trade-offs
| Setting | Pros | Cons | Use When |
|---|---|---|---|
| `persistent: true` | Survives restarts, enables recovery | Slower, uses disk space | Production, critical state |
| `persistent: false` | Fast, memory-only | Lost on restart unless rebuilt from the changelog topic (`logging: true`) | Temporary state, caching |