State Stores
State stores are persistent or in-memory storage components used by stateful operations in Kafka Streams to maintain intermediate results, aggregations, and other stateful data. KSML provides flexible configuration options for state stores, allowing you to optimize performance, durability, and storage characteristics for your specific use cases.
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands:
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_ownership_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic owner_sensor_counts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_type_totals && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic owner_counts
Understanding State Stores in Kafka Streams
State stores serve several critical purposes in stream processing:
- Aggregations: Store running totals, counts, averages, and other aggregate calculations
- Windowing: Maintain data within time windows for temporal analysis
- Joins: Cache data from one stream to join with another
- Deduplication: Track processed records to eliminate duplicates
- State Management: Persist application state across restarts
State stores can be backed by RocksDB (persistent) or kept in memory (non-persistent). Both types can optionally use changelog topics for fault tolerance and recovery.
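The trade-off between the two backing types shows up directly in store configuration. As a minimal sketch, using the same parameters that appear in the examples later in this tutorial, a durable store and a throwaway in-memory store differ in just two flags:

```yaml
stores:
  durable_store:        # RocksDB-backed: survives restarts, recoverable from its changelog
    type: keyValue
    keyType: string
    valueType: long
    persistent: true    # stored on disk via RocksDB
    logging: true       # changes replicated to a Kafka changelog topic
  scratch_store:        # in-memory only: fast, but lost on restart
    type: keyValue
    keyType: string
    valueType: long
    persistent: false   # kept in memory
    logging: false      # no changelog; faster, but not fault tolerant
```

Both combinations are valid; the right choice depends on whether the state can be cheaply recomputed after a restart.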
State Store Types
KSML supports three types of state stores. For detailed information about each type and their parameters, see the State Store Reference.
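As a rough side-by-side sketch of the three kinds of store: a keyValue store holds one value per key, a window store holds one value per key per time window, and a session store holds one value per key per activity session. The window and session parameter names below (such as windowSize) are assumptions for illustration only; check the State Store Reference for the exact names.

```yaml
stores:
  totals_store:        # keyValue: one value per key
    type: keyValue
    keyType: string
    valueType: long
  windowed_store:      # window: one value per key per time window
    type: window
    keyType: string
    valueType: long
    windowSize: 1m     # hypothetical parameter name; see the reference
    retention: 10m
  session_store:       # session: one value per key per activity session
    type: session
    keyType: string
    valueType: long
    retention: 10m
```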
Configuration Methods
KSML provides two ways to configure state stores:
1. Predefined Store Configuration
Define stores in the global stores section and reference them by name:
Sensor Ownership Producer:
functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      import time
      data_counter = 0
      owners = []
      owners.extend(["alice", "bob", "charlie", "diana", "eve"])
      sensor_types = []
      sensor_types.extend(["temperature", "humidity", "pressure", "light"])
    code: |
      global data_counter, owners, sensor_types
      data_counter += 1
      owner = random.choice(owners)
      sensor_type = random.choice(sensor_types)
      sensor_id = sensor_type + "_" + owner + "_" + str(random.randint(1, 3)).zfill(2)
      value = {}
      value["sensor_id"] = sensor_id
      value["owner"] = owner
      value["sensor_type"] = sensor_type
      value["value"] = round(random.uniform(10.0, 100.0), 2)
      value["timestamp"] = int(time.time() * 1000)
      key = sensor_id
    expression: (key, value)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    to:
      topic: sensor_ownership_data
      keyType: string
      valueType: json
Predefined Store Processor:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# This example demonstrates predefined state store configuration for counting sensors by owner

streams:
  sensor_source:
    topic: sensor_ownership_data
    keyType: string
    valueType: json
    offsetResetPolicy: latest
  owner_counts:
    topic: owner_sensor_counts
    keyType: string
    valueType: long

stores:
  owner_count_store:
    name: owner_count_store
    type: keyValue
    keyType: string
    valueType: long
    retention: 5m
    caching: false
    persistent: true
    logging: true

pipelines:
  count_by_owner:
    from: sensor_source
    via:
      - type: groupBy
        name: group_by_owner
        mapper:
          code: |
            if value is None:
              return "unknown"
            if "owner" not in value:
              return "unknown"
          expression: value["owner"]
          resultType: string
      - type: count
        store: owner_count_store
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("PREDEFINED STORE - Owner: {}, sensor count: {}", key, value)
    to: owner_counts
This example demonstrates:
- Predefined store configuration: The owner_count_store is defined in the global stores section
- Store reference: The count operation references the predefined store by name
- Persistent storage: Data survives application restarts (persistent: true)
- Changelog logging: State changes are replicated to a Kafka changelog topic for fault tolerance and recovery (logging: true)
The output in the owner_sensor_counts topic is not directly readable because the values are binary-encoded longs. To read it, consume the messages with an explicit deserializer:
# Read owner sensor counts (Long values require specific deserializer)
docker exec broker /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker:9093 \
--topic owner_sensor_counts \
--from-beginning \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
--value-deserializer org.apache.kafka.common.serialization.LongDeserializer \
--property print.key=true \
--property key.separator=": " \
--timeout-ms 10000
2. Inline Store Configuration
Define stores directly within operations for single-use scenarios:
Sensor Ownership Producer:
functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      import time
      data_counter = 0
      owners = []
      owners.extend(["alice", "bob", "charlie", "diana", "eve"])
      sensor_types = []
      sensor_types.extend(["temperature", "humidity", "pressure", "light"])
    code: |
      global data_counter, owners, sensor_types
      data_counter += 1
      owner = random.choice(owners)
      sensor_type = random.choice(sensor_types)
      sensor_id = sensor_type + "_" + owner + "_" + str(random.randint(1, 3)).zfill(2)
      value = {}
      value["sensor_id"] = sensor_id
      value["owner"] = owner
      value["sensor_type"] = sensor_type
      value["value"] = round(random.uniform(10.0, 100.0), 2)
      value["timestamp"] = int(time.time() * 1000)
      key = sensor_id
    expression: (key, value)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    to:
      topic: sensor_ownership_data
      keyType: string
      valueType: json
Inline Store Processor:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# This example demonstrates inline state store configuration for aggregating sensor data by type

streams:
  sensor_source:
    topic: sensor_ownership_data
    keyType: string
    valueType: json
  sensor_type_totals:
    topic: sensor_type_totals
    keyType: string
    valueType: json

functions:
  initialize_sum:
    type: initializer
    expression:
      total_value: 0.0
      count: 0
    resultType: json
  update_sum:
    type: aggregator
    code: |
      # Initialize if first time
      if aggregatedValue is None:
        aggregatedValue = {}
        aggregatedValue["total_value"] = 0.0
        aggregatedValue["count"] = 0
      # Update totals
      aggregatedValue["total_value"] += value.get("value", 0.0)
      aggregatedValue["count"] += 1
      # Calculate average
      aggregatedValue["average"] = round(aggregatedValue["total_value"] / aggregatedValue["count"], 2)
      result = aggregatedValue
    expression: result
    resultType: json

pipelines:
  aggregate_by_sensor_type:
    from: sensor_source
    via:
      - type: groupBy
        name: group_by_sensor_type
        mapper:
          code: |
            if value is None:
              return "unknown"
            if "sensor_type" not in value:
              return "unknown"
          expression: value["sensor_type"]
          resultType: string
      - type: aggregate
        store:
          name: sensor_type_aggregates
          type: keyValue
          keyType: string
          valueType: json
          retention: 3m
          caching: true
          persistent: false
          logging: false
        initializer: initialize_sum
        aggregator: update_sum
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("INLINE STORE - Type: {}, avg: {}, count: {}", key, value.get("average"), value.get("count"))
    to: sensor_type_totals
This example demonstrates:
- Inline store configuration: Store defined directly within the aggregate operation
- Custom aggregation: Uses initializer and aggregator functions for complex calculations
- Memory-only storage: Non-persistent storage for temporary calculations (persistent: false)
- Caching enabled: Improves performance by batching state updates (caching: true)
Configuration Parameters
For a complete list of configuration parameters for all store types, see the State Store Reference - Configuration Parameters.
Best Practices
When to Use Predefined Stores
- Multiple operations need to access the same store
- Store configuration is complex or frequently reused
- You want centralized store management for maintainability
When to Use Inline Stores
- Store is used by a single operation
- Simple, one-off configurations
- Prototyping or small applications
Memory Management
- Set appropriate retention periods to prevent unbounded growth
- Use non-persistent stores for temporary calculations
- Monitor memory usage, especially with caching enabled
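These guidelines translate into only a few lines of configuration. As a sketch of a store tuned for short-lived, bounded state (the store name is hypothetical; the parameters are the same ones used in the examples above):

```yaml
store:
  name: recent_events_store  # hypothetical name for illustration
  type: keyValue
  keyType: string
  valueType: string
  retention: 10m     # old entries expire, preventing unbounded growth
  persistent: false  # temporary data, kept in memory only
  caching: true      # batches updates; monitor heap usage with this enabled
  logging: false     # no changelog needed for disposable state
```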
Fault Tolerance
- Enable logging for critical business data
- Use persistent stores for data that must survive restarts
- Consider the trade-off between durability and performance
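For critical business data, the durable end of that trade-off looks like the sketch below (the store name is hypothetical). With logging enabled, Kafka Streams materializes the store's changes into an internal changelog topic, conventionally named <application.id>-<store name>-changelog, which it replays to rebuild the state on another instance after a failure:

```yaml
store:
  name: account_balance_store  # hypothetical name for illustration
  type: keyValue
  keyType: string
  valueType: long
  persistent: true  # RocksDB on disk; survives restarts without a full replay
  logging: true     # changelog topic enables recovery after instance loss
  caching: true     # batching updates also reduces changelog traffic
```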
State stores are a powerful feature in KSML that enable sophisticated stateful stream processing patterns. By understanding the configuration options and trade-offs, you can build robust and efficient streaming applications.
Next Steps
Ready to explore more advanced state store patterns? Continue with:
- Custom State Stores Tutorial - Learn advanced patterns including window stores, session stores, multi-store coordination, and optimization techniques for production applications