Skip to the content.

« Back to index

Operations

Table of Contents

  1. Introduction
  2. Operations
  3. Sink Operations

Introduction

Pipelines in KSML have a beginning, a middle and (optionally) an end. Operations form the middle part of pipelines. They are modeled as separate YAML entities, where each operation takes input from the previous operation and applies its own logic. The returned stream then serves as input for the next operation.

Transform Operations

Transformations are operations that take an input stream and convert it to an output stream. This section lists all supported transformations. Each one states the type of stream it returns.

Parameter Value Type Description
name string The name of the transformation operation.

Note that not all combinations of output/input streams are supported by Kafka Streams. The user that writes the KSML definition needs to make sure that streams that result from one operations can actually serve as input to the next. KSML does type checking and will exit with an error when operations that can not be chained together are listed after another in the KSML definition.

aggregate

This operations aggregates multiple values into a single one by repeatedly calling an aggregator function. It can operate on a range of stream types.

Stream Type Returns Parameter Value Type Required Description
KGroupedStream<K,V> KTable<K,VR> store Store configuration No The Store configuration.
    initializer Inline or reference Yes The Initializer function.
    aggregator Inline or reference Yes The Aggregator function.
KGroupedTable<K,V> KTable<K,VR> store Store configuration No The Store configuration.
    initializer Inline or reference Yes The Initializer function.
    adder Inline or reference Yes The Reducer function that adds two values.
    subtractor Inline or reference Yes The Reducer function that subtracts two values.
SessionWindowedKStream<K,V> KTable<Windowed<K>,VR> store Store configuration No The Store configuration.
    initializer Inline or reference Yes The Initializer function.
    aggregator Inline or reference Yes The Aggregator function.
    merger Inline or reference Yes The Merger function.
TimeWindowedKStreamObject<K,V> KTable<Windowed<K>,VR> store Store configuration No The Store configuration.
    initializer Inline or reference Yes The Initializer function.
    aggregator Inline or reference Yes The Aggregator function.

Example:

from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream

convertKey

This built-in operation takes a message and converts the key into a given type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,V> into string The type to convert to

Example:

from: input_stream
via:
  - type: convertKey
    into: string
to: output_stream

convertKeyValue

This built-in operation takes a message and converts the key and value into a given type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,VR> into string The type to convert to

Example:

from: input_stream
via:
  - type: convertKeyValue
    into: (string,avro:SensorData)
to: output_stream

convertValue

This built-in operation takes a message and converts the value into a given type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,V> into string The type to convert to

Example:

from: input_stream
via:
  - type: convertValue
    into: avro:SensorData
to: output_stream

count

This operations counts the number of messages and returns a table multiple values into a single one by repeatedly calling an aggregator function. It can operate on a range of stream types.

Stream Type Returns Parameter Value Type Required Description
KGroupedStream<K,V> KTable<K,Long> store Store configuration No The Store configuration.
KGroupedTable<K,V> KTable<K,Long> store Store configuration No The Store configuration.
SessionWindowedKStream<K,V> KTable<Windowed<K>,Long> store Store configuration No The Store configuration.
TimeWindowedKStreamObject<K,V> KTable<Windowed<K>,Long> store Store configuration No The Store configuration.

Example:

from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: count
  - type: toStream
to: output_stream

filter

Filter all incoming messages according to some predicate. The predicate function is called for every message. Only when the predicate returns true, then the message will be sent to the output stream.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,V> predicate Yes Inline or reference The Predicate function.
KTable<K,V> KTable<K,V> predicate Yes Inline or reference The Predicate function.

Example:

from: input_stream
via:
  - type: filter
    predicate: my_filter_function
  - type: filter
    predicate:
      expression: key.startswith('a')
to: output_stream

filterNot

This transformation works exactly like filter, but negates all predicates before applying them. See filter for details on how to implement.

groupBy

Group the records of a stream on a new key that is selected using the provided KeyValueMapper.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KGroupedStream<K,V> store Store configuration No The Store configuration.
    mapper Inline or reference Yes The KeyValueMapper function.
KTable<K,V> KGroupedTable<K,V> store Store configuration No The Store configuration.
    mapper Inline or reference Yes The KeyValueMapper function.

Example:

from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream

groupByKey

Group the records of a stream on the stream’s key.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KGroupedStream<K,V> store Store configuration No The Store configuration.
    mapper Inline or reference Yes The KeyValueMapper function.

Example:

from: input_stream
via:
  - type: groupByKey
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream

join

Join records of this stream with another stream’s records using windowed inner equi join. The join is computed on the records’ key with join predicate thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    stream string Yes The name of the stream to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    table string Yes The name of the table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    globalTable string Yes The name of the global table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KTable<K,V> KTable<K,V> store Store configuration No The Store configuration.
    table string Yes The name of the table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.

Example:

from: input_stream
via:
  - type: join
    stream: second_stream
    valueJoiner: my_key_value_mapper
    duration: 1m
to: output_stream

leftJoin

Join records of this stream with another stream’s records using windowed left equi join. In contrast to inner-join, all records from this stream will produce at least one output record. The join is computed on the records’ key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    stream string Yes The name of the stream to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    table string Yes The name of the table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    globalTable string Yes The name of the global table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KTable<K,V> KTable<K,V> store Store configuration No The Store configuration.
    table string Yes The name of the table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.

Example:

[yaml]
----
from: input_stream
via:
  - type: leftJoin
    stream: second_stream
    valueJoiner: my_join_function
    duration: 1m
to: output_stream

map

This is an alias for transformKeyValue.

mapKey

This is an alias for transformKey.

mapValues

This is an alias for transformValue.

merge

Merge this stream and the given stream into one larger stream. There is no ordering guarantee between records from this stream and records from the provided stream in the merged stream. Relative order is preserved within each input stream though (ie, records within one input stream are processed in order).

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<K,V> stream string The name of the stream to merge with.

Example:

from: input_stream
via:
  - type: merge
    stream: second_stream
to: output_stream

outerJoin

Join records of this stream with another stream’s records using windowed outer equi join. In contrast to inner-join or left-join, all records from both streams will produce at least one output record. The join is computed on the records’ key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    stream string Yes The name of the stream to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.
    duration string Yes The Duration of the windows to join.
KTable<K,V> KTable<K,V> store Store configuration No The Store configuration.
    table string Yes The name of the table to join with.
    valueJoiner Inline or reference Yes The KeyValueMapper function.

Example:

[yaml]
----
from: input_stream
via:
  - type: outerJoin
    stream: second_stream
    valueJoiner: my_join_function
    duration: 1m
to: output_stream

peek

Perform an action on each record of a stream. This is a stateless record-by-record operation. Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<K,V> forEach Inline or reference The [ForEach] function that will be called for every message.

Example:

from: input_stream
via:
  - type: peek
    forEach: print_key_and_value
to: output_stream

reduce

Combine the values of records in this stream by the grouped key. Records with null key or value are ignored. Combining implies that the type of the aggregate result is the same as the type of the input value, similar to aggregate(Initializer, Aggregator).

Stream Type Returns Parameter Value Type Required Description
KGroupedStream<K,V> KTable<K,VR> store Store configuration No The Store configuration.
    reducer Inline or reference Yes The Reducer function.
KGroupedTable<K,V> KTable<K,VR> store Store configuration No The Store configuration.
    adder Inline or reference Yes The Reducer function that adds two values.
    subtractor Inline or reference Yes The Reducer function that subtracts two values.
SessionWindowedKStream<K,V> KTable<Windowed<K>,VR> store Store configuration No The Store configuration.
    reducer Inline or reference Yes The Reducer function.
TimeWindowedKStreamObject<K,V> KTable<Windowed<K>,VR> store Store configuration No The Store configuration.
    initializer Inline or reference Yes The Reducer function.

Example:

[yaml]
----
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: reduce
    reducer:
      expression: value1+value2
  - type: toStream
to: output_stream

repartition

Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using key serde, value serde, StreamPartitioner, number of partitions, and topic name part. The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named as “${applicationId}--repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "" is either provided via Repartitioned.as(String) or an internally generated name, and "-repartition" is a fixed suffix.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,V> store Store configuration No The Store configuration.
    name string Yes The name used as part of repartition topic and processor name.
    partitioner Inline or reference Yes The StreamPartitioner function.

Example:

from: input_stream
via:
  - type: repartition
    name: my_repartitioner
    partitioner: my_own_partitioner
  - type: peek
    forEach: print_key_and_value
  - type: toStream
to: output_stream

selectKey

This is an alias for transformKey.

suppress

Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. When windowCloses is selected and no further restrictions are provided, then this is interpreted as Suppressed.untilWindowCloses(unbounded()).

Stream Type Returns Parameter Value Type Description
KTable<K,V> KTable<K,V> until string This value can either be timeLimit or windowCloses. Note that timeLimit suppression works on any stream, while windowCloses suppression works only on Windowed streams. For the latter, see windowedBy.
    duration string The Duration to suppress updates (only when until==timeLimit)
    maxBytes int (Optional) The maximum number of bytes to suppress updates
    maxRecords int (Optional) The maximum number of records to suppress updates
    bufferFullStrategy string (Optional) Can be one of emitEarlyWhenFull, shutdownWhenFull

Example:

from: input_table
via:
  - type: suppress
    until: timeLimit
    duration: 30s
    maxBytes: 128000
    maxRecords: 10000
    bufferFullStrategy: emitEarlyWhenFull
  - type: peek
    forEach: print_key_and_value
  - type: toStream
to: output_stream

toStream

Convert a KTable into a KStream object.

Stream Type Returns Parameter Value Type Description
KTable<K,V> KStream<K,V> mapper Inline or reference (Optional)The KeyValueMapper function. If no mapper is specified, K will be used.

Example:

from: input_table
via:
  - type: toStream
to: output_stream

transformKey

This operation takes a message and transforms the key into a new key, which can be potentially of different type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,V> mapper Inline or reference The KeyValueMapper function.
    name string (Optional) The name of the processor node.

Example:

from: input_stream
via:
  - type: transformKey
    mapper:
      expression: key=str(key)   # convert key from Integer to String
to: output_stream

transformKeyValue

This operation takes a message and transforms the key and value into a new key and value, which can each be potentially of different type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,VR> mapper Inline or reference The KeyValueMapper function.
    name string (Optional) The name of the processor node.

Example:

from: input_stream
via:
  - type: transformKeyValue
    mapper:
      expression: (str(key), str(value))   # convert key and value from Integer to String
to: output_stream

transformKeyValueToKeyValueList

This operation takes a message and transforms it into zero, one or more new messages, which can be potentially of different type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<KR,VR> mapper Inline or reference The KeyValueToKeyValueListTransformer function.
    name string (Optional) The name of the processor node.

Example:

from: input_stream
via:
  - type: transformKeyValueToKeyValueList
    mapper:
    expression: [(key,value),(key,value)]   # duplicate all incoming messages
to: output_stream

transformKeyValueToValueList

This operation takes a message and generates a new list of values for the key, which can be potentially of different type.

Stream Type Returns Parameter Value Type Description
KStream<K,V> KStream<K,VR> mapper Inline or reference The KeyValueToValueListTransformer function.
    name string (Optional) The name of the processor node.

Example:

from: input_stream
via:
  - type: transformKeyValueToValueList
    mapper:
      expression: [value+1,value+2,value+3]   # creates 3 new messages [key,VR] for every input message
to: output_stream

transformValue

This operation takes a message and transforms the value into a new value, which can be potentially of different type.

Stream Type Returns Parameter Value Type Required Description
KStream<K,V> KStream<K,VR> store Store configuration No The Store configuration.
    mapper Inline or reference Yes The KeyValueToKeyValueListTransformer function.
    name string No The name of the processor node.

Example:

from: input_stream
via:
  - type: transformValue
    mapper:
      expression: value=str(key)   # convert value from Integer to String
to: output_stream

windowBySession

Create a new windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to WindowTypes [this page].
Stream Type Returns Parameter Value Type Description
[KGroupedStream][KGroupedStream::windowedBySession] SessionWindowedKStream<K,V> inactivityGap Duration The inactivity gap parameter for the SessionWindows object.
    grace Duration (Optional) The grace parameter for the SessionWindows object.
CogroupedKStream SessionWindowedCogroupedKStream<K,V> inactivityGap Duration The inactivity gap parameter for the SessionWindows object.
    grace Duration (Optional) The grace parameter for the SessionWindows object.

Example:

from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowedBy
    windowType: time
    duration: 1h
    advanceBy: 15m
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream

windowByTime

Create a new windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to WindowTypes [this page].
Stream Type Returns Parameter Value Type Description
KGroupedStream TimeWindowedKStream<K,V> windowType string Fixed value sliding.
    timeDifference Duration The time difference parameter for the SlidingWindows object.
    grace Duration (Optional) The grace parameter for the SlidingWindows object.
KGroupedStream TimeWindowedKStream<K,V> windowType string Fixed value hopping.
    advanceBy Duration The amount by which each window is advanced. If this value is not specified, then it will be equal to duration, which gives tumbling windows. If you make this value smaller than duration you will get hopping windows.
    grace Duration (Optional) The grace parameter for the TimeWindows object.
KGroupedStream TimeWindowedKStream<K,V> windowType string Fixed value tumbling.
    duration Duration The duration parameter for the TimeWindows object.
    grace Duration (Optional) The grace parameter for the TimeWindows object.
[CogroupedKStream][CogroupedKStream::windowedBySliding] TimeWindowedCogroupedKStream<K,V> windowType string Fixed value sliding.
    timeDifference Duration The time difference parameter for the SlidingWindows object.
    grace Duration (Optional) The grace parameter for the SlidingWindows object.
[CogroupedKStream][CogroupedKStream::windowedByDuration] TimeWindowedCogroupedKStream<K,V> windowType string Fixed value hopping.
    advanceBy Duration The amount by which each window is advanced. If this value is not specified, then it will be equal to duration, which gives tumbling windows. If you make this value smaller than duration you will get hopping windows.
    grace Duration (Optional) The grace parameter for the TimeWindows object.
[CogroupedKStream][CogroupedKStream::windowedByDuration] TimeWindowedCogroupedKStream<K,V> windowType string Fixed value tumbling.
    duration Duration The duration parameter for the TimeWindows object.
    grace Duration (Optional) The grace parameter for the TimeWindows object.

Example:

from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowedBy
    windowType: time
    duration: 1h
    advanceBy: 15m
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream

Sink Operations

branch

Branches out messages from the input stream into several branches based on predicates. Each branch is defined as a list item below the branch operation. Branch predicates are defined using the if keyword. Messages are only processed by one of the branches, namely the first one for which the predicate returns true.

Applies to Value Type Description
KStream<K,V> List of branch definitions See for description of branch definitions below.

Branches in KSML are nested pipelines, which are parsed without the requirement of a source attribute. Each branch accepts the following parameters:

Branch element Value Type Description
if Inline Predicate or reference The Predicate function that determines if the message is sent down this branch, or is passed on to the next branch in line.
Inline All pipeline parameters, see [Pipeline] The inlined pipeline describes the topology of the specific branch.

Example:

from: some_source_topic
branch:
  - if:
      expression: value['color'] == 'blue'
    to: ksml_sensordata_blue
  - if:
      expression: value['color'] == 'red'
    to: ksml_sensordata_red
  - forEach:
      code: |
        print('Unknown color sensor: '+str(value))

In this example, the first two branches are entered if the respective predicate matches (the color attribute of value matches a certain color). If the predicate returns false, then the next predicate/branch is tried. Only the last branch in the list can be a sink operation.

forEach

This sends each message to a custom defined function. This function is expected to handle each message as its final step. The function does not (need to) return anything.

Applies to Value Type Description
KStream<K,V> Inline or reference The [ForEach] function that is called for every record on the source stream.

Examples:

forEach: my_print_function
forEach:
  code: print(value)

to

Messages are sent directly to a named Stream.

Applies to Value Type Description
KStream<K,V> string The name of a defined Stream.

Example:

to: my_target_topic

toExtractor

Messages are passed onto a user function, which returns the name of the topic that message needs to be sent to. This operation acts as a Sink and is always the last operation in a pipeline.

Applies to Value Type Description
KStream<K,V> Inline or reference The [TopicNameExtractor] function that is called for every message and returns the topic name to which the message shall be written.

Examples:

toExtractor: my_extractor_function
toExtractor:
  code: |
    if key == 'sensor1':
      return 'ksml_sensordata_sensor1'
    elif key == 'sensor2':
      return 'ksml_sensordata_sensor2'
    else:
      return 'ksml_sensordata_sensor0'