Skip to content

State Stores

State stores are persistent or in-memory storage components used by stateful operations in Kafka Streams to maintain intermediate results, aggregations, and other stateful data. KSML provides flexible configuration options for state stores, allowing you to optimize performance, durability, and storage characteristics for your specific use cases.

Prerequisites

Before starting this tutorial:

Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_ownership_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic owner_sensor_counts && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_type_totals && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic owner_counts && \

Understanding State Stores in Kafka Streams

State stores serve several critical purposes in stream processing:

  • Aggregations: Store running totals, counts, averages, and other aggregate calculations
  • Windowing: Maintain data within time windows for temporal analysis
  • Joins: Cache data from one stream to join with another
  • Deduplication: Track processed records to eliminate duplicates
  • State Management: Persist application state across restarts

State stores can be backed by RocksDB (persistent) or kept in memory (non-persistent). Both types can optionally use changelog topics for fault tolerance and recovery.

State Store Types

KSML supports three types of state stores. For detailed information about each type and their parameters, see the State Store Reference.

Configuration Methods

KSML provides two ways to configure state stores:

1. Predefined Store Configuration

Define stores in the global stores section and reference them by name:

Sensor Ownership Producer (click to expand)
functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      import time
      data_counter = 0
      owners = []
      owners.extend(["alice", "bob", "charlie", "diana", "eve"])
      sensor_types = []
      sensor_types.extend(["temperature", "humidity", "pressure", "light"])
    code: |
      global data_counter, owners, sensor_types

      data_counter += 1
      owner = random.choice(owners)
      sensor_type = random.choice(sensor_types)
      sensor_id = sensor_type + "_" + owner + "_" + str(random.randint(1, 3)).zfill(2)

      value = {}
      value["sensor_id"] = sensor_id
      value["owner"] = owner
      value["sensor_type"] = sensor_type
      value["value"] = round(random.uniform(10.0, 100.0), 2)
      value["timestamp"] = int(time.time() * 1000)

      key = sensor_id
    expression: (key, value)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    to:
      topic: sensor_ownership_data
      keyType: string
      valueType: json
Predefined Store Processor (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# This example demonstrates predefined state store configuration for counting sensors by owner

streams:
  sensor_source:
    topic: sensor_ownership_data
    keyType: string
    valueType: json
    offsetResetPolicy: latest

  owner_counts:
    topic: owner_sensor_counts
    keyType: string
    valueType: long

stores:
  owner_count_store:
    name: owner_count_store
    type: keyValue
    keyType: string
    valueType: long
    retention: 5m
    caching: false
    persistent: true
    logging: true

pipelines:
  count_by_owner:
    from: sensor_source
    via:
      - type: groupBy
        name: group_by_owner
        mapper:
          code: |
            if value is None:
              return "unknown"
            if not "owner" in value:
              return "unknown"
          expression: value["owner"]
          resultType: string
      - type: count
        store: owner_count_store
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("PREDEFINED STORE - Owner: {}, sensor count: {}", key, value)
    to: owner_counts

This example demonstrates:

  • Predefined store configuration: The owner_count_store is defined in the global stores section
  • Store reference: The count operation references the predefined store by name
  • Persistent storage: Data survives application restarts (persistent: true)
  • Changelog logging: State changes are replicated to a Kafka changelog topic for fault tolerance and recovery (logging: true)

You won't be able to read the output in owner_sensor_counts topic because it is binary, to read it you can consume the messages like this:

# Read owner sensor counts (Long values require specific deserializer)
docker exec broker /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server broker:9093 \
  --topic owner_sensor_counts \
  --from-beginning \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer \
  --property print.key=true \
  --property key.separator=": " \
  --timeout-ms 10000

2. Inline Store Configuration

Define stores directly within operations for single-use scenarios:

Sensor Ownership Producer (click to expand)
functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      import time
      data_counter = 0
      owners = []
      owners.extend(["alice", "bob", "charlie", "diana", "eve"])
      sensor_types = []
      sensor_types.extend(["temperature", "humidity", "pressure", "light"])
    code: |
      global data_counter, owners, sensor_types

      data_counter += 1
      owner = random.choice(owners)
      sensor_type = random.choice(sensor_types)
      sensor_id = sensor_type + "_" + owner + "_" + str(random.randint(1, 3)).zfill(2)

      value = {}
      value["sensor_id"] = sensor_id
      value["owner"] = owner
      value["sensor_type"] = sensor_type
      value["value"] = round(random.uniform(10.0, 100.0), 2)
      value["timestamp"] = int(time.time() * 1000)

      key = sensor_id
    expression: (key, value)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    to:
      topic: sensor_ownership_data
      keyType: string
      valueType: json
Inline Store Processor (click to expand)
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# This example demonstrates inline state store configuration for aggregating sensor data by type

streams:
  sensor_source:
    topic: sensor_ownership_data
    keyType: string
    valueType: json

  sensor_type_totals:
    topic: sensor_type_totals
    keyType: string
    valueType: json

functions:
  initialize_sum:
    type: initializer
    expression:
      total_value: 0.0
      count: 0
    resultType: json

  update_sum:
    type: aggregator
    code: |
      # Initialize if first time
      if aggregatedValue is None:
        aggregatedValue = {}
        aggregatedValue["total_value"] = 0.0
        aggregatedValue["count"] = 0

      # Update totals
      aggregatedValue["total_value"] += value.get("value", 0.0)
      aggregatedValue["count"] += 1

      # Calculate average
      aggregatedValue["average"] = round(aggregatedValue["total_value"] / aggregatedValue["count"], 2)

      result = aggregatedValue
    expression: result
    resultType: json

pipelines:
  aggregate_by_sensor_type:
    from: sensor_source
    via:
      - type: groupBy
        name: group_by_sensor_type
        mapper:
          code: |
            if value is None:
              return "unknown"  
            if not "sensor_type" in value:
              return "unknown"
          expression: value["sensor_type"]
          resultType: string
      - type: aggregate
        store:
          name: sensor_type_aggregates
          type: keyValue
          keyType: string
          valueType: json
          retention: 3m
          caching: true
          persistent: false
          logging: false
        initializer: initialize_sum
        aggregator: update_sum
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("INLINE STORE - Type: {}, avg: {}, count: {}", key, value.get("average"), value.get("count"))
    to: sensor_type_totals

This example demonstrates:

  • Inline store configuration: Store defined directly within the aggregate operation
  • Custom aggregation: Uses initializer and aggregator functions for complex calculations
  • Memory-only storage: Non-persistent storage for temporary calculations (persistent: false)
  • Caching enabled: Improves performance by batching state updates (caching: true)

Configuration Parameters

For a complete list of configuration parameters for all store types, see the State Store Reference - Configuration Parameters.

Best Practices

When to Use Predefined Stores

  • Multiple operations need to access the same store
  • Store configuration is complex or frequently reused
  • You want centralized store management for maintainability

When to Use Inline Stores

  • Store is used by a single operation
  • Simple, one-off configurations
  • Prototyping or small applications

Memory Management

  • Set appropriate retention periods to prevent unbounded growth
  • Use non-persistent stores for temporary calculations
  • Monitor memory usage, especially with caching enabled

Fault Tolerance

  • Enable logging for critical business data
  • Use persistent stores for data that must survive restarts
  • Consider the trade-off between durability and performance

State stores are a powerful feature in KSML that enable sophisticated stateful stream processing patterns. By understanding the configuration options and trade-offs, you can build robust and efficient streaming applications.

Next Steps

Ready to explore more advanced state store patterns? Continue with:

  • Custom State Stores Tutorial - Learn advanced patterns including window stores, session stores, multi-store coordination, and optimization techniques for production applications