Operations
Table of Contents
- Introduction
- Transform Operations
  - aggregate
  - convertKey
  - convertKeyValue
  - convertValue
  - count
  - filter
  - filterNot
  - groupBy
  - groupByKey
  - join
  - leftJoin
  - map
  - mapKey
  - mapValues
  - merge
  - outerJoin
  - peek
  - reduce
  - repartition
  - selectKey
  - suppress
  - toStream
  - transformKey
  - transformKeyValue
  - transformKeyValueToKeyValueList
  - transformKeyValueToValueList
  - transformValue
  - windowBySession
  - windowByTime
- Sink Operations
  - branch
  - forEach
  - to
  - toExtractor
Introduction
Pipelines in KSML have a beginning, a middle and (optionally) an end. Operations form the middle part of pipelines. They are modeled as separate YAML entities, where each operation takes input from the previous operation and applies its own logic. The returned stream then serves as input for the next operation.
Transform Operations
Transformations are operations that take an input stream and convert it to an output stream. This section lists all supported transformations. Each one states the type of stream it returns.
| Parameter | Value Type | Description |
|---|---|---|
| name | string | The name of the transformation operation. |
Note that Kafka Streams does not support every combination of output and input stream types. The author of a KSML definition must make sure that the stream resulting from one operation can actually serve as input to the next. KSML performs type checking and exits with an error when it finds consecutive operations in the KSML definition that cannot be chained.
aggregate
This operation aggregates multiple values into a single value by repeatedly calling an aggregator function. It can operate on a range of stream types.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KGroupedStream<K,V> | KTable<K,VR> | store | Store configuration | No | The Store configuration. |
| | | initializer | Inline or reference | Yes | The Initializer function. |
| | | aggregator | Inline or reference | Yes | The Aggregator function. |
| KGroupedTable<K,V> | KTable<K,VR> | store | Store configuration | No | The Store configuration. |
| | | initializer | Inline or reference | Yes | The Initializer function. |
| | | adder | Inline or reference | Yes | The Aggregator function that adds a new value to the aggregate. |
| | | subtractor | Inline or reference | Yes | The Aggregator function that removes an old value from the aggregate. |
| SessionWindowedKStream<K,V> | KTable<Windowed<K>,VR> | store | Store configuration | No | The Store configuration. |
| | | initializer | Inline or reference | Yes | The Initializer function. |
| | | aggregator | Inline or reference | Yes | The Aggregator function. |
| | | merger | Inline or reference | Yes | The Merger function. |
| TimeWindowedKStream<K,V> | KTable<Windowed<K>,VR> | store | Store configuration | No | The Store configuration. |
| | | initializer | Inline or reference | Yes | The Initializer function. |
| | | aggregator | Inline or reference | Yes | The Aggregator function. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream
```
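The following plain-Python sketch illustrates what aggregating a grouped stream means. It is not KSML's implementation. The initializer is called once for each new key, and the aggregator folds every later value into that key's running aggregate. Every update is emitted as a table change. The function name is hypothetical. The (key, value, aggregate) argument order follows Kafka Streams' Aggregator interface, and the sketch assumes it maps onto KSML's aggregator function.

```python
from typing import Callable, Dict, Iterable, List, Tuple, TypeVar

K = TypeVar("K")
V = TypeVar("V")
VR = TypeVar("VR")


def aggregate_grouped_stream(
    records: Iterable[Tuple[K, V]],
    initializer: Callable[[], VR],
    aggregator: Callable[[K, V, VR], VR],
) -> List[Tuple[K, VR]]:
    """Fold (key, value) records into one aggregate per key.

    Returns every table update in emission order, i.e. the changelog of
    the resulting KTable without record caching (hypothetical helper).
    """
    table: Dict[K, VR] = {}
    updates: List[Tuple[K, VR]] = []
    for key, value in records:
        if key is None or value is None:
            continue  # records with a null key or value are skipped
        # The initializer is only called the first time a key is seen
        current = table[key] if key in table else initializer()
        table[key] = aggregator(key, value, current)
        updates.append((key, table[key]))
    return updates


# Sum the values per key, starting from 0
changelog = aggregate_grouped_stream(
    [("a", 1), ("b", 2), ("a", 3)],
    initializer=lambda: 0,
    aggregator=lambda key, value, agg: agg + value,
)
print(changelog)  # [('a', 1), ('b', 2), ('a', 4)]
```

With Kafka Streams record caching enabled, consecutive updates for the same key may be collapsed, so a real topology can emit fewer intermediate values than this sketch.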
convertKey
This built-in operation takes a message and converts the key into a given type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<KR,V> | into | string | The type to convert to. |
Example:
```yaml
from: input_stream
via:
  - type: convertKey
    into: string
to: output_stream
```
convertKeyValue
This built-in operation takes a message and converts the key and value into a given type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<KR,VR> | into | string | The type to convert to. |
Example:
```yaml
from: input_stream
via:
  - type: convertKeyValue
    into: (string,avro:SensorData)
to: output_stream
```
convertValue
This built-in operation takes a message and converts the value into a given type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | into | string | The type to convert to. |
Example:
```yaml
from: input_stream
via:
  - type: convertValue
    into: avro:SensorData
to: output_stream
```
count
This operation counts the number of messages per key (and per window, for windowed streams) and returns the counts as a table. It can operate on a range of stream types.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KGroupedStream<K,V> | KTable<K,Long> | store | Store configuration | No | The Store configuration. |
| KGroupedTable<K,V> | KTable<K,Long> | store | Store configuration | No | The Store configuration. |
| SessionWindowedKStream<K,V> | KTable<Windowed<K>,Long> | store | Store configuration | No | The Store configuration. |
| TimeWindowedKStream<K,V> | KTable<Windowed<K>,Long> | store | Store configuration | No | The Store configuration. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: count
  - type: toStream
to: output_stream
```
filter
Filter all incoming messages according to a predicate. The predicate function is called for every message, and only messages for which it returns true are sent to the output stream.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,V> | predicate | Inline or reference | Yes | The Predicate function. |
| KTable<K,V> | KTable<K,V> | predicate | Inline or reference | Yes | The Predicate function. |
Example:
```yaml
from: input_stream
via:
  - type: filter
    predicate: my_filter_function
  - type: filter
    predicate:
      expression: key.startswith('a')
to: output_stream
```
filterNot
This transformation works exactly like filter, but negates the predicate before applying it, so only messages for which the predicate returns false are passed on. See filter for details on how to implement a predicate.
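As a rough illustration of how filter and filterNot relate, here is a plain-Python sketch over a list of (key, value) records. It is not KSML code, and the helper names are hypothetical. Both operations call the same predicate, and filterNot keeps exactly the records that filter drops. The sketch models a KStream only. On a KTable, Kafka Streams additionally forwards tombstones for dropped rows.

```python
from typing import Callable, Iterable, List, Tuple

Record = Tuple[object, object]
Predicate = Callable[[object, object], bool]


def filter_stream(records: Iterable[Record], predicate: Predicate) -> List[Record]:
    """Keep only the records for which the predicate returns True."""
    return [(k, v) for k, v in records if predicate(k, v)]


def filter_not_stream(records: Iterable[Record], predicate: Predicate) -> List[Record]:
    """Keep only the records for which the predicate returns False."""
    return [(k, v) for k, v in records if not predicate(k, v)]


def starts_with_a(key: str, value: object) -> bool:
    # Same check as the inline expression key.startswith('a') in the filter example
    return key.startswith("a")


records = [("apple", 1), ("banana", 2), ("avocado", 3)]
print(filter_stream(records, starts_with_a))      # [('apple', 1), ('avocado', 3)]
print(filter_not_stream(records, starts_with_a))  # [('banana', 2)]
```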
groupBy
Group the records of a stream on a new key that is selected using the provided KeyValueMapper.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KGroupedStream<KR,V> | store | Store configuration | No | The Store configuration. |
| | | mapper | Inline or reference | Yes | The KeyValueMapper function. |
| KTable<K,V> | KGroupedTable<KR,VR> | store | Store configuration | No | The Store configuration. |
| | | mapper | Inline or reference | Yes | The KeyValueMapper function. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream
```
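Conceptually, groupBy on a stream re-keys every record with the mapper and then collects records that share the new key. The plain-Python sketch below shows only that idea. The helper name and the sensor records are hypothetical. In Kafka Streams the re-keyed records are actually written to a repartition topic, and the sketch does not model that.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def group_by(
    records: Iterable[Tuple[object, object]],
    mapper: Callable[[object, object], object],
) -> Dict[object, List[object]]:
    """Re-key every record with mapper(key, value) and collect values per new key."""
    groups: Dict[object, List[object]] = defaultdict(list)
    for key, value in records:
        new_key = mapper(key, value)
        groups[new_key].append(value)  # arrival order is kept within a group
    return dict(groups)


readings = [
    ("sensor1", {"color": "red", "temp": 20}),
    ("sensor2", {"color": "blue", "temp": 18}),
    ("sensor3", {"color": "red", "temp": 22}),
]
by_color = group_by(readings, lambda key, value: value["color"])
print({color: [r["temp"] for r in values] for color, values in by_color.items()})
# {'red': [20, 22], 'blue': [18]}
```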
groupByKey
Group the records of a stream on the stream’s key.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KGroupedStream<K,V> | store | Store configuration | No | The Store configuration. |
Example:
```yaml
from: input_stream
via:
  - type: groupByKey
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: value1+value2
  - type: toStream
to: output_stream
```
join
Join records of this stream with another stream’s records using a windowed inner equi join. The join is computed on the records’ key with join predicate thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | stream | string | Yes | The name of the stream to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | globalTable | string | Yes | The name of the global table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KTable<K,V> | KTable<K,VR> | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
Example:
```yaml
from: input_stream
via:
  - type: join
    stream: second_stream
    valueJoiner: my_join_function
    duration: 1m
to: output_stream
```
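To make the two join predicates concrete, the plain-Python sketch below joins two in-memory lists of (key, value, timestamp) records. Records are joined only when their keys are equal and their timestamps differ by at most the window size. The sketch assumes that duration: 1m behaves like a symmetric 60,000 ms time difference, as Kafka Streams' JoinWindows.ofTimeDifference does. The helper name and sample data are hypothetical. It processes both sides as batches and ignores grace periods and emission order.

```python
from typing import Callable, List, Tuple

# (key, value, timestamp in milliseconds)
Record = Tuple[object, object, int]


def windowed_inner_join(
    left: List[Record],
    right: List[Record],
    window_ms: int,
    value_joiner: Callable[[object, object], object],
) -> List[Tuple[object, object]]:
    """Join records with equal keys whose timestamps differ by at most window_ms."""
    results = []
    for l_key, l_value, l_ts in left:
        for r_key, r_value, r_ts in right:
            if l_key == r_key and abs(l_ts - r_ts) <= window_ms:
                results.append((l_key, value_joiner(l_value, r_value)))
    return results


orders = [("a", 1, 0), ("b", 2, 0)]
payments = [("a", 10, 30_000), ("a", 20, 120_000), ("b", 5, 59_000)]
print(windowed_inner_join(orders, payments, 60_000, lambda l, r: l + r))
# [('a', 11), ('b', 7)]
```

The second payment for key a is 120 seconds away from the order, so it falls outside the one-minute window and produces no output.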
leftJoin
Join records of this stream with another stream’s records using windowed left equi join. In contrast to inner-join, all records from this stream will produce at least one output record. The join is computed on the records’ key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | stream | string | Yes | The name of the stream to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | globalTable | string | Yes | The name of the global table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KTable<K,V> | KTable<K,VR> | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
Example:
```yaml
from: input_stream
via:
  - type: leftJoin
    stream: second_stream
    valueJoiner: my_join_function
    duration: 1m
to: output_stream
```
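The plain-Python sketch below shows what distinguishes a left join from an inner join. A left record with no match inside the window is still emitted, with None passed to the value joiner as the right-hand value. It uses the same assumptions as a batch-style inner-join sketch: in-memory lists, a symmetric window in milliseconds, and a hypothetical helper name. Kafka Streams' actual timing of non-joined results, which are emitted once the window closes, is not modeled.

```python
from typing import Callable, List, Optional, Tuple

Record = Tuple[object, object, int]  # (key, value, timestamp in ms)


def windowed_left_join(
    left: List[Record],
    right: List[Record],
    window_ms: int,
    value_joiner: Callable[[object, Optional[object]], object],
) -> List[Tuple[object, object]]:
    """Like an inner join, but unmatched left records are joined with None."""
    results = []
    for l_key, l_value, l_ts in left:
        matches = [
            r_value
            for r_key, r_value, r_ts in right
            if r_key == l_key and abs(l_ts - r_ts) <= window_ms
        ]
        if matches:
            results.extend((l_key, value_joiner(l_value, m)) for m in matches)
        else:
            results.append((l_key, value_joiner(l_value, None)))
    return results


print(windowed_left_join([("a", 1, 0), ("c", 3, 0)], [("a", 10, 1_000)], 60_000,
                         lambda l, r: (l, r)))
# [('a', (1, 10)), ('c', (3, None))]
```

An outer join would additionally emit unmatched right-hand records, joined with None on the left side.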
map
This is an alias for transformKeyValue.
mapKey
This is an alias for transformKey.
mapValues
This is an alias for transformValue.
merge
Merge this stream and the given stream into one larger stream. There is no ordering guarantee between records from this stream and records from the provided stream in the merged stream. However, relative order is preserved within each input stream (i.e., records within one input stream are processed in order).
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<K,V> | stream | string | The name of the stream to merge with. |
Example:
```yaml
from: input_stream
via:
  - type: merge
    stream: second_stream
to: output_stream
```
outerJoin
Join records of this stream with another stream’s records using windowed outer equi join. In contrast to inner-join or left-join, all records from both streams will produce at least one output record. The join is computed on the records’ key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | stream | string | Yes | The name of the stream to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
| | | duration | string | Yes | The Duration of the windows to join. |
| KTable<K,V> | KTable<K,VR> | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | The ValueJoiner function. |
Example:
```yaml
from: input_stream
via:
  - type: outerJoin
    stream: second_stream
    valueJoiner: my_join_function
    duration: 1m
to: output_stream
```
peek
Perform an action on each record of a stream. This is a stateless record-by-record operation. Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<K,V> | forEach | Inline or reference | The ForEach function that will be called for every message. |
Example:
```yaml
from: input_stream
via:
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
reduce
Combine the values of records in this stream by the grouped key. Records with null key or value are ignored. Combining implies that the type of the aggregate result is the same as the type of the input value, similar to aggregate(Initializer, Aggregator).
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KGroupedStream<K,V> | KTable<K,V> | store | Store configuration | No | The Store configuration. |
| | | reducer | Inline or reference | Yes | The Reducer function. |
| KGroupedTable<K,V> | KTable<K,V> | store | Store configuration | No | The Store configuration. |
| | | adder | Inline or reference | Yes | The Reducer function that adds two values. |
| | | subtractor | Inline or reference | Yes | The Reducer function that subtracts two values. |
| SessionWindowedKStream<K,V> | KTable<Windowed<K>,V> | store | Store configuration | No | The Store configuration. |
| | | reducer | Inline or reference | Yes | The Reducer function. |
| TimeWindowedKStream<K,V> | KTable<Windowed<K>,V> | store | Store configuration | No | The Store configuration. |
| | | reducer | Inline or reference | Yes | The Reducer function. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: reduce
    reducer:
      expression: value1+value2
  - type: toStream
to: output_stream
```
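Reducing a KGroupedTable is the least intuitive case. When a row of the upstream table changes, its old value is first removed from the group it belonged to using the subtractor, and then the new value is added to its (possibly different) group using the adder. The plain-Python sketch below models that bookkeeping under simplifying assumptions. It is not KSML's implementation. Updates arrive as (table key, new value) pairs, None means the row was deleted, and the first value of a group becomes that group's initial aggregate. The helper name, the regroup mapper, and the region data are all hypothetical.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple


def reduce_grouped_table(
    updates: Iterable[Tuple[object, Optional[object]]],
    regroup: Callable[[object, object], Tuple[object, object]],
    adder: Callable[[object, object], object],
    subtractor: Callable[[object, object], object],
) -> List[Tuple[object, object]]:
    """Maintain one reduced value per group key for a changing source table.

    updates: (table key, new value), where None means the row was deleted.
    regroup: maps a table row to (group key, value), like a KTable groupBy mapper.
    """
    source: Dict[object, object] = {}   # current rows of the upstream table
    reduced: Dict[object, object] = {}  # current reduced value per group key
    emitted: List[Tuple[object, object]] = []
    for key, new_value in updates:
        old_value = source.pop(key, None)
        if old_value is not None:
            # Retract the previous row from the group it used to belong to
            group, value = regroup(key, old_value)
            reduced[group] = subtractor(reduced[group], value)
            emitted.append((group, reduced[group]))
        if new_value is not None:
            # Add the new row; the first value of a group becomes its aggregate
            group, value = regroup(key, new_value)
            reduced[group] = adder(reduced[group], value) if group in reduced else value
            emitted.append((group, reduced[group]))
            source[key] = new_value
    return emitted


# Total amount per region; alice moves from "eu" to "us"
changes = [("alice", ("eu", 10)), ("bob", ("eu", 5)), ("alice", ("us", 7))]
print(reduce_grouped_table(changes, lambda k, v: v, lambda a, b: a + b, lambda a, b: a - b))
# [('eu', 10), ('eu', 15), ('eu', 5), ('us', 7)]
```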
repartition
Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using key serde, value serde, StreamPartitioner, number of partitions, and topic name part.
The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named as “${applicationId}-
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,V> | store | Store configuration | No | The Store configuration. |
| | | name | string | Yes | The name used as part of the repartition topic and processor name. |
| | | partitioner | Inline or reference | Yes | The StreamPartitioner function. |
Example:
```yaml
from: input_stream
via:
  - type: repartition
    name: my_repartitioner
    partitioner: my_own_partitioner
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
selectKey
This is an alias for transformKey.
suppress
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. When windowCloses is selected and no further restrictions are provided, then this is interpreted as Suppressed.untilWindowCloses(unbounded()).
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KTable<K,V> | KTable<K,V> | until | string | Either timeLimit or windowCloses. Note that timeLimit suppression works on any table, while windowCloses suppression works only on windowed tables. For the latter, see windowBySession and windowByTime. |
| | | duration | string | The Duration to suppress updates (only when until is timeLimit). |
| | | maxBytes | int | (Optional) The maximum number of bytes the suppression buffer may hold. |
| | | maxRecords | int | (Optional) The maximum number of records the suppression buffer may hold. |
| | | bufferFullStrategy | string | (Optional) What to do when the buffer is full: either emitEarlyWhenFull or shutdownWhenFull. |
Example:
```yaml
from: input_table
via:
  - type: suppress
    until: timeLimit
    duration: 30s
    maxBytes: 128000
    maxRecords: 10000
    bufferFullStrategy: emitEarlyWhenFull
  - type: toStream
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
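For until: timeLimit, the plain-Python sketch below shows one plausible reading of the behaviour, based on the documented semantics of Kafka Streams' Suppressed.untilTimeLimit. When an update for a key first enters the buffer, a timer starts. Later updates for the same key replace the buffered value but do not restart the timer. The latest value is emitted once stream time (the highest timestamp seen so far) reaches the buffered time plus the limit. The helper name and data are hypothetical, and maxBytes, maxRecords and bufferFullStrategy are not modeled.

```python
from typing import Dict, List, Tuple


def suppress_until_time_limit(
    records: List[Tuple[object, object, int]], limit_ms: int
) -> List[Tuple[object, object]]:
    """records: (key, value, timestamp in ms) in arrival order."""
    buffer: Dict[object, Tuple[int, object]] = {}  # key -> (time first buffered, latest value)
    stream_time = -1  # highest timestamp seen so far
    emitted: List[Tuple[object, object]] = []
    for key, value, ts in records:
        stream_time = max(stream_time, ts)
        if key in buffer:
            first_ts, _ = buffer[key]
            buffer[key] = (first_ts, value)  # newer value replaces the old one, timer keeps running
        else:
            buffer[key] = (ts, value)
        # Emit, oldest first, every key whose time limit has passed in stream time
        expired = sorted(
            (k for k, (t, _) in buffer.items() if t + limit_ms <= stream_time),
            key=lambda k: buffer[k][0],
        )
        for k in expired:
            emitted.append((k, buffer.pop(k)[1]))
    return emitted


updates = [("a", 1, 0), ("a", 2, 10_000), ("b", 5, 20_000), ("a", 3, 30_000), ("b", 6, 60_000)]
print(suppress_until_time_limit(updates, 30_000))  # [('a', 3), ('b', 6)]
```

Five updates collapse into two emitted results. Keys still in the buffer when input stops are only emitted once stream time advances further.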
toStream
Convert a KTable into a KStream object.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KTable<K,V> | KStream<K,V> | mapper | Inline or reference | (Optional) The KeyValueMapper function. If no mapper is specified, the original key K is used. |
Example:
```yaml
from: input_table
via:
  - type: toStream
to: output_stream
```
transformKey
This operation takes a message and transforms the key into a new key, which can potentially be of a different type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<KR,V> | mapper | Inline or reference | The KeyValueMapper function. |
| | | name | string | (Optional) The name of the processor node. |
Example:
```yaml
from: input_stream
via:
  - type: transformKey
    mapper:
      expression: str(key)  # convert key from Integer to String
to: output_stream
```
transformKeyValue
This operation takes a message and transforms the key and value into a new key and value, each of which can potentially be of a different type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<KR,VR> | mapper | Inline or reference | The KeyValueMapper function. |
| | | name | string | (Optional) The name of the processor node. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValue
    mapper:
      expression: (str(key), str(value))  # convert key and value from Integer to String
to: output_stream
```
transformKeyValueToKeyValueList
This operation takes a message and transforms it into zero, one or more new messages, whose keys and values can potentially be of different types.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<KR,VR> | mapper | Inline or reference | The KeyValueToKeyValueListTransformer function. |
| | | name | string | (Optional) The name of the processor node. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValueToKeyValueList
    mapper:
      expression: "[(key,value),(key,value)]"  # duplicate all incoming messages
to: output_stream
```
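In effect, this operation is a flat-map. Each input message is replaced by whatever list the mapper returns, so a message can vanish, stay single, or fan out. The plain-Python sketch below illustrates this with a hypothetical mapper, split_words, that emits one (word, 1) message per word in the value. Neither the helper nor the mapper is part of KSML.

```python
from typing import Callable, Iterable, List, Tuple

KeyValue = Tuple[object, object]


def transform_to_key_value_list(
    records: Iterable[KeyValue],
    mapper: Callable[[object, object], List[KeyValue]],
) -> List[KeyValue]:
    """Replace every input message by the (possibly empty) list the mapper returns."""
    output: List[KeyValue] = []
    for key, value in records:
        output.extend(mapper(key, value))
    return output


def split_words(key: object, value: str) -> List[KeyValue]:
    # Hypothetical mapper: one (word, 1) message per word, none for an empty value
    return [(word, 1) for word in value.split()]


print(transform_to_key_value_list([("s1", "hello world"), ("s2", "")], split_words))
# [('hello', 1), ('world', 1)]
```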
transformKeyValueToValueList
This operation takes a message and generates a list of new values for the same key. Each value in the list becomes a separate output message and can potentially be of a different type.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | mapper | Inline or reference | The KeyValueToValueListTransformer function. |
| | | name | string | (Optional) The name of the processor node. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValueToValueList
    mapper:
      expression: "[value+1,value+2,value+3]"  # creates 3 new messages [key,VR] for every input message
to: output_stream
```
transformValue
This operation takes a message and transforms the value into a new value, which can potentially be of a different type.
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|---|---|---|---|---|---|
| KStream<K,V> | KStream<K,VR> | store | Store configuration | No | The Store configuration. |
| | | mapper | Inline or reference | Yes | The ValueTransformer function. |
| | | name | string | No | The name of the processor node. |
Example:
```yaml
from: input_stream
via:
  - type: transformValue
    mapper:
      expression: str(value)  # convert value from Integer to String
to: output_stream
```
windowBySession
Create a new session-windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to the WindowTypes documentation.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KGroupedStream<K,V> | SessionWindowedKStream<K,V> | inactivityGap | Duration | The inactivity gap parameter for the SessionWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SessionWindows object. |
| CogroupedKStream | SessionWindowedCogroupedKStream<K,V> | inactivityGap | Duration | The inactivity gap parameter for the SessionWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SessionWindows object. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowBySession
    inactivityGap: 10m
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream
```
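Session windows have no fixed size. A session grows as long as records for the same key keep arriving within the inactivity gap, and it closes after a longer pause. The plain-Python sketch below computes the resulting sessions for a batch of (key, timestamp) records. It assumes that a record at most inactivityGap after the session's last record extends that session, which matches Kafka Streams' inclusive gap. It sorts the timestamps, so it shows the final sessions regardless of arrival order. Grace periods and late-record dropping are not modeled, and the helper name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Session = Tuple[int, int, int]  # (start ms, end ms, number of records)


def session_windows(
    records: Iterable[Tuple[object, int]], gap_ms: int
) -> Dict[object, List[Session]]:
    """Group (key, timestamp) records into per-key sessions.

    A record extends the current session when it arrives at most gap_ms after
    the session's last record; otherwise it starts a new session.
    """
    stamps: Dict[object, List[int]] = defaultdict(list)
    for key, ts in records:
        stamps[key].append(ts)
    sessions: Dict[object, List[Session]] = {}
    for key, times in stamps.items():
        times.sort()
        result: List[Session] = []
        start = end = times[0]
        count = 1
        for ts in times[1:]:
            if ts - end <= gap_ms:
                end, count = ts, count + 1
            else:
                result.append((start, end, count))
                start, end, count = ts, ts, 1
        result.append((start, end, count))
        sessions[key] = result
    return sessions


clicks = [("u", 0), ("u", 240_000), ("u", 1_200_000), ("v", 1_000)]
print(session_windows(clicks, gap_ms=300_000))
# {'u': [(0, 240000, 2), (1200000, 1200000, 1)], 'v': [(1000, 1000, 1)]}
```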
windowByTime
Create a new time-windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to the WindowTypes documentation.
| Stream Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| KGroupedStream<K,V> | TimeWindowedKStream<K,V> | windowType | string | Fixed value sliding. |
| | | timeDifference | Duration | The time difference parameter for the SlidingWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SlidingWindows object. |
| KGroupedStream<K,V> | TimeWindowedKStream<K,V> | windowType | string | Fixed value hopping. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | advanceBy | Duration | The amount by which each window is advanced. If this value is not specified, it is equal to duration, which gives tumbling windows. If this value is smaller than duration, you get hopping windows. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
| KGroupedStream<K,V> | TimeWindowedKStream<K,V> | windowType | string | Fixed value tumbling. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
| CogroupedKStream | TimeWindowedCogroupedKStream<K,V> | windowType | string | Fixed value sliding. |
| | | timeDifference | Duration | The time difference parameter for the SlidingWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SlidingWindows object. |
| CogroupedKStream | TimeWindowedCogroupedKStream<K,V> | windowType | string | Fixed value hopping. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | advanceBy | Duration | The amount by which each window is advanced. If this value is not specified, it is equal to duration, which gives tumbling windows. If this value is smaller than duration, you get hopping windows. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
| CogroupedKStream | TimeWindowedCogroupedKStream<K,V> | windowType | string | Fixed value tumbling. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowByTime
    windowType: hopping
    duration: 1h
    advanceBy: 15m
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream
```
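With duration and advanceBy, a single record can fall into several overlapping windows. The plain-Python sketch below lists the half-open [start, end) windows that contain a given timestamp. It assumes epoch-aligned window starts at multiples of advanceBy, which is how Kafka Streams' TimeWindows assigns windows as far as we know. Setting the advance equal to the duration gives the tumbling case. Sliding windows follow different rules and are not covered, and the helper name is hypothetical.

```python
from typing import List, Tuple


def time_windows_for(timestamp_ms: int, size_ms: int, advance_ms: int) -> List[Tuple[int, int]]:
    """Return every [start, end) window of the given size and advance containing timestamp_ms."""
    # Smallest multiple of advance_ms greater than timestamp_ms - size_ms (never below 0)
    start = (max(0, timestamp_ms - size_ms + advance_ms) // advance_ms) * advance_ms
    windows = []
    while start <= timestamp_ms:
        windows.append((start, start + size_ms))
        start += advance_ms
    return windows


HOUR, QUARTER = 3_600_000, 900_000
print(time_windows_for(4_200_000, HOUR, QUARTER))  # hopping: 4 windows
print(time_windows_for(4_200_000, HOUR, HOUR))     # tumbling: [(3600000, 7200000)]
```

With duration: 1h and advanceBy: 15m as in the example above, every record is counted in four windows, one per quarter-hour offset.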
Sink Operations
branch
Branches out messages from the input stream into several branches based on predicates. Each branch is defined as a list item below the branch operation. Branch predicates are defined using the if keyword. Each message is processed by only one branch: the first one for which the predicate returns true.
| Applies to | Value Type | Description |
|---|---|---|
| KStream<K,V> | List of branch definitions | See the description of branch definitions below. |
Branches in KSML are nested pipelines, which are parsed without the requirement of a source attribute. Each branch accepts the following parameters:
| Branch element | Value Type | Description |
|---|---|---|
| if | Inline Predicate or reference | The Predicate function that determines whether the message is sent down this branch or passed on to the next branch in line. |
| Inline | All pipeline parameters, see Pipeline | The inlined pipeline describes the topology of the specific branch. |
Example:
```yaml
from: some_source_topic
branch:
  - if:
      expression: value['color'] == 'blue'
    to: ksml_sensordata_blue
  - if:
      expression: value['color'] == 'red'
    to: ksml_sensordata_red
  - forEach:
      code: |
        print('Unknown color sensor: '+str(value))
```
In this example, the first two branches are entered if the respective predicate matches (the color attribute of value matches a certain color).
If the predicate returns false, then the next predicate/branch is tried. Only the last branch in the list can be a sink operation.
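The first-match rule can be sketched in plain Python. This is not KSML code, and the helper is hypothetical. Each record is tested against the branch predicates in order and routed to the first one that accepts it. A branch without a predicate, like the final forEach branch in the example, accepts everything that reaches it. The sketch assumes that records matching no branch are dropped.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Predicate = Callable[[object, object], bool]


def branch(
    records: Iterable[Tuple[object, object]],
    branches: List[Tuple[Optional[Predicate], str]],
) -> Dict[str, List[Tuple[object, object]]]:
    """Route each record to the first branch whose predicate matches.

    A branch with predicate None accepts every record that reaches it.
    Records matching no branch are dropped.
    """
    routed: Dict[str, List[Tuple[object, object]]] = {name: [] for _, name in branches}
    for key, value in records:
        for predicate, name in branches:
            if predicate is None or predicate(key, value):
                routed[name].append((key, value))
                break  # later branches never see this record
    return routed


sensors = [("s1", {"color": "blue"}), ("s2", {"color": "red"}), ("s3", {"color": "green"})]
routes = branch(sensors, [
    (lambda k, v: v["color"] == "blue", "ksml_sensordata_blue"),
    (lambda k, v: v["color"] == "red", "ksml_sensordata_red"),
    (None, "unknown"),
])
print({name: [k for k, _ in recs] for name, recs in routes.items()})
# {'ksml_sensordata_blue': ['s1'], 'ksml_sensordata_red': ['s2'], 'unknown': ['s3']}
```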
forEach
This sends each message to a custom defined function. This function is expected to handle each message as its final step. The function does not (need to) return anything.
| Applies to | Value Type | Description |
|---|---|---|
| KStream<K,V> | Inline or reference | The ForEach function that is called for every record on the source stream. |
Examples:
```yaml
forEach: my_print_function
```

```yaml
forEach:
  code: print(value)
```
to
Messages are sent directly to a named Stream.
| Applies to | Value Type | Description |
|---|---|---|
| KStream<K,V> | string | The name of a defined Stream. |
Example:
```yaml
to: my_target_topic
```
toExtractor
Messages are passed to a user function, which returns the name of the topic that the message should be sent to. This operation acts as a sink and is always the last operation in a pipeline.
| Applies to | Value Type | Description |
|---|---|---|
| KStream<K,V> | Inline or reference | The TopicNameExtractor function that is called for every message and returns the name of the topic to which the message is written. |
Examples:
```yaml
toExtractor: my_extractor_function
```

```yaml
toExtractor:
  code: |
    if key == 'sensor1':
      return 'ksml_sensordata_sensor1'
    elif key == 'sensor2':
      return 'ksml_sensordata_sensor2'
    else:
      return 'ksml_sensordata_sensor0'
```