    Introduction

    Several stream operations use state stores to retain intermediate results of their calculations. State stores are typically configured with additional parameters that limit how long data is kept (retention time) and/or determine when data is emitted from the store to the next stream operation.

    stores:
      owner_count_store:
        name: owner_count
        type: window
        retention: 3m
        caching: false
    

    Configuration

    State store configurations are defined by the following parameters:

    | Parameter   | Value type | Default | Required | Description |
    |-------------|------------|---------|----------|-------------|
    | name        | string     | none    | Optional | The name of the state store. This field is optional, but every operation that uses the store configuration needs a name for its store. If the store configuration does not specify a name, the operation falls back to its own name, as specified by its name attribute. If that name is also unspecified, an exception is thrown. It is good practice to always specify the store name explicitly in its definition. |
    | type        | string     | none    | Required | The type of the state store. Possible types are keyValue, session and window. |
    | persistent  | boolean    | false   | Optional | true if the state store should be retained on disk. See [link] for more information on how Kafka Streams maintains state store state in a state directory. When this parameter is false or undefined, the state store is (re)built in memory upon KSML start. |
    | timestamped | boolean    | false   | Optional | (Only relevant for keyValue and window stores) true if all messages in the state store need to be timestamped. This effectively changes the state store from type <key, value> to <key, timestamp+value>. The stored timestamp is that of the last message that updated the aggregated value in the window. |
    | versioned   | boolean    | false   | Optional | (Only relevant for keyValue stores) true if elements in the store are versioned, false otherwise. |
    | keyType     | string     | none    | Required | The key type of the state store. See Types for more information. |
    | valueType   | string     | none    | Required | The value type of the state store. See Types for more information. |
    | caching     | boolean    | false   | Optional | Controls the internal state store cache. When true, the state store caches entries and does not emit every state change, but only the latest state per key whenever the cache is flushed. When false, all changes to the state store are emitted immediately. |
    | logging     | boolean    | false   | Optional | Determines whether state changes are written to a changelog topic. When true, all state store changes are produced to a changelog topic named appId-storeName-changelog. When false, no changelog topic is written. |
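
    The following is a hypothetical standalone keyValue store definition that combines the parameters from the table above. The configuration key last_value_store, the store name last_value and the chosen key and value types are illustrative assumptions, not taken from an existing KSML definition:

    ```yaml
    stores:
      last_value_store:        # hypothetical store configuration key
        name: last_value       # explicit store name (recommended practice)
        type: keyValue
        keyType: string        # e.g. the sensor owner
        valueType: long
        persistent: true       # keep state on disk instead of rebuilding it in memory at startup
        timestamped: true      # store <key, timestamp+value> instead of <key, value>
        caching: true          # emit only the latest state per key when the cache is flushed
        logging: true          # produce all state changes to the appId-last_value-changelog topic
    ```

    With logging enabled and an application id of, say, my-ksml-app (again an assumption), the changelog topic for this store would be named my-ksml-app-last_value-changelog.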

    Example:

    stores:
      owner_count_store:
        name: owner_count
        type: window
        retention: 3m
        caching: false
    
    pipelines:
      main:
        from: sensor_source
        via:
          - type: groupBy
            name: ksml_sensordata_grouped
            mapper:
              expression: value["owner"]
              resultType: string
          - type: windowedBy
            windowType: time
            duration: 20s
            grace: 40s
          - type: aggregate
            store: owner_count_store     # refer to predefined store configuration above
            initializer:
              expression: 0
              resultType: long
            aggregator:
              expression: aggregatedValue+1
              resultType: long
    

    Instead of referring to predefined state store configurations, you may also use an inline definition for the store:

          - type: aggregate
            store:
              name: owner_count
              retention: 3m
              caching: false
            initializer:
              expression: 0
              resultType: long
            aggregator:
              expression: aggregatedValue+1
              resultType: long
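
    Assuming that inline definitions accept the same parameters from the table as predefined ones (only name, retention and caching are shown above, so treat this as an assumption), a variant of the same aggregate step that also keeps its store on disk and writes a changelog could look like this:

    ```yaml
          - type: aggregate
            store:
              name: owner_count
              retention: 3m
              caching: false
              persistent: true     # retain store state on disk between restarts
              logging: true        # write all changes to the appId-owner_count-changelog topic
            initializer:
              expression: 0
              resultType: long
            aggregator:
              expression: aggregatedValue+1
              resultType: long
    ```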