Operations
Table of Contents
- Introduction
- Transform Operations
  - aggregate
  - cogroup
  - convertKey
  - convertKeyValue
  - convertValue
  - count
  - filter
  - filterNot
  - groupBy
  - groupByKey
  - join
  - leftJoin
  - map
  - mapKey
  - mapValues
  - merge
  - outerJoin
  - peek
  - reduce
  - repartition
  - selectKey
  - suppress
  - toStream
  - transformKey
  - transformKeyValue
  - transformKeyValueToKeyValueList
  - transformKeyValueToValueList
  - transformMetadata
  - transformValue
  - windowBySession
  - windowByTime
- Sink Operations
  - as
Introduction
Pipelines in KSML have a beginning, a middle and (optionally) an end. Operations form the middle part of pipelines. They are modeled as separate YAML entities, where each operation takes input from the previous operation and applies its own logic. The returned stream then serves as input for the next operation.
Transform Operations
Transformations are operations that take an input stream and convert it to an output stream. This section lists all supported transformations. Each one states the type of stream it returns.
All operations accept the following common parameter:

Parameter | Value Type | Description |
---|---|---|
name | string | The name of the operation. |
Note that Kafka Streams does not support every combination of output and input stream types. Whoever writes the KSML definition must make sure that the stream returned by one operation can actually serve as input to the next. KSML performs type checking and exits with an error when operations that cannot be chained together are listed one after another in the KSML definition.
aggregate
This operation aggregates multiple values into a single one by repeatedly calling an aggregator function. It can operate on a range of stream types.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KGroupedStream<K,V>` | `KTable<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
| | | aggregator | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should add the key/value to the previously calculated aggregatedValue and return a new aggregate value of type VR. |
`KGroupedTable<K,V>` | `KTable<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
| | | adder | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should add the key/value to the previously calculated aggregatedValue and return a new aggregate value of type VR. |
| | | subtractor | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should remove the key/value from the previously calculated aggregatedValue and return a new aggregate value of type VR. |
`SessionWindowedKStream<K,V>` | `KTable<Windowed<K>,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `session`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
| | | aggregator | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should add the key/value to the previously calculated aggregatedValue and return a new aggregate value of type VR. |
| | | merger | Inline or reference | Yes | A Merger function, which takes a key of type K and two aggregate values value1 and value2 of type VR. The return value is the merged aggregate, also of type VR. |
`TimeWindowedKStream<K,V>` | `KTable<Windowed<K>,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
| | | aggregator | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should add the key/value to the previously calculated aggregatedValue and return a new aggregate value of type VR. |
`CogroupedKStream<K,V>` | `KTable<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
`SessionWindowedCogroupedKStream<K,V>` | `KTable<Windowed<K>,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `session`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
| | | merger | Inline or reference | Yes | A Merger function, which takes a key of type K and two aggregate values value1 and value2 of type VR. The return value is the merged aggregate, also of type VR. |
`TimeWindowedCogroupedKStream<K,V>` | `KTable<Windowed<K>,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | initializer | Inline or reference | Yes | An Initializer function, which takes no arguments and returns a value of type VR. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: aggregatedValue + value
  - type: toStream
to: output_stream
```
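To make the roles of initializer and aggregator concrete, here is a plain-Python sketch of how a grouped-stream aggregation folds records into one value per key. It is not KSML or Kafka Streams code: the function name `aggregate_grouped_stream` is hypothetical, records are modeled as in-memory `(key, value)` tuples, and record caching, state stores and repartitioning are ignored.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def aggregate_grouped_stream(
    records: Iterable[Tuple[Any, Any]],
    initializer: Callable[[], Any],
    aggregator: Callable[[Any, Any, Any], Any],
) -> Tuple[Dict[Any, Any], List[Tuple[Any, Any]]]:
    """Fold grouped (key, value) records into one aggregate per key.

    Returns the final table contents and the list of updates emitted along the way.
    """
    table: Dict[Any, Any] = {}
    changelog: List[Tuple[Any, Any]] = []
    for key, value in records:
        if key is None or value is None:
            continue  # grouped aggregations ignore records with a null key or value
        # The initializer is only called the first time a key is seen
        aggregated_value = table[key] if key in table else initializer()
        table[key] = aggregator(key, value, aggregated_value)
        changelog.append((key, table[key]))  # one update per record (caching is ignored)
    return table, changelog


# Same logic as the example: initializer 0, aggregator "aggregatedValue + value"
table, changelog = aggregate_grouped_stream(
    [("a", 1), ("b", 5), ("a", 2)],
    initializer=lambda: 0,
    aggregator=lambda key, value, aggregatedValue: aggregatedValue + value,
)
print(table)      # {'a': 3, 'b': 5}
print(changelog)  # [('a', 1), ('b', 5), ('a', 3)]
```

The changelog shows why the result is a table: each new record for a key replaces that key's previous aggregate.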
cogroup
This operation cogroups multiple values into a single one by repeatedly calling an aggregator function. It can operate on a range of stream types.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KGroupedStream<K,V>` | `CogroupedKStream<K,VR>` | aggregator | Inline or reference | Yes | An Aggregator function, which takes a key of type K, a value of type V and aggregatedValue of type VR. It should add the key/value to the previously calculated aggregatedValue and return a new aggregate value of type VR. |
`CogroupedKStream<K,V>` | n/a | n/a | n/a | n/a | This method is currently not supported in KSML. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: cogroup
    aggregator:
      expression: aggregatedValue + value
  - type: aggregate
    initializer:
      expression: 0
  - type: toStream
to: output_stream
```
Note: this operation was added to KSML for completeness, but it is not considered ready or fully functional. Feel free to experiment, but don’t rely on it in production. Syntax changes may occur in future KSML releases.
convertKey
This built-in operation takes a message and converts the key into a given type.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<KR,V>` | into | string | The type to convert the key into. Conversion to KR is done by KSML. |
Example:
```yaml
from:
  topic: input_stream
  keyType: string
  valueType: string
via:
  - type: convertKey
    into: json
to: output_stream
```
convertKeyValue
This built-in operation takes a message and converts the key and value into a given type.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<KR,VR>` | into | string | The types to convert the key and value into. Conversion of key into KR and value into VR is done by KSML. |
Example:
```yaml
from:
  topic: input_stream
  keyType: string
  valueType: string
via:
  - type: convertKeyValue
    into: (json,xml)
to: output_stream
```
convertValue
This built-in operation takes a message and converts the value into a given type.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | into | string | The type to convert the value into. Conversion of value into VR is done by KSML. |
Example:
```yaml
from:
  topic: input_stream
  keyType: string
  valueType: string
via:
  - type: convertValue
    into: xml
to: output_stream
```
count
This operation counts the number of messages per key (or per windowed key) and returns a table containing the count for each key. It can operate on a range of stream types.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KGroupedStream<K,V>` | `KTable<K,Long>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
`KGroupedTable<K,V>` | `KTable<K,Long>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
`SessionWindowedKStream<K,V>` | `KTable<Windowed<K>,Long>` | store | Store configuration | No | An optional Store configuration, should be of type `session`. |
`TimeWindowedKStream<K,V>` | `KTable<Windowed<K>,Long>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: count
  - type: toStream
to: output_stream
```
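The groupBy/count example above can be approximated in plain Python as follows. This is an illustrative sketch only: `group_by_then_count` is a hypothetical helper, the mapper plays the role of `my_mapper_function`, and it assumes, as Kafka Streams documents for grouped aggregations, that records whose grouping key or value is null are skipped.

```python
from collections import Counter
from typing import Any, Callable, Dict, Iterable, Tuple


def group_by_then_count(
    records: Iterable[Tuple[Any, Any]],
    mapper: Callable[[Any, Any], Any],
) -> Dict[Any, int]:
    """Regroup (key, value) records with mapper, then count records per new key."""
    counts: Counter = Counter()
    for key, value in records:
        group_key = mapper(key, value)  # groupBy: choose the key to group on
        if group_key is None or value is None:
            continue  # null keys/values are ignored by grouped aggregations
        counts[group_key] += 1  # count: one increment per record in the group
    return dict(counts)


events = [("user1", "click"), ("user2", "view"), ("user3", "click")]
print(group_by_then_count(events, lambda key, value: value))  # {'click': 2, 'view': 1}
```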
filter
Filter all incoming messages according to some predicate. The predicate function is called for every message, and only messages for which the predicate returns `True` are sent to the output stream.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,V>` | if | Inline or reference | Yes | A Predicate function, which returns `True` if the message can pass the filter, `False` otherwise. |
`KTable<K,V>` | `KTable<K,V>` | if | Inline or reference | Yes | A Predicate function, which returns `True` if the message can pass the filter, `False` otherwise. |
Example:
```yaml
from: input_stream
via:
  - type: filter
    if:
      expression: key.startswith('a')
to: output_stream
```
filterNot
This operation works exactly like filter, but negates the predicate before applying it. That means messages for which the predicate returns `False` are accepted, while those for which it returns `True` are filtered out.
See filter for details on how to implement the predicate.
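The relationship between filter and filterNot can be sketched in plain Python with the predicate from the filter example. The helper names are hypothetical and the sketch models the KStream case only (for a KTable, Kafka Streams additionally forwards a tombstone for rows that fail the predicate, which is not modeled here).

```python
from typing import Any, Callable, List, Tuple

Record = Tuple[Any, Any]


def apply_filter(records: List[Record], predicate: Callable[[Any, Any], bool]) -> List[Record]:
    """filter: keep only records for which the predicate returns True."""
    return [(key, value) for key, value in records if predicate(key, value)]


def apply_filter_not(records: List[Record], predicate: Callable[[Any, Any], bool]) -> List[Record]:
    """filterNot: keep only records for which the predicate returns False."""
    return [(key, value) for key, value in records if not predicate(key, value)]


def starts_with_a(key: str, value: Any) -> bool:
    return key.startswith("a")  # same predicate as the filter example


records = [("apple", 1), ("banana", 2), ("avocado", 3)]
print(apply_filter(records, starts_with_a))      # [('apple', 1), ('avocado', 3)]
print(apply_filter_not(records, starts_with_a))  # [('banana', 2)]
```

Together the two outputs always partition the input: every record ends up in exactly one of them.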
groupBy
Group the records of a stream or table by the key returned from a KeyValueMapper function.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KGroupedStream<KR,V>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V and returns a value of type KR to group the stream by. |
`KTable<K,V>` | `KGroupedTable<KR,V>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V and returns a value of type KR to group the table by. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: aggregatedValue + value
  - type: toStream
to: output_stream
```
groupByKey
Group the records of a stream by the stream’s key.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KGroupedStream<K,V>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
Example:
```yaml
from: input_stream
via:
  - type: groupByKey
  - type: aggregate
    initializer:
      expression: 0
    aggregator:
      expression: aggregatedValue + value
  - type: toStream
to: output_stream
```
join
Join records of this stream with another stream’s records using an inner join. The join is computed on the records’ key with join predicate `thisStream.key == otherStream.key`. If neither side is a table (a stream-stream join), the records’ timestamps must also be close enough to each other, as defined by timeDifference.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | stream | string | Yes | The name of the stream to join with. The stream should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a key of type K, a value1 of type V from this stream and a value2 of type VO from the other stream. The return value is the joined value of type VR. |
| | | timeDifference | duration | Yes | The maximum allowed time difference between two joined records. |
| | | grace | duration | No | A grace period during which out-of-order to-be-joined records may still arrive. |
`KStream<K,V>` | `KStream<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | table | string | Yes | The name of the table to join with. The table should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a value1 of type V from the source stream and a value2 of type VO from the join table. The return value is the joined value of type VR. |
| | | grace | duration | No | A grace period during which out-of-order to-be-joined records may still arrive. |
`KStream<K,V>` | `KStream<K,VR>` | globalTable | string | Yes | The name of the global table to join with. The global table should be of key type GK and value type GV. |
| | | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return value is the key of type GK of the record from the GlobalTable to join with. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a key of type K, a value1 of type V from the source stream and a value2 of type GV from the global table. The return value is the joined value of type VR. |
`KTable<K,V>` | `KTable<K,VR>` | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. The table should be of key type K and value type VO. |
| | | foreignKeyExtractor | Inline or reference | No | A ForeignKeyExtractor function, which takes a value of type V and returns the key of type KO of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a value1 of type V from the source table and a value2 of type VO from the join table. The return value is the joined value of type VR. |
| | | partitioner | Inline or reference | No | A Partitioner function, which partitions the records on the primary table. |
| | | otherPartitioner | Inline or reference | No | A Partitioner function, which partitions the records on the join table. |
Example:
```yaml
from: input_stream
via:
  - type: join
    stream: second_stream
    valueJoiner: my_key_value_mapper
    timeDifference: 1m
to: output_stream
```
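The join predicate for a stream-stream join (equal keys, and timestamps at most timeDifference apart) can be sketched in plain Python as a nested-loop join over two in-memory lists. The function name is hypothetical, timestamps are assumed to be in milliseconds, and grace periods, out-of-order arrival and state stores are ignored; the three-argument joiner mirrors the (key, value1, value2) signature described in the table.

```python
from typing import Any, Callable, List, Tuple

Record = Tuple[Any, Any, int]  # (key, value, timestamp in milliseconds)


def windowed_inner_join(
    left: List[Record],
    right: List[Record],
    time_difference_ms: int,
    value_joiner: Callable[[Any, Any, Any], Any],
) -> List[Tuple[Any, Any]]:
    """Inner-join two record lists on key, within a symmetric time window."""
    results = []
    for left_key, left_value, left_ts in left:
        for right_key, right_value, right_ts in right:
            # Both conditions must hold: same key, and timestamps close enough
            if left_key == right_key and abs(left_ts - right_ts) <= time_difference_ms:
                results.append((left_key, value_joiner(left_key, left_value, right_value)))
    return results


one_minute = 60_000  # timeDifference: 1m, as in the example above
left = [("k1", "L1", 0), ("k2", "L2", 0)]
right = [("k1", "R1", 30_000), ("k1", "R2", 120_000), ("k2", "R3", 61_000)]
joined = windowed_inner_join(left, right, one_minute, lambda key, v1, v2: f"{v1}+{v2}")
print(joined)  # [('k1', 'L1+R1')]
```

R2 and R3 share a key with a left record but arrive more than one minute away from it, so an inner join drops them.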
leftJoin
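Join records of this stream with another stream’s records using a left join. The join is computed on the records’ key with join predicate `thisStream.key == otherStream.key`. Every record of this stream or table produces output; when no matching record is found on the other side, the ValueJoiner receives null as value2. If neither side is a table (a stream-stream join), the records’ timestamps must also be close enough to each other, as defined by timeDifference.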
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | stream | string | Yes | The name of the stream to join with. The stream should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a key of type K, a value1 of type V from this stream and a value2 of type VO from the other stream (null if no match is found within the time window). The return value is the joined value of type VR. |
| | | timeDifference | duration | Yes | The maximum allowed time difference between two joined records. |
| | | grace | duration | No | A grace period during which out-of-order to-be-joined records may still arrive. |
`KStream<K,V>` | `KStream<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | table | string | Yes | The name of the table to join with. The table should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a value1 of type V from the source stream and a value2 of type VO from the join table (null if the table has no record for the key). The return value is the joined value of type VR. |
| | | grace | duration | No | A grace period during which out-of-order to-be-joined records may still arrive. |
`KStream<K,V>` | `KStream<K,VR>` | globalTable | string | Yes | The name of the global table to join with. The global table should be of key type GK and value type GV. |
| | | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return value is the key of type GK of the record from the GlobalTable to join with. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a key of type K, a value1 of type V from the source stream and a value2 of type GV from the global table (null if no match is found). The return value is the joined value of type VR. |
`KTable<K,V>` | `KTable<K,VR>` | store | Store configuration | No | The Store configuration. |
| | | table | string | Yes | The name of the table to join with. The table should be of key type K and value type VO. |
| | | foreignKeyExtractor | Inline or reference | No | A ForeignKeyExtractor function, which takes a value of type V and returns the key of type KO of the table to join with. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a value1 of type V from the source table and a value2 of type VO from the join table (null if no match is found). The return value is the joined value of type VR. |
| | | partitioner | Inline or reference | No | A Partitioner function, which partitions the records on the primary table. |
| | | otherPartitioner | Inline or reference | No | A Partitioner function, which partitions the records on the join table. |
Example:
```yaml
from: input_stream
via:
  - type: leftJoin
    stream: second_stream
    valueJoiner: my_key_value_mapper
    timeDifference: 1m
to: output_stream
```
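For the stream-table variant, the difference between join and leftJoin lies in what happens when the table has no row for the key. The plain-Python sketch below, with hypothetical names, processes an ordered list of table updates and stream records: only stream records trigger output, and the joiner receives `None` as value2 when no table row exists. The two-argument joiner mirrors the (value1, value2) signature in the table above; grace periods and buffering are not modeled.

```python
from typing import Any, Callable, List, Optional, Tuple

Event = Tuple[str, Any, Any]  # (source, key, value); source is "stream" or "table"


def stream_table_left_join(
    events: List[Event],
    value_joiner: Callable[[Any, Optional[Any]], Any],
) -> List[Tuple[Any, Any]]:
    """Left-join stream records against the table state at the moment each record arrives."""
    table = {}
    results = []
    for source, key, value in events:
        if source == "table":
            if value is None:
                table.pop(key, None)  # a null value (tombstone) deletes the row
            else:
                table[key] = value  # table updates change state but emit nothing
        else:
            # Every stream record produces output; value2 is None without a match
            results.append((key, value_joiner(value, table.get(key))))
    return results


events = [
    ("stream", "u1", "order1"),  # arrives before the table knows u1
    ("table", "u1", "Alice"),
    ("stream", "u1", "order2"),
    ("stream", "u2", "order3"),  # u2 never appears in the table
]
print(stream_table_left_join(events, lambda v1, v2: (v1, v2)))
# [('u1', ('order1', None)), ('u1', ('order2', 'Alice')), ('u2', ('order3', None))]
```

An inner join over the same events would emit only the middle result.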
map
This is an alias for transformKeyValue.
mapKey
This is an alias for transformKey.
mapValues
This is an alias for transformValue.
merge
Merge this stream and the given stream into one larger stream. There is no ordering guarantee between records from this stream and records from the provided stream in the merged stream. Relative order is preserved within each input stream, though (i.e., records within one input stream are processed in order).
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<K,V>` | stream | string | The name of the stream to merge with. |
Example:
```yaml
from: input_stream
via:
  - type: merge
    stream: second_stream
to: output_stream
```
outerJoin
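Join records of this stream with another stream’s records using an outer join. The join is computed on the records’ key with join predicate `thisStream.key == otherStream.key`. Records from either side produce output; when one side has no matching record, the ValueJoiner receives null for that side’s value. If neither side is a table (a stream-stream join), the records’ timestamps must also be close enough to each other, as defined by timeDifference.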
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | stream | string | Yes | The name of the stream to join with. The stream should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a key of type K, a value1 of type V from this stream and a value2 of type VO from the other stream. Either value is null when the other side has no matching record. The return value is the joined value of type VR. |
| | | timeDifference | duration | Yes | The maximum allowed time difference between two joined records. |
| | | grace | duration | No | A grace period during which out-of-order to-be-joined records may still arrive. |
`KTable<K,V>` | `KTable<K,VR>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | table | string | Yes | The name of the table to join with. The table should be of key type K and value type VO. |
| | | valueJoiner | Inline or reference | Yes | A ValueJoiner function, which takes a value1 of type V from the source table and a value2 of type VO from the join table. Either value is null when the other side has no matching record. The return value is the joined value of type VR. |
Example:
```yaml
from: input_stream
via:
  - type: outerJoin
    stream: second_stream
    valueJoiner: my_key_value_mapper
    timeDifference: 1m
to: output_stream
```
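For two tables, an outer join produces a row for every key that exists in either table. The plain-Python sketch below computes that end state; it is a hypothetical illustration (not KSML API) that models only the current table contents, not the stream of updates Kafka Streams emits as either side changes.

```python
from typing import Any, Callable, Dict, Optional


def table_outer_join(
    left: Dict[Any, Any],
    right: Dict[Any, Any],
    value_joiner: Callable[[Optional[Any], Optional[Any]], Any],
) -> Dict[Any, Any]:
    """Compute the current result of an outer join between two tables."""
    result = {}
    for key in left.keys() | right.keys():  # every key present on either side
        # Missing sides are passed as None, so the joiner must handle both cases
        result[key] = value_joiner(left.get(key), right.get(key))
    return result


left = {"a": 1, "b": 2}
right = {"b": 20, "c": 30}
joined = table_outer_join(left, right, lambda v1, v2: (v1, v2))
print(sorted(joined.items()))  # [('a', (1, None)), ('b', (2, 20)), ('c', (None, 30))]
```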
peek
Perform an action on each record of a stream. This is a stateless record-by-record operation. Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<K,V>` | forEach | Inline or reference | The ForEach function that is called for every message, receiving arguments key of type K and value of type V. |
Example:
```yaml
from: input_stream
via:
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
reduce
Combine the values of records in this stream by the grouped key. Records with null key or value are ignored. Combining implies that the type of the aggregate result is the same as the type of the input value, similar to aggregate(Initializer, Aggregator).
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KGroupedStream<K,V>` | `KTable<K,V>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | reducer | Inline or reference | Yes | A Reducer function, which takes two values value1 (the previously calculated aggregate) and value2 (the new value), both of type V, and returns the new aggregate value of type V. |
`KGroupedTable<K,V>` | `KTable<K,V>` | store | Store configuration | No | An optional Store configuration, should be of type `keyValue`. |
| | | adder | Inline or reference | Yes | A Reducer function, which takes two values value1 (the previously calculated aggregate) and value2 (the new value), both of type V. It should add value2 to the aggregate and return the new aggregate value of type V. |
| | | subtractor | Inline or reference | Yes | A Reducer function, which takes two values value1 (the previously calculated aggregate) and value2 (the old value), both of type V. It should remove value2 from the aggregate and return the new aggregate value of type V. |
`SessionWindowedKStream<K,V>` | `KTable<Windowed<K>,V>` | store | Store configuration | No | An optional Store configuration, should be of type `session`. |
| | | reducer | Inline or reference | Yes | A Reducer function, which takes two values value1 (the previously calculated aggregate) and value2 (the new value), both of type V, and returns the new aggregate value of type V. |
`TimeWindowedKStream<K,V>` | `KTable<Windowed<K>,V>` | store | Store configuration | No | An optional Store configuration, should be of type `window`. |
| | | reducer | Inline or reference | Yes | A Reducer function, which takes two values value1 (the previously calculated aggregate) and value2 (the new value), both of type V, and returns the new aggregate value of type V. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: reduce
    reducer:
      expression: value1+value2
  - type: toStream
to: output_stream
```
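The adder/subtractor pair for a KGroupedTable is easiest to understand when a table row moves from one group to another. The plain-Python sketch below is hypothetical (its names are not KSML or Kafka Streams API): each source-table update first subtracts the row's old value from its old group and then adds the new value to its new group. It assumes value1 is the current aggregate and value2 the incoming or outgoing value, and it skips details such as removing a group whose aggregate becomes empty.

```python
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

Selector = Callable[[Any, Any], Tuple[Any, Any]]  # (key, value) -> (group key, value)
Reducer = Callable[[Any, Any], Any]               # (value1, value2) -> new aggregate


def grouped_table_reduce(
    updates: Iterable[Tuple[Any, Optional[Any]]],
    selector: Selector,
    adder: Reducer,
    subtractor: Reducer,
) -> Dict[Any, Any]:
    """Maintain reduced values per group while rows of a source table change."""
    source: Dict[Any, Any] = {}      # current contents of the source table
    aggregates: Dict[Any, Any] = {}  # group key -> reduced value
    for key, new_value in updates:
        old_value = source.get(key)
        if old_value is not None:
            old_group, old_part = selector(key, old_value)
            # Retract the old contribution from the group the row used to belong to
            aggregates[old_group] = subtractor(aggregates[old_group], old_part)
        if new_value is None:
            source.pop(key, None)  # a null value deletes the row
            continue
        source[key] = new_value
        new_group, new_part = selector(key, new_value)
        if new_group in aggregates:
            aggregates[new_group] = adder(aggregates[new_group], new_part)
        else:
            aggregates[new_group] = new_part  # reduce: the first value becomes the aggregate
    return aggregates


updates = [
    ("alice", ("EU", 10)),
    ("bob", ("EU", 5)),
    ("alice", ("US", 7)),  # alice moves from EU to US
]
result = grouped_table_reduce(
    updates,
    selector=lambda user, row: row,  # row is (region, amount)
    adder=lambda value1, value2: value1 + value2,
    subtractor=lambda value1, value2: value1 - value2,
)
print(result)  # {'EU': 5, 'US': 7}
```

Without the subtractor, alice's old amount of 10 would stay counted in EU after she moved to US.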
repartition
Materialize this stream to an auto-generated repartition topic with a given number of partitions, using a custom partitioner. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged. The topic will be named as “${applicationId}-
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,V>` | numberOfPartitions | integer | No | The number of partitions of the repartitioned topic. |
| | | partitioner | Inline or reference | No | A custom Partitioner function to partition records. |
Example:
```yaml
from: input_stream
via:
  - type: repartition
    name: my_partitioner
    numberOfPartitions: 3
    partitioner: my_own_partitioner
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
selectKey
This is an alias for transformKey.
suppress
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. When windowCloses is selected and no further restrictions are provided, then this is interpreted as Suppressed.untilWindowCloses(unbounded()).
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KTable<K,V>` | `KTable<K,V>` | until | string | Yes | Either `timeLimit` or `windowCloses`. Note that `timeLimit` suppression works on any table, while `windowCloses` suppression works only on windowed tables. For the latter, see windowByTime or windowBySession. |
| | | duration | string | No | The duration for which updates are suppressed (only when until is `timeLimit`). |
| | | maxBytes | int | No | The maximum number of bytes the suppression buffer may hold. |
| | | maxRecords | int | No | The maximum number of records the suppression buffer may hold. |
| | | bufferFullStrategy | string | No | What to do when the buffer is full: `emitEarlyWhenFull` or `shutdownWhenFull`. |
Example:
```yaml
from: input_table
via:
  - type: suppress
    until: timeLimit
    duration: 30s
    maxBytes: 128000
    maxRecords: 10000
    bufferFullStrategy: emitEarlyWhenFull
  - type: toStream
  - type: peek
    forEach: print_key_and_value
to: output_stream
```
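With until set to `timeLimit`, Kafka Streams holds back table updates and forwards only the latest value per key once the configured duration has passed, measured in stream time from the first buffered update for that key; a newer update replaces the buffered value without restarting the timer. The hypothetical plain-Python sketch below models just that rule, using integer timestamps in arbitrary units; maxBytes, maxRecords and bufferFullStrategy are not modeled.

```python
from typing import Any, Dict, List, Optional, Tuple

Update = Tuple[int, Any, Any]  # (timestamp, key, value), in arrival order


def suppress_until_time_limit(
    updates: List[Update], time_limit: int
) -> Tuple[List[Tuple[Any, Any]], Dict[Any, Tuple[int, Any]]]:
    """Buffer the latest value per key; emit it once stream time passes the limit."""
    buffer: Dict[Any, Tuple[int, Any]] = {}  # key -> (time of first buffered update, latest value)
    emitted: List[Tuple[Any, Any]] = []
    stream_time: Optional[int] = None
    for timestamp, key, value in updates:
        stream_time = timestamp if stream_time is None else max(stream_time, timestamp)
        first_seen = buffer[key][0] if key in buffer else timestamp
        buffer[key] = (first_seen, value)  # newer value replaces the old one; timer is not restarted
        # Emit, oldest first, every key whose time limit has elapsed in stream time
        expired = [k for k, (t, _) in buffer.items() if stream_time - t >= time_limit]
        for expired_key in sorted(expired, key=lambda k: buffer[k][0]):
            emitted.append((expired_key, buffer.pop(expired_key)[1]))
    return emitted, buffer


updates = [(0, "a", 1), (10, "a", 2), (20, "b", 5), (31, "c", 7)]
emitted, still_buffered = suppress_until_time_limit(updates, time_limit=30)
print(emitted)         # [('a', 2)]
print(still_buffered)  # {'b': (20, 5), 'c': (31, 7)}
```

Key `a` was updated twice but is emitted once, with its latest value, as soon as stream time reaches 30 units after its first update.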
toStream
Convert a KTable into a KStream object.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KTable<K,V>` | `KStream<KR,V>` | mapper | Inline or reference | No | A KeyValueMapper function, which takes a key of type K and a value of type V. The return value is the key of the resulting stream, which is of type KR. If no mapper is provided, keys remain unchanged. |
Example:
```yaml
from: input_table
via:
  - type: toStream
to: output_stream
```
transformKey
This operation takes a message and transforms the key into a new key, which may have a different type.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<KR,V>` | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return value is the key of the resulting stream, which is of type KR. |
Example:
```yaml
from: input_stream
via:
  - type: transformKey
    mapper:
      expression: str(key)  # convert key from source type to string
to: output_stream
```
transformKeyValue
This operation takes a message and transforms the key and value into a new key and value, which can each have a different type than the source message key and value.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<KR,VR>` | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return type should be a tuple of type (KR,VR) containing the transformed key and value. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValue
    mapper:
      expression: (str(key), str(value))  # convert key and value from source type to string
to: output_stream
```
transformKeyValueToKeyValueList
This operation takes a message and transforms it into zero, one or more new messages, which may have different key and value types than the source.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<KR,VR>` | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return type should be a list of type [(KR,VR)] containing the transformed key and value pairs. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValueToKeyValueList
    mapper:
      expression: [ (key,value), (key,value) ]  # duplicate all incoming messages
to: output_stream
```
transformKeyValueToValueList
This operation takes a message and transforms it into zero, one or more new values, which may have different value types than the source. Every entry in the result list is combined with the source key and produced on the output stream.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | mapper | Inline or reference | A KeyValueMapper function, which takes a key of type K and a value of type V. The return type should be a list of type [VR] containing the transformed values. |
Example:
```yaml
from: input_stream
via:
  - type: transformKeyValueToValueList
    mapper:
      expression: [ value+1, value+2, value+3 ]  # creates 3 new messages [key,VR] for every input message
to: output_stream
```
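The difference between the two list-producing operations can be sketched in plain Python, using the same mapper expressions as the examples above. The function names are hypothetical and records are modeled as in-memory `(key, value)` tuples; in both cases, returning an empty list drops the input record.

```python
from typing import Any, Callable, Iterable, List, Tuple

Record = Tuple[Any, Any]


def to_key_value_list(records: Iterable[Record], mapper: Callable[[Any, Any], List[Record]]) -> List[Record]:
    """transformKeyValueToKeyValueList: each record becomes zero or more (key, value) pairs."""
    return [pair for key, value in records for pair in mapper(key, value)]


def to_value_list(records: Iterable[Record], mapper: Callable[[Any, Any], List[Any]]) -> List[Record]:
    """transformKeyValueToValueList: each record becomes zero or more values, all keeping the source key."""
    return [(key, new_value) for key, value in records for new_value in mapper(key, value)]


records = [("k", 1)]
print(to_key_value_list(records, lambda key, value: [(key, value), (key, value)]))  # [('k', 1), ('k', 1)]
print(to_value_list(records, lambda key, value: [value + 1, value + 2, value + 3]))  # [('k', 2), ('k', 3), ('k', 4)]
print(to_value_list(records, lambda key, value: []))  # []
```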
transformMetadata
This operation takes a message and transforms its metadata (Kafka headers and timestamp); the key and value are left unchanged.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,V>` | mapper | Inline or reference | Yes | A MetadataTransformer function that converts the metadata (Kafka headers, timestamp) of every record in the stream. It gets a metadata object as input and should return the same type, but potentially with modified fields. |
transformValue
This operation takes a message and transforms its value to a new value, which may have different value type than the source.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KStream<K,V>` | `KStream<K,VR>` | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return type should be a value of type VR. |
Example:
```yaml
from: input_stream
via:
  - type: transformValue
    mapper:
      expression: str(value)  # convert value from source type to String
to: output_stream
```
windowBySession
Create a new windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to the WindowTypes page.
Stream Type | Returns | Parameter | Value Type | Required | Description |
---|---|---|---|---|---|
`KGroupedStream<K,V>` | `SessionWindowedKStream<K,V>` | inactivityGap | Duration | Yes | The maximum period of inactivity between two records with the same key; a longer gap starts a new session. |
| | | grace | Duration | No | The grace period allowing out-of-order messages to still be associated with the right session. |
`CogroupedKStream<K,V>` | `SessionWindowedCogroupedKStream<K,V>` | inactivityGap | Duration | Yes | The maximum period of inactivity between two records with the same key; a longer gap starts a new session. |
| | | grace | Duration | No | The grace period allowing out-of-order messages to still be associated with the right session. |
Example:
```yaml
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowBySession
    inactivityGap: 1h
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream
```
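Session windows have no fixed size: records with the same key belong to the same session as long as consecutive records are at most inactivityGap apart, and a session spans from its earliest to its latest record. The plain-Python sketch below, with hypothetical names, computes those sessions for an in-memory batch. It assumes integer timestamps in minutes and ignores grace periods and late arrivals, which in Kafka Streams can cause existing sessions to be merged.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def session_windows(records: Iterable[Tuple[Any, int]], inactivity_gap: int) -> Dict[Any, List[Tuple[int, int]]]:
    """Group record timestamps per key into (start, end) sessions."""
    timestamps_by_key: Dict[Any, List[int]] = defaultdict(list)
    for key, timestamp in records:
        timestamps_by_key[key].append(timestamp)

    sessions: Dict[Any, List[Tuple[int, int]]] = {}
    for key, timestamps in timestamps_by_key.items():
        key_sessions: List[List[int]] = []
        for timestamp in sorted(timestamps):
            if key_sessions and timestamp - key_sessions[-1][1] <= inactivity_gap:
                key_sessions[-1][1] = timestamp  # within the gap: extend the current session
            else:
                key_sessions.append([timestamp, timestamp])  # gap exceeded: start a new session
        sessions[key] = [(start, end) for start, end in key_sessions]
    return sessions


# Timestamps in minutes, with the 1h inactivity gap from the example above
records = [("user1", 0), ("user1", 10), ("user1", 50), ("user1", 200), ("user1", 230), ("user2", 5)]
print(session_windows(records, inactivity_gap=60))
# {'user1': [(0, 50), (200, 230)], 'user2': [(5, 5)]}
```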
windowByTime
Create a new windowed KStream instance that can be used to perform windowed aggregations. For more details on the different types of windows, please refer to the WindowTypes page.
Stream Type | Returns | Parameter | Value Type | Description |
---|---|---|---|---|
`KGroupedStream<K,V>` | `TimeWindowedKStream<K,V>` | windowType | string | Fixed value `sliding`. |
| | | timeDifference | Duration | The time difference parameter for the SlidingWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SlidingWindows object. |
`KGroupedStream<K,V>` | `TimeWindowedKStream<K,V>` | windowType | string | Fixed value `hopping`. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | advanceBy | Duration | The amount by which each window is advanced. If this value is not specified, it equals duration, which gives tumbling windows. If you make this value smaller than duration, you get hopping windows. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
`KGroupedStream<K,V>` | `TimeWindowedKStream<K,V>` | windowType | string | Fixed value `tumbling`. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
`CogroupedKStream<K,V>` | `TimeWindowedCogroupedKStream<K,V>` | windowType | string | Fixed value `sliding`. |
| | | timeDifference | Duration | The time difference parameter for the SlidingWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the SlidingWindows object. |
`CogroupedKStream<K,V>` | `TimeWindowedCogroupedKStream<K,V>` | windowType | string | Fixed value `hopping`. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | advanceBy | Duration | The amount by which each window is advanced. If this value is not specified, it equals duration, which gives tumbling windows. If you make this value smaller than duration, you get hopping windows. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
`CogroupedKStream<K,V>` | `TimeWindowedCogroupedKStream<K,V>` | windowType | string | Fixed value `tumbling`. |
| | | duration | Duration | The duration parameter for the TimeWindows object. |
| | | grace | Duration | (Optional) The grace parameter for the TimeWindows object. |
Example:
from: input_stream
via:
  - type: groupBy
    mapper: my_mapper_function
  - type: windowByTime
    windowType: hopping
    duration: 1h
    advanceBy: 15m
    grace: 5m
  - type: reduce
    reducer: my_reducer_function
  - type: toStream
to: output_stream
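The relation between duration and advanceBy can be made concrete with a small Python model of how Kafka Streams assigns a record to epoch-aligned TimeWindows: windows start at multiples of advanceBy and each spans duration. This is a sketch for illustration only (the helper name time_windows_for is hypothetical); it covers tumbling and hopping windows, not sliding windows, and ignores grace. Times are plain integers in any unit.

```python
from typing import List, Optional, Tuple


def time_windows_for(ts: int, duration: int, advance_by: Optional[int] = None) -> List[Tuple[int, int]]:
    """Return every [start, end) window that contains timestamp ts.

    Windows are aligned to 0 (the epoch), start at multiples of advance_by and
    are duration long. Without advance_by, windows are tumbling (advance == duration).
    """
    advance = duration if advance_by is None else advance_by
    # Earliest window start that can still contain ts, rounded down to the advance grid.
    first = max(0, ts - duration + advance)
    start = first - (first % advance)
    windows = []
    while start <= ts:
        windows.append((start, start + duration))
        start += advance
    return windows


# Minutes for readability: 60-minute windows advancing every 15 minutes (hopping).
print(time_windows_for(70, 60, 15))  # [(15, 75), (30, 90), (45, 105), (60, 120)]
# Same duration without advanceBy: a single tumbling window.
print(time_windows_for(70, 60))      # [(60, 120)]
```

With the 1h/15m settings from the example above, each record therefore ends up in four overlapping windows, and each window is reduced separately.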
Sink Operations
as
Pipelines closed off with as can be referenced by other pipelines as their starting point. This allows a common part of the processing logic to be placed in its own pipeline in KSML, serving as an intermediate result.
Applies to | Value Type | Required | Description |
---|---|---|---|
Any pipeline<K,V> | string | Yes | The name under which the pipeline result can be referenced by other pipelines. |
Example:
pipelines:
  first:
    from: some_source_topic
    via:
      - type: ...
    as: first_pipeline
  second:
    from: first_pipeline
    via:
      - type: ...
    to: ...
Here, the first pipeline ends by sending its output to a stream internally called first_pipeline. This stream is used as input for the second pipeline.
branch
Branches out messages from the input stream into several branches based on predicates. Each branch is defined as a list item below the branch operation. Branch predicates are defined using the if keyword. Each message is processed by only one of the branches, namely the first one for which the predicate returns true.
Applies to | Value Type | Required | Description |
---|---|---|---|
KStream<K,V> | List of branch definitions | Yes | See the description of branch definitions below. |
Branches in KSML are nested pipelines, which are parsed without requiring a source attribute. Each branch accepts the following parameters:
Branch element | Value Type | Required | Description |
---|---|---|---|
if | Inline Predicate or reference | No | The Predicate function that determines if the message is sent down this branch, or is passed on to the next branch in line. |
Inline | All pipeline parameters, see [Pipeline] | Yes | The inlined pipeline describes the topology of the specific branch. |
Example:
from: some_source_topic
branch:
  - if:
      expression: value['color'] == 'blue'
    to: ksml_sensordata_blue
  - if:
      expression: value['color'] == 'red'
    to: ksml_sensordata_red
  - forEach:
      code: |
        print('Unknown color sensor: '+value["color"])
In this example, the first two branches are entered if their respective predicate matches (the color attribute of value matches a certain color). If a predicate returns false, the next predicate/branch is tried. The third branch has no if predicate, so it receives every message that matched neither color. Only the last branch in the list can be a sink operation.
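The first-match routing described above can be sketched in plain Python. This is a model of the semantics, not KSML's implementation: route and the branch names are hypothetical, a branch whose predicate is None matches every message (like the predicate-less forEach branch in the example), and a message that matches no branch is routed nowhere, which mirrors how Kafka Streams drops records that match no branch.

```python
from typing import Any, Callable, List, Optional, Tuple

Predicate = Callable[[Any, Any], bool]


def route(key: Any, value: Any, branches: List[Tuple[Optional[Predicate], str]]) -> Optional[str]:
    """Return the name of the first branch whose predicate accepts (key, value).

    A branch without a predicate (None) accepts every message. Returns None when
    no branch matches, meaning the message is not processed by any branch.
    """
    for predicate, name in branches:
        if predicate is None or predicate(key, value):
            return name  # later branches are never consulted
    return None


branches = [
    (lambda k, v: v["color"] == "blue", "ksml_sensordata_blue"),
    (lambda k, v: v["color"] == "red", "ksml_sensordata_red"),
    (None, "forEach"),  # catch-all branch without an if predicate
]
print(route("sensor1", {"color": "red"}, branches))    # ksml_sensordata_red
print(route("sensor2", {"color": "green"}, branches))  # forEach
```

Because evaluation stops at the first match, the order of the branches matters whenever predicates overlap.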
forEach
This sends each message to a custom defined function. This function is expected to handle each message as its final step. The function does not (need to) return anything.
Applies to | Value Type | Description |
---|---|---|
KStream<K,V> | Inline or reference | The [ForEach] function that is called for every record on the source stream. Its arguments are key of type K and value of type V. |
Examples:
forEach: my_foreach_function

forEach:
  code: print(value)
print
This prints each message as the final operation in the pipeline. Each message is first passed to the mapper function, whose string result is written to the target.
As target, you can specify a filename. If none is specified, all messages are printed to stdout.
Applies to | Parameter | Value Type | Required | Description |
---|---|---|---|---|
KStream<K,V> | filename | string | No | The filename to output records to. If nothing is specified, then messages will be printed on stdout. |
| | label | string | No | A label to attach to every output record. |
| | mapper | Inline or reference | Yes | A KeyValueMapper function, which takes a key of type K and a value of type V. The return value should be of type string and is sent to the specified file or stdout. |
Examples:
from: source
via:
  - type: ...
print:
  filename: file.txt
  mapper:
    expression: '"record value: " + str(value)'
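As a rough model of the behavior described for print, the Python sketch below formats a record with a mapper and writes the result either to a file or to stdout. print_record is a hypothetical helper, not part of KSML; appending one line per record is an assumption, and the label parameter is not modeled because its output format is not described here.

```python
import sys
from typing import Any, Callable, Optional


def print_record(key: Any, value: Any, mapper: Callable[[Any, Any], str],
                 filename: Optional[str] = None) -> str:
    """Format (key, value) with mapper and write it to filename, or stdout if none is given."""
    line = mapper(key, value)
    if filename is None:
        print(line, file=sys.stdout)
    else:
        # Assumption: records are appended to the file, one line each.
        with open(filename, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    return line


# Same formatting as the mapper expression in the example above.
print_record("k1", 42, lambda key, value: "record value: " + str(value))
# prints: record value: 42
```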
to
Messages are sent directly to a named stream.
Applies to | Value Type | Required | Description |
---|---|---|---|
KStream<K,V> | Inline [Topic] or reference to a stream, table or global table | Yes | The name of a defined stream, table or global table, or an inline topic definition. |
Examples:
to: my_target_topic

from: source
via:
  - type: ...
to:
  topic: my_target_topic
  keyType: someType
  valueType: someOtherType
  partitioner:
    expression: hash_of(key)
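The example's hash_of function is not defined in this section. Whatever it does, a partitioner has to end up with a partition number between 0 and the partition count minus one. A minimal Python sketch of such logic follows; hash_of and partition_for are hypothetical helpers, and the exact arguments KSML passes to a partitioner function are not covered here. CRC32 is swapped in for illustration only: Kafka's default partitioner uses murmur2, so results will differ from it.

```python
import zlib
from typing import Any


def hash_of(key: Any) -> int:
    """Hypothetical stand-in for hash_of: a stable, non-negative hash of the key.

    CRC32 is used because Python's built-in hash() of a str is randomized per
    process, which would send the same key to different partitions after a restart.
    """
    return zlib.crc32(str(key).encode("utf-8"))


def partition_for(key: Any, num_partitions: int) -> int:
    """Map a key onto one of num_partitions partitions (0 .. num_partitions - 1)."""
    return hash_of(key) % num_partitions


print(partition_for("sensor1", 6))  # same key always yields the same partition
```

Keeping the hash deterministic is the important design choice: records with the same key must land on the same partition to preserve per-key ordering.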
toTopicNameExtractor
Messages are passed on to a user function, which returns the name of the topic that the message needs to be sent to. This operation acts as a sink and is always the last operation in a pipeline.
Applies to | Value Type | Required | Description |
---|---|---|---|
KStream<K,V> | Inline or reference | Yes | The [TopicNameExtractor] function that is called for every message and returns the topic name to which the message shall be written. |
Examples:
toTopicNameExtractor: my_extractor_function

toTopicNameExtractor:
  code: |
    if key == 'sensor1':
      return 'ksml_sensordata_sensor1'
    elif key == 'sensor2':
      return 'ksml_sensordata_sensor2'
    else:
      return 'ksml_sensordata_sensor0'
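The if/elif chain in the last example grows with every new key. The same mapping can be expressed as a dictionary lookup with a default, sketched here as plain Python outside KSML so it can be tested on its own. TOPIC_BY_KEY, DEFAULT_TOPIC and extract_topic are hypothetical names; how such a table would be shared across calls inside a KSML function definition is not covered here.

```python
from typing import Any

# Hypothetical routing table; topic names are taken from the example above.
TOPIC_BY_KEY = {
    "sensor1": "ksml_sensordata_sensor1",
    "sensor2": "ksml_sensordata_sensor2",
}
DEFAULT_TOPIC = "ksml_sensordata_sensor0"


def extract_topic(key: Any) -> str:
    """Return the target topic for a key, falling back to DEFAULT_TOPIC for unknown keys."""
    return TOPIC_BY_KEY.get(key, DEFAULT_TOPIC)


print(extract_topic("sensor2"))  # ksml_sensordata_sensor2
print(extract_topic("sensor7"))  # ksml_sensordata_sensor0
```

Adding a sensor then means adding one dictionary entry instead of another elif branch, and the fallback topic is kept in a single place.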