Operation Reference
This document provides a comprehensive reference for all operations available in KSML. Each operation is described with its parameters, behavior, and examples.
Introduction
Operations are the building blocks of stream processing in KSML. They define how data is transformed, filtered, aggregated, and otherwise processed as it flows through your application. Operations form the middle part of pipelines, taking input from the previous operation and producing output for the next operation.
Understanding the different types of operations and when to use them is crucial for building effective stream processing applications.
Operations Overview
KSML supports the following operations for stream processing. Each operation serves a specific purpose in transforming, filtering, aggregating, or routing data:
| Operation | Purpose | Common Use Cases |
|---|---|---|
| **Stateless Transformation Operations** | | |
| flatMap | Transform one record into multiple records | Split batch messages, expand arrays |
| map | Transform both key and value | Change message format, enrich data |
| mapKey | Transform only the key | Change partitioning key |
| mapValues | Transform only the value (preserves key) | Modify payload without affecting partitioning |
| selectKey | Select a new key from the value | Extract key from message content |
| transformKey | Transform key using custom function | Complex key transformations |
| transformValue | Transform value using custom function | Complex value transformations |
| **Filtering Operations** | | |
| filter | Keep records that match a condition | Remove unwanted messages |
| filterNot | Remove records that match a condition | Exclude specific messages |
| **Format Conversion Operations** | | |
| convertKey | Convert key format (e.g., JSON to Avro) | Change serialization format |
| convertValue | Convert value format (e.g., JSON to Avro) | Change serialization format |
| **Grouping & Partitioning Operations** | | |
| groupBy | Group by a new key | Prepare for aggregation with new key |
| groupByKey | Group by existing key | Prepare for aggregation |
| repartition | Redistribute records across partitions | Custom partitioning logic |
| **Stateful Aggregation Operations** | | |
| aggregate | Build custom aggregations | Complex calculations, custom state |
| count | Count records per key | Track occurrences |
| reduce | Combine records with same key | Accumulate values |
| **Join Operations** | | |
| join | Inner join two streams | Correlate related events |
| leftJoin | Left outer join two streams | Include all left records |
| merge | Combine multiple streams into one | Stream unification |
| outerJoin | Full outer join two streams | Include all records from both sides |
| **Windowing Operations** | | |
| windowBySession | Group into session windows | User session analysis |
| windowByTime | Group into fixed time windows | Time-based aggregations |
| **Output Operations** | | |
| forEach | Process without producing output | Side effects, external calls |
| print | Print to console | Debugging, monitoring |
| to | Send to a specific topic | Write results to Kafka |
| toTopicNameExtractor | Send to dynamically determined topic | Route to different topics |
| **Control Flow Operations** | | |
| branch | Split stream into multiple branches | Conditional routing |
| peek | Observe records without modification | Logging, debugging |
Choosing the Right Operation
When designing your KSML application, consider these factors:
- State Requirements: Stateful operations (aggregations, joins) require state stores and more resources
- Partitioning: Operations like `groupBy` may trigger data redistribution
- Performance: Some operations are more computationally expensive than others
- Error Handling: Handle potential failures gracefully, for example with `try`/`except` blocks inside your Python functions
Stateless Transformation Operations
Stateless transformation operations modify records (key, value, or both) without maintaining state between records. These operations are the most common building blocks for data processing pipelines.
map
Transforms both the key and value of each record.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | Object | Yes | Specifies how to transform the key and value |

The `mapper` can be defined using:

- `expression`: A simple expression returning a tuple (key, value)
- `code`: A Python code block returning a tuple (key, value)

Example
Full example for `map`:
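The full example is not reproduced here. As a minimal, hypothetical sketch (the topic and field names below are placeholders, not taken from the full example), a `map` step that rekeys a record and reshapes its value could look like:

```yaml
- type: map
  mapper:
    resultType: (string, json)
    code: |
      # Hypothetical reshaping: key the record by customer and keep the original key in the value
      new_key = value.get("customer_id", key)
      new_value = {"original_key": key, "payload": value}
      return (new_key, new_value)
```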
mapValues
Transforms the value of each record without changing the key.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | Object | Yes | Specifies how to transform the value |

The `mapper` can be defined using:

- `expression`: A simple expression
- `code`: A Python code block

Example
Full example for `mapValues`:
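A minimal, hypothetical sketch (the field added below is a placeholder) of a `mapValues` step that modifies the value while leaving the key untouched:

```yaml
- type: mapValues
  mapper:
    resultType: json
    code: |
      # Hypothetical enrichment: add a flag without touching the key
      value["processed"] = True
      return value
```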
mapKey
Transforms the key of each record without modifying the value.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | Object | Yes | Specifies how to transform the key |

The `mapper` can be defined using:

- `expression`: A simple expression returning the new key
- `code`: A Python code block returning the new key

Example
Full example for `mapKey`:
- Function Reference: keyTransformer - Shows mapKey used with keyTransformer function
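A minimal, hypothetical sketch (the `customer_id` field is a placeholder) of a `mapKey` step that derives the new key from the value:

```yaml
- type: mapKey
  mapper:
    resultType: string
    expression: value["customer_id"]
```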
flatMap
Transforms each record into zero or more records, useful for splitting batch messages into individual records.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | Object | Yes | Specifies how to transform each record into multiple records |

The `mapper` must specify:

- `resultType`: A format such as `"[(keyType,valueType)]"` indicating a list of tuples
- `code`: Python code returning a list of tuples `[(key, value), ...]`
Example
```yaml
- type: flatMap
  mapper:
    resultType: list(tuple(string, json))
    code: |
      return [(f"{value['order_id']}_{i['item_id']}", {
                 "order_id": value['order_id'],
                 "customer_id": value['customer_id'],
                 "item_id": i['item_id'],
                 "quantity": i['quantity'],
                 "total": i['quantity'] * i['price']
               }) for i in value['items']]
```
This example splits order batches containing multiple items into individual item records:
Producer - `flatMap` example:

```yaml
functions:
  generate_order_batch:
    type: generator
    globalCode: |
      import json, random
      counter = 0
    code: |
      global counter
      counter += 1
      cust = random.choice(["alice", "bob", "charlie"])
      batch = {
        "order_id": f"ord-{counter}",
        "customer_id": cust,
        "items": [
          {"item_id": "laptop", "quantity": 1, "price": 999.99},
          {"item_id": "mouse", "quantity": 2, "price": 29.99},
          {"item_id": "keyboard", "quantity": 1, "price": 79.99}
        ]
      }
      return cust, json.dumps(batch)
    resultType: (string, json)

producers:
  order_batch_producer:
    generator: generate_order_batch
    interval: 3s
    to:
      topic: order_batches
      keyType: string
      valueType: json
```
Processor - `flatMap` example:

```yaml
streams:
  order_batches_input:
    topic: order_batches
    keyType: string
    valueType: json

pipelines:
  main:
    from: order_batches_input
    via:
      - type: flatMap
        mapper:
          resultType: list(tuple(string, json))
          code: |
            return [(f"{value['order_id']}_{i['item_id']}", {
                       "order_id": value['order_id'],
                       "customer_id": value['customer_id'],
                       "item_id": i['item_id'],
                       "quantity": i['quantity'],
                       "total": i['quantity'] * i['price']
                     }) for i in value['items']]
    to:
      topic: individual_items
      keyType: string
      valueType: json
```
What this example does:
- The producer generates order batches containing multiple items.
- The processor uses `flatMap` to split each order batch into individual item records, transforming 1 input record into 3 output records (one per item).
- Each output record has a unique key combining order ID and item ID, with calculated total prices per item.
selectKey
Changes the key of each record without modifying the value. This operation extracts a new key from the existing key and/or value, enabling data repartitioning and preparation for joins based on different key attributes.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | Object | Yes | Specifies how to derive the new key from the key/value |

The `mapper` can be defined using:

- `expression`: A simple expression returning the new key (can use both `key` and `value`)
- `code`: A Python code block returning the new key (can use both `key` and `value`)
Example
This example demonstrates changing the key from session_id to user_id for better data organization:
Producer - `selectKey` example:

```yaml
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

functions:
  generate_user_event:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      uid = random.choice(["u01", "u02", "u03"])
      evt = random.choice(["login", "purchase", "view"])
      sid = f"s{random.randint(100, 999)}"
      data = {
        "user_id": uid,
        "event_type": evt,
        "session_id": sid,
        "timestamp": int(time.time())
      }
      if evt == "purchase":
        data["amount"] = round(random.uniform(10, 500), 2)
      return (sid, data)

producers:
  # Produce user events every 3 seconds
  user_event_producer:
    generator: generate_user_event
    interval: 3s
    to: user_events
```
Processor - `selectKey` example:
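The processor definition is not shown above; the following is a minimal sketch of what it might look like based on the description below (the output topic `user_events_by_user` and the `peek` step are assumptions, not taken from the original example):

```yaml
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

pipelines:
  main:
    from: user_events
    via:
      # Rekey each event from session_id to the user_id found in the value
      - type: selectKey
        mapper:
          resultType: string
          expression: value["user_id"]
      - type: peek
        forEach:
          code: |
            log.info("Rekeyed event: new key={} session={}", key, value.get("session_id"))
    to:
      topic: user_events_by_user
      keyType: string
      valueType: json
```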
What this example does:
- The producer generates user events (login, purchase, view) with different session IDs as keys
- The processor uses `selectKey` to change the key from `session_id` to `user_id`, enabling better data partitioning for user-centric analytics
- This rekeying allows subsequent operations to group and aggregate data by user rather than by session
transformKey
Transforms the key using a custom transformer function.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | String | Yes | Name of the key transformer function |

Example
Full example for `transformKey`:
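A minimal, hypothetical sketch (stream and function names are placeholders; the streams are assumed to be defined elsewhere): a `keyTransformer` function is declared under `functions:` and referenced by name from the operation:

```yaml
functions:
  uppercase_key:
    type: keyTransformer
    resultType: string
    expression: key.upper()

pipelines:
  main:
    from: input_stream
    via:
      - type: transformKey
        mapper: uppercase_key
    to: output_stream
```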
transformValue
Transforms the value using a custom transformer function.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | String | Yes | Name of the value transformer function |

Example
Full example for `transformValue`:
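A minimal, hypothetical sketch (stream and function names are placeholders, and the `valueTransformer` function type is an assumption): the transformer is declared under `functions:` and referenced by name:

```yaml
functions:
  add_processing_timestamp:
    type: valueTransformer
    resultType: json
    code: |
      import time
      # Hypothetical enrichment of the value
      value["processed_at"] = int(time.time())
      return value

pipelines:
  main:
    from: input_stream
    via:
      - type: transformValue
        mapper: add_processing_timestamp
    to: output_stream
```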
Filtering Operations
Filtering operations selectively pass or remove records based on conditions, allowing you to control which data continues through your processing pipeline.
filter
Keeps only records that satisfy a condition.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `if` | Object | Yes | Specifies the condition |

The `if` can be defined using:

- `expression`: A simple boolean expression
- `code`: A Python code block returning a boolean

Example
Full example for `filter`:
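A minimal sketch showing both styles, an inline expression and a reference to a named predicate (the `amount` field and the `is_valid_transaction` predicate are placeholders):

```yaml
# Inline condition
- type: filter
  if:
    expression: value.get("amount", 0) > 0

# Or reference a predicate function defined under functions:
- type: filter
  if: is_valid_transaction
```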
filterNot
Excludes records that satisfy a condition (opposite of filter). Records are kept when the condition returns false.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `if` | Object | Yes | Specifies the condition |

The `if` parameter must reference a predicate function that returns a boolean.
Example
This example filters out products with "inactive" status, keeping all other products:
Producer - `filterNot` example:

```yaml
functions:
  generate_product:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1
      pid = f"p{counter:03d}"
      return pid, {
        "product_id": pid,
        "name": random.choice(["laptop", "phone", "tablet"]),
        "status": random.choice(["active", "inactive", "pending"]),
        "price": round(random.uniform(50, 1500), 2)
      }
    resultType: (string, json)

producers:
  product_producer:
    generator: generate_product
    interval: 2s
    to:
      topic: product_stream
      keyType: string
      valueType: json
```
Processor - `filterNot` example:

```yaml
streams:
  product_input:
    topic: product_stream
    keyType: string
    valueType: json

functions:
  is_inactive_product:
    type: predicate
    code: |
      if value is None:
        return False
      status = value.get("status", "")
      if status == "inactive":
        print(f"Filtering out inactive product: {value.get('product_id')} - {value.get('name')}")
        return True
      return False

pipelines:
  main:
    from: product_input
    via:
      - type: filterNot
        if: is_inactive_product
      - type: peek
        forEach:
          code: |
            print(f"Active product kept: {key} - {value['name']} (status: {value['status']}, price: ${value['price']})")
    to:
      topic: active_products
      keyType: string
      valueType: json
```
What this example does:
- The producer generates products with different statuses: active, inactive, and pending
- The processor uses `filterNot` with a predicate function to exclude products with "inactive" status
- Products with other statuses (active, pending) are kept and passed through to the output topic
Format Conversion Operations
Format conversion operations change the serialization format of keys or values without altering the actual data content.
convertKey
Converts the key to a different data format.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `into` | String | Yes | Target format for the key |

Example
Full example for `convertKey`:
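A minimal sketch (the target format here is illustrative):

```yaml
- type: convertKey
  into: string
```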
convertValue
Converts the value to a different data format.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `into` | String | Yes | Target format for the value |

Example
Full example for `convertValue`:
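A minimal sketch (the target format here is illustrative):

```yaml
- type: convertValue
  into: json
```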
Grouping & Partitioning Operations
Grouping and partitioning operations organize data by keys and control how records are distributed across partitions, preparing data for aggregation or improving processing parallelism.
groupBy
Groups records by a new key derived from the record.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `keySelector` | Object | Yes | Specifies how to select the new key |

The `keySelector` can be defined using:

- `expression`: A simple expression returning the grouping key
- `code`: A Python code block returning the grouping key
Example
```yaml
- type: groupBy
  name: group_by_sensor_type
  mapper:
    code: |
      if value is None:
        return "unknown"
      if not "sensor_type" in value:
        return "unknown"
    expression: value["sensor_type"]
    resultType: string
```

Full example for `groupBy`:
groupByKey
Groups records by their existing key for subsequent aggregation operations.
Parameters
None. This operation is typically followed by an aggregation operation.
Example
Full example for `groupByKey`:
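A minimal sketch: `groupByKey` takes no parameters and is normally followed directly by a stateful operation:

```yaml
- type: groupByKey
# must be followed by a stateful operation, e.g. count, reduce, or aggregate
- type: count
```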
repartition
Redistributes records across partitions, optionally using custom partitioning logic. This operation allows you to control data distribution for performance optimization or to meet specific processing requirements.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `numberOfPartitions` | Integer | No | Number of partitions for redistribution |
| `partitioner` | String | No | Function name for custom partitioning logic |
Example
Note:
To test this repartition example, ensure your topics have sufficient partitions. The example requires at least 4 partitions, since it redistributes records across partitions 0-3; update your docker-compose.yml accordingly.

```yaml
# Repartition with custom partitioner for user-based distribution
- type: repartition
  numberOfPartitions: 4
  partitioner: activity_partitioner
```
Producer - `repartition` example:

```yaml
functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import time
      import random
      activity_id = 0
      user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
      activity_types = ["login", "purchase", "browse", "logout", "search"]
      regions = ["north", "south", "east", "west"]
    code: |
      global activity_id
      activity_id += 1

      # Generate user activity with region as initial key
      region = random.choice(regions)
      activity = {
        "activity_id": f"activity_{activity_id:04d}",
        "user_id": random.choice(user_ids),
        "activity_type": random.choice(activity_types),
        "timestamp": int(time.time() * 1000),
        "region": region
      }

      # Log the activity generation
      log.info("Generated activity: {} for user {} in region {} - type: {}",
               activity["activity_id"], activity["user_id"], region, activity["activity_type"])

      # Use region as key initially (will be repartitioned by user_id later)
      key = region
      value = activity
    expression: (key, value)
    resultType: (string, json)

producers:
  user_activity_producer:
    generator: generate_user_activity
    interval: 3s
    to:
      topic: user_activities
      keyType: string
      valueType: json
```
Processor - `repartition` example:

```yaml
streams:
  user_activities:
    topic: user_activities
    keyType: string
    valueType: json
    offsetResetPolicy: earliest
  repartitioned_activities:
    topic: repartitioned_activities
    keyType: string
    valueType: json

functions:
  # Custom partitioner that distributes activities by user number
  # Users with even numbers go to partitions 0-1, odd numbers to partitions 2-3
  activity_partitioner:
    type: streamPartitioner
    code: |
      # Custom partitioning logic based on user_id pattern
      # This demonstrates intelligent partitioning for user-based processing
      if value and "user_id" in value:
        user_id = value["user_id"]
        # Extract user number from user_001, user_002, etc.
        user_num = int(user_id.split("_")[-1])
        if user_num % 2 == 0:
          # Even user numbers (002, 004) -> spread across partitions 0-1
          partition = (user_num // 2) % 2
        else:
          # Odd user numbers (001, 003, 005) -> spread across partitions 2-3
          partition = 2 + ((user_num // 2) % 2)
        log.debug("Routing user {} (num:{}) to partition {}",
                  user_id, user_num, partition)
        return partition
      # Default to partition 0
      return 0
    resultType: integer

pipelines:
  repartition_activities:
    from: user_activities
    via:
      # First, change key from region to user_id for user-based processing
      - type: mapKey
        mapper:
          code: |
            # Extract user_id from the activity to become the new key
            if value and "user_id" in value:
              user_key = value["user_id"]
              log.info("Changing key from region '{}' to user_id '{}'", key, user_key)
              result = user_key
            else:
              result = key
          expression: result
          resultType: string
      # Add processing metadata
      - type: transformValue
        mapper:
          code: |
            # Add repartitioning info to track the transformation
            if value:
              value["processing_info"] = f"Repartitioned by user: {value.get('user_id', 'unknown')}"
              value["original_region"] = key  # Keep track of original region
            result = value
          expression: result
          resultType: json
      # Repartition with custom partitioner for user-based distribution
      - type: repartition
        numberOfPartitions: 4
        partitioner: activity_partitioner
      # Log the repartitioned activity
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Repartitioned activity {}: {} -> user-based partitioning applied",
                       key, value.get("processing_info", "unknown"))
    # Send to output topic to observe the repartitioning results
    to:
      topic: repartitioned_activities
      keyType: string
      valueType: json
```
The repartition operation demonstrates data redistribution by changing keys from regions to user IDs, then using custom partitioning logic to distribute activities based on user patterns. This ensures related user activities are processed together while optimizing partition utilization.
What the example does:
Demonstrates intelligent data redistribution for user-centric processing:
- Changes partitioning strategy from region-based to user-based keys
- Applies custom partitioning logic based on user ID patterns
- Routes even user numbers (002, 004) to partitions 0-1
- Routes odd user numbers (001, 003, 005) to partitions 2-3
- Producer generates activities initially keyed by region for realistic repartitioning scenario
Key Features:
- Dynamic key transformation from region to user_id
- Custom partition calculation based on user patterns
- Guaranteed co-location of activities for the same user
- Processing metadata tracking for observability
- Explicit partition count handling (4 partitions total)
- Fallback to partition 0 for edge cases
Expected Results:
When running this example, you'll see log messages like:

- `"Generated activity: activity_0001 for user user_001 in region south - type: purchase"` - Producer creating activities
- `"Changing key from region 'south' to user_id 'user_001'"` - Key transformation
- `"Repartitioned activity user_001: Repartitioned by user: user_001 -> user-based partitioning applied"` - Successful repartitioning
- User_001 and user_003 (odd numbers) go to partitions 2-3
- User_002 and user_004 (even numbers) go to partitions 0-1
- Activities for the same user are guaranteed to be processed in order
Stateful Aggregation Operations
Stateful aggregation operations maintain state between records to perform calculations like counting, summing, or building custom aggregates based on record keys.
aggregate
Aggregates records by key using a custom aggregation function.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `initializer` | Object | Yes | Specifies the initial value for the aggregation |
| `aggregator` | Object | Yes | Specifies how to combine the current record with the aggregate |

Both `initializer` and `aggregator` can be defined using:

- `expression`: A simple expression
- `code`: A Python code block
Example
```yaml
- type: aggregate
  store:
    type: window
    windowSize: 1m
    retention: 10m
  initializer: initialize_sales_stats
  aggregator: aggregate_sales
```

Full example for `aggregate`:
count
Counts the number of records for each key.
Parameters
None.
Example
Full example for `count`:
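A minimal sketch of counting records per key (the store name and settings are illustrative assumptions):

```yaml
- type: groupByKey
- type: count
  store:
    name: event_counts
    type: keyValue
```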
reduce
Combines records with the same key using a reducer function.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `reducer` | Object | Yes | Specifies how to combine two values |

The `reducer` can be defined using:

- `expression`: A simple expression
- `code`: A Python code block

Example
Full example for `reduce`:
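A minimal sketch, assuming numeric values and that the reducer expression receives the two values to combine as `value1` and `value2` (an assumption about the function signature, not confirmed by this document):

```yaml
- type: groupByKey
- type: reduce
  reducer:
    expression: value1 + value2    # assumed parameter names for the two values being combined
```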
Join Operations
Join operations combine data from multiple streams or tables based on matching keys, enabling you to correlate related events from different data sources.
join
Performs an inner join between two streams.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `stream` | String | Yes (stream-stream joins) | The name of the stream to join with |
| `table` | String | Yes (stream-table joins) | The name of the table to join with |
| `valueJoiner` | Object | Yes | Function that defines how to combine values from both sides |
| `timeDifference` | Duration | No | The time difference for the join window (for stream-stream joins) |
| `grace` | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| `foreignKeyExtractor` | Object | No | Function to extract foreign key (for stream-table joins) |
| `partitioner` | String | No | Function name for custom partitioning of current stream |
| `otherPartitioner` | String | No | Function name for custom partitioning of join stream/table |
Example
```yaml
- type: join
  stream: product_purchases
  valueJoiner: correlate_click_and_purchase
  timeDifference: 30m    # Look for purchases within 30 minutes of a click
  grace: 5m              # Grace period for late events
  thisStore:
    name: clicks_join_store
    type: window
    windowSize: 60m      # Must be 2 * timeDifference
    retention: 65m       # Must be 2 * timeDifference + grace = 60m + 5m
    retainDuplicates: true
  otherStore:
    name: purchases_join_store
    type: window
    windowSize: 60m      # Must be 2 * timeDifference
    retention: 65m       # Must be 2 * timeDifference + grace = 60m + 5m
    retainDuplicates: true
```

Full example for `join`:
leftJoin
Performs a left join between two streams.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `stream` | String | Yes (stream-stream joins) | The name of the stream to join with |
| `table` | String | Yes (stream-table joins) | The name of the table to join with |
| `valueJoiner` | Object | Yes | Function that defines how to combine values from both sides |
| `timeDifference` | Duration | No | The time difference for the join window (for stream-stream joins) |
| `grace` | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| `foreignKeyExtractor` | Object | No | Function to extract foreign key (for stream-table joins) |
| `partitioner` | String | No | Function name for custom partitioning of current stream |
| `otherPartitioner` | String | No | Function name for custom partitioning of join stream/table |

Example
Full example for `leftJoin`:
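A minimal, hypothetical sketch of a stream-table `leftJoin` (the table `user_profiles` and the `valueJoiner` function `merge_event_with_profile` are placeholders assumed to be defined elsewhere):

```yaml
- type: leftJoin
  table: user_profiles
  valueJoiner: merge_event_with_profile
```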
merge
Merges multiple streams with identical key and value types into a single unified stream. The merge operation combines streams without any joining logic - it simply forwards all records from all input streams to the output stream in the order they arrive.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `stream` | String | Yes | The name of the stream to merge with the main stream |
Example
Producer - `merge` example:

```yaml
functions:
  generate_stream_a:
    type: generator
    globalCode: |
      import random
      message_id = 0
      colors = ["red", "blue", "green"]
    code: |
      global message_id
      message_id += 1
      message = {
        "id": f"stream_a_{message_id}",
        "color": random.choice(colors),
        "source": "stream_a"
      }
      log.info("Generated message from stream A: {} - color: {}", message["id"], message["color"])
      key = message["color"]
      value = message
    expression: (key, value)
    resultType: (string, json)

  generate_stream_b:
    type: generator
    globalCode: |
      import random
      message_id = 0
      colors = ["red", "blue", "green"]
    code: |
      global message_id
      message_id += 1
      message = {
        "id": f"stream_b_{message_id}",
        "color": random.choice(colors),
        "source": "stream_b"
      }
      log.info("Generated message from stream B: {} - color: {}", message["id"], message["color"])
      key = message["color"]
      value = message
    expression: (key, value)
    resultType: (string, json)

producers:
  stream_a_producer:
    generator: generate_stream_a
    interval: 3s
    to:
      topic: stream_a
      keyType: string
      valueType: json

  stream_b_producer:
    generator: generate_stream_b
    interval: 2s
    to:
      topic: stream_b
      keyType: string
      valueType: json
```
Processor - `merge` example:

```yaml
streams:
  stream_a:
    topic: stream_a
    keyType: string
    valueType: json
    offsetResetPolicy: earliest
  stream_b:
    topic: stream_b
    keyType: string
    valueType: json
    offsetResetPolicy: earliest
  merged_stream:
    topic: merged_stream
    keyType: string
    valueType: json

pipelines:
  merge_streams:
    from: stream_a
    via:
      # Merge with stream B
      - type: merge
        stream: stream_b
      # Log merged messages
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Merged message: {} from {} - color: {}",
                       value.get("id", "unknown"),
                       value.get("source", "unknown"),
                       value.get("color", "unknown"))
    to:
      topic: merged_stream
      keyType: string
      valueType: json
```
What This Example Does:
The example demonstrates merging two independent streams (`stream_a` and `stream_b`) into a single processing pipeline. Both producers generate messages with a color key and JSON values containing an id, color, and source field. The merge operation combines both streams so that messages from either stream flow through the same downstream processing.
How the Merge Operation Works:
- Stream Union: The merge operation creates a simple union of multiple streams - all records from all input streams are forwarded to the output
- No Transformation: Records pass through unchanged, maintaining their original keys and values
- Interleaved Processing: Messages from different streams are processed as they arrive, interleaved based on timing
- Shared Pipeline: After merging, both streams share the same downstream operations (in this example, the peek operation logs all messages)
Important Notes:
- All streams being merged must have identical key and value types
- Records maintain their original timestamps and ordering per stream
- No complex joining logic - this is a simple stream union operation
- Can merge any number of streams by chaining multiple merge operations
Expected Results:
When running this example, you'll see interleaved log messages like:

- `"Generated message from stream A: stream_a_1 - color: green"` - Producer A creating messages every 3 seconds
- `"Generated message from stream B: stream_b_2 - color: blue"` - Producer B creating messages every 2 seconds
- `"Merged message: stream_a_1 from stream_a - color: green"` - Merged pipeline processing stream A messages
- `"Merged message: stream_b_2 from stream_b - color: blue"` - Same pipeline processing stream B messages
Both streams flow through the unified pipeline after merging, demonstrating how merge combines multiple data sources for shared processing.
outerJoin
Performs an outer join between two streams.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `stream` | String | Yes (stream-stream joins) | The name of the stream to join with |
| `table` | String | Yes (stream-table joins) | The name of the table to join with |
| `valueJoiner` | Object | Yes | Function that defines how to combine values from both sides |
| `timeDifference` | Duration | No | The time difference for the join window (for stream-stream joins) |
| `grace` | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| `foreignKeyExtractor` | Object | No | Function to extract foreign key (for stream-table joins) |
| `partitioner` | String | No | Function name for custom partitioning of current stream |
| `otherPartitioner` | String | No | Function name for custom partitioning of join stream/table |
Example
```yaml
- type: outerJoin
  stream: user_logouts
  valueJoiner: analyze_user_session
  timeDifference: 10m    # Look for logouts within 10 minutes of login
  grace: 2m              # Grace period for late events
  thisStore:
    name: login_session_store
    type: window
    windowSize: 20m      # Must be 2 * timeDifference
    retention: 22m       # Must be 2 * timeDifference + grace
    retainDuplicates: true
  otherStore:
    name: logout_session_store
    type: window
    windowSize: 20m      # Must be 2 * timeDifference
    retention: 22m       # Must be 2 * timeDifference + grace
    retainDuplicates: true
```

Full example for `outerJoin`:
Windowing Operations
Windowing operations group records into time-based windows, enabling temporal aggregations and time-bounded processing.
windowByTime
Groups records into time windows.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `windowType` | String | No | The type of window (`tumbling`, `hopping`, or `sliding`) |
| `timeDifference` | Duration | Yes | The duration of the window |
| `advanceBy` | Duration | No | Only required for hopping windows; how often to advance the window |
| `grace` | Duration | No | Grace period for late-arriving data |

Example
Full example for `windowByTime`:
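A minimal sketch of a tumbling-window aggregation, using the parameter names from the table above and the initializer/aggregator functions from the `aggregate` fragment earlier (the window size, grace period, and store settings are illustrative):

```yaml
- type: groupByKey
- type: windowByTime
  windowType: tumbling
  timeDifference: 1m
  grace: 30s
- type: aggregate
  store:
    type: window
    windowSize: 1m
    retention: 10m
  initializer: initialize_sales_stats
  aggregator: aggregate_sales
```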
windowBySession
Groups records into session windows, where events whose timestamps fall within `inactivityGap` of each other are treated as belonging to the same session.
Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `inactivityGap` | Duration | Yes | The maximum duration between events before they are seen as belonging to a different session |
| `grace` | Duration | No | Grace period for late-arriving data |

Example
Full example for `windowBySession`:
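A minimal sketch of a session-windowed count, using the parameter names from the table above (the 5-minute gap and the follow-up `count` are illustrative assumptions):

```yaml
- type: groupByKey
- type: windowBySession
  inactivityGap: 5m
  grace: 1m
- type: count
```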
Output Operations
Output operations represent the end of a processing pipeline, sending records to topics or performing terminal actions like logging.
to
Sends records to a specific Kafka topic.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `topic` | String | Yes | The name of the target topic |
| `keyType` | String | No | The data type of the key |
| `valueType` | String | No | The data type of the value |
| `partitioner` | String | No | Function name for custom partitioning logic |
Example
Example with Custom Partitioner
```yaml
to:
  topic: partitioned_orders
  keyType: string
  valueType: json
  partitioner: priority_region_partitioner
```

Full example for `to`:
Full example for `to` with partitioner:
toTopicNameExtractor
Sends records to topics determined dynamically based on the record content. This operation enables content-based routing, allowing you to distribute messages to different topics based on their attributes, priorities, or business logic.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `topicNameExtractor` | String | Yes | Name of the function that determines the topic name |
| `partitioner` | String | No | Function name for custom partitioning logic |
Example
This example demonstrates routing system events to different topics based on severity level:
Producer - `toTopicNameExtractor` example:

```yaml
streams:
  system_events:
    topic: system_events
    keyType: string
    valueType: json

functions:
  generate_system_event:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      sev = random.choice(["INFO", "WARNING", "ERROR", "CRITICAL"])
      msgs = {
        "CRITICAL": ["System failure", "Service down"],
        "ERROR": ["Request timeout", "DB failed"],
        "WARNING": ["High CPU", "Disk low"],
        "INFO": ["Service started", "Health OK"]
      }
      eid = f"e{random.randint(1000, 9999)}"
      return eid, {
        "event_id": eid,
        "severity": sev,
        "component": random.choice(["api", "db", "cache"]),
        "message": random.choice(msgs[sev]),
        "timestamp": int(time.time()),
        "cpu": random.uniform(10 if sev=="INFO" else 70, 100)
      }

producers:
  # Produce system events every 2 seconds
  system_event_producer:
    generator: generate_system_event
    interval: 2s
    to: system_events
```
Processor - `toTopicNameExtractor` example:

```yaml
streams:
  system_events:
    topic: system_events
    keyType: string
    valueType: json
  critical_alerts:
    topic: critical_alerts
    keyType: string
    valueType: json
  error_logs:
    topic: error_logs
    keyType: string
    valueType: json
  warning_logs:
    topic: warning_logs
    keyType: string
    valueType: json
  info_logs:
    topic: info_logs
    keyType: string
    valueType: json

functions:
  route_by_severity:
    type: topicNameExtractor
    code: |
      if value is None: return "info_logs"
      sev = value.get("severity", "INFO")
      if sev == "CRITICAL":
        log.error("CRITICAL: {}", value.get("message"))
      return {"CRITICAL": "critical_alerts", "ERROR": "error_logs",
              "WARNING": "warning_logs"}.get(sev, "info_logs")
    resultType: string

pipelines:
  route_system_events:
    from: system_events
    toTopicNameExtractor:
      topicNameExtractor: route_by_severity
```
What this example does:
- The producer generates system events with different severity levels (INFO, WARNING, ERROR, CRITICAL) from various system components
- The processor uses `toTopicNameExtractor` with a custom function to route events to different topics based on severity
- Critical events are logged and sent to the `critical_alerts` topic, while other severities go to their respective log topics
- This pattern enables priority-based processing and separate handling of critical system issues
forEach
Processes each record with a side effect, typically used for logging or external actions. This is a terminal operation that does not forward records.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `forEach` | Object | Yes | Specifies the action to perform on each record |

The `forEach` can be defined using:

- `code`: A Python code block performing the side effect

Example
Full example for `forEach`:
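A minimal sketch of a pipeline that ends in `forEach` (the stream name is a placeholder assumed to be defined under `streams:`):

```yaml
pipelines:
  log_all_events:
    from: sensor_readings
    forEach:
      code: |
        log.info("Consumed record key={} value={}", key, value)
```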
print
Prints each record to stdout for debugging purposes. This operation can use a custom mapper function to format the output, providing colored indicators and structured logging.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `mapper` | String | No | Name of a keyValuePrinter function used to format the output |
| `prefix` | String | No | Optional prefix for the printed output |
Example
This example demonstrates printing debug messages with color-coded log levels and custom formatting:
Producer - `print` example:

```yaml
streams:
  debug_messages:
    topic: debug_messages
    keyType: string
    valueType: json

functions:
  generate_debug_message:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      lvl = random.choice(["INFO", "WARNING", "ERROR", "DEBUG"])
      msgs = {
        "ERROR": ["DB timeout", "Payment failed"],
        "WARNING": ["High CPU", "Rate limit near"],
        "DEBUG": ["Auth check", "Loading config"],
        "INFO": ["Service started", "Request done"]
      }
      rid = f"r{random.randint(100, 999)}"
      return rid, {
        "timestamp": int(time.time()),
        "level": lvl,
        "component": random.choice(["UserSvc", "PaySvc"]),
        "message": random.choice(msgs[lvl]),
        "request_id": rid,
        "thread_id": f"t{random.randint(1, 5)}"
      }

producers:
  # Produce debug messages every 2 seconds
  debug_message_producer:
    generator: generate_debug_message
    interval: 2s
    to: debug_messages
```
Processor - `print` example:

```yaml
streams:
  debug_messages:
    topic: debug_messages
    keyType: string
    valueType: json

functions:
  format_debug_output:
    type: keyValuePrinter
    code: |
      if value is None: return f"[NULL] {key}"
      lvl = value.get("level", "?")
      colors = {"ERROR": "red", "WARNING": "yellow", "INFO": "green", "DEBUG": "blue"}
      return f"{colors.get(lvl, 'white')} [{lvl}] {value.get('component')} | {value.get('message')} | {value.get('request_id')}"
    resultType: string

pipelines:
  print_debug_messages:
    from: debug_messages
    print:
      mapper: format_debug_output
      prefix: "DEBUG_CONSOLE: "
```
What this example does:
- The producer generates different types of debug messages (INFO, WARNING, ERROR, DEBUG) from various system components
- The processor uses `print` with a custom `keyValuePrinter` function to format each message with color indicators ("red" for ERROR, "yellow" for WARNING, "green" for INFO, "blue" for DEBUG)
- The formatted output includes the component name, message text, and request ID for comprehensive debugging
Control Flow Operations
Control flow operations manage the flow of data through your processing pipeline, allowing for branching logic and record observation.
branch
Splits a stream into multiple substreams based on conditions.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `branches` | Array | Yes | List of conditions and handling pipeline for each branch |

The tag `branches` does not exist in the KSML language, but is meant to represent a composite object here that consists of two elements:

| Parameter | Type | Required | Description |
|---|---|---|---|
| `if` | Predicate | Yes | A condition which can evaluate to True or False. When True, the message is sent down the branch's pipeline |
| `pipeline` | Pipeline | Yes | A pipeline that contains a list of processing steps to send the message through |
Example
```yaml
branch:
  # Branch 1: Priority orders (premium customers, high value)
  - if: is_priority
    via:
      - type: mapValues
        mapper: add_priority_processing
      - type: peek
        forEach:
          code: |
            log.info("PRIORITY: Order {} - ${} premium order",
                     value.get("order_id"), value.get("total_amount"))
    to: priority_orders
  # Branch 2: Regional orders (US/EU, not priority)
  - if: is_regional
    via:
      - type: mapValues
        mapper: add_regional_processing
      - type: peek
        forEach:
          code: |
            log.info("REGIONAL: Order {} from {}",
                     value.get("order_id"), value.get("region"))
    to: regional_orders
  # Branch 3: International orders
  - if: is_international
    via:
      - type: mapValues
        mapper: add_international_processing
      - type: peek
        forEach:
          code: |
            log.info("INTERNATIONAL: Order {} from {} (customs required)",
                     value.get("order_id"), value.get("region"))
    to: international_orders
```

Full example for `branch`:
peek
Performs a side effect on each record without changing it.
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `forEach` | Object | Yes | Specifies the action to perform on each record |

The `forEach` can be defined using:

- `expression`: A simple expression (rarely used for peek)
- `code`: A Python code block performing the side effect

Example

```yaml
- type: peek
  forEach:
    code: |
      log.info("PRIORITY: Order {} - ${} premium order",
               value.get("order_id"), value.get("total_amount"))
```

Full example for `peek`:
Combining Operations
Operations can be combined in various ways to create complex processing pipelines.
Sequential Operations
Operations are executed in sequence, with each operation processing the output of the previous operation.
```yaml
pipelines:
  my_pipeline:
    from: input_stream
    via:
      - type: filter
        if:
          expression: value.get("amount") > 0
      - type: transformValue
        mapper:
          code: enrich_transaction(value)
      - type: peek
        forEach:
          code: |
            log.info("Processed transaction: {}", value)
    to: output_stream
```
Branching and Merging
You can create complex topologies by branching streams and merging them back together.
```yaml
pipelines:
  branch_pipeline:
    from: input_stream
    branch:
      - if:
          expression: value.get("type") == "A"
        as: type_a_stream
      - if:
          expression: value.get("type") == "B"
        as: type_b_stream

  process_a_pipeline:
    from: type_a_stream
    via:
      - type: mapValues
        mapper:
          code: process_type_a(value)
    to: merged_stream

  process_b_pipeline:
    from: type_b_stream
    via:
      - type: mapValues
        mapper:
          code: process_type_b(value)
    to: merged_stream
```
Best Practices
- Chain operations thoughtfully: Consider the performance implications of chaining multiple operations.
- Use stateless operations when possible: Stateless operations are generally more efficient than stateful ones.
- Be careful with window sizes: Large windows can consume significant memory.
- Handle errors gracefully: Use error handling operations to prevent pipeline failures.
- Monitor performance: Keep an eye on throughput and latency, especially for stateful operations.
How KSML Operations Relate to Kafka Streams
KSML operations are YAML-based wrappers around Kafka Streams topology operations. Understanding this relationship helps you leverage Kafka Streams documentation and concepts:
Direct Mappings
Stateless Transformation Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| filter | `KStream.processValues()` / `KTable.filter()` | Filter records based on conditions |
| filterNot | `KStream.processValues()` / `KTable.filterNot()` | Filter out matching records |
| flatMap | `KStream.process()` | Transform one record to multiple records |
| map | `KStream.process()` | Transform both key and value |
| mapKey | `KStream.process()` | Transform only the key |
| mapValues | `KStream.processValues()` / `KTable.transformValues()` | Transform only the value |
| selectKey | `KStream.process()` | Select new key from record content |
| transformKey | `KStream.process()` | Transform key using custom function |
| transformValue | `KStream.processValues()` | Transform value using custom function |
Format Conversion Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| convertKey | `KStream.processValues()` | Convert key data format |
| convertValue | `KStream.processValues()` | Convert value data format |
Grouping & Partitioning Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| groupBy | `KStream.groupBy()` / `KTable.groupBy()` | Group by new key |
| groupByKey | `KStream.groupByKey()` | Group by existing key |
| repartition | `KStream.repartition()` | Redistribute across partitions |
Stateful Aggregation Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| aggregate | `KGroupedStream.aggregate()` / `KGroupedTable.aggregate()` | Custom aggregation logic |
| count | `KGroupedStream.count()` / `KGroupedTable.count()` | Count records per key |
| reduce | `KGroupedStream.reduce()` / `KGroupedTable.reduce()` | Reduce to single value per key |
Join Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| join | `KStream.join()` / `KTable.join()` | Inner join streams/tables |
| leftJoin | `KStream.leftJoin()` / `KTable.leftJoin()` | Left outer join streams/tables |
| merge | `KStream.merge()` | Merge multiple streams into one |
| outerJoin | `KStream.outerJoin()` / `KTable.outerJoin()` | Full outer join streams/tables |
Windowing Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| windowBySession | `KGroupedStream.windowedBy(SessionWindows)` | Session-based windowing |
| windowByTime | `KGroupedStream.windowedBy(TimeWindows)` | Time-based windowing |
Output Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| forEach | `KStream.processValues()` | Side effects without output |
| print | `KStream.processValues()` | Print to stdout/file |
| to | `KStream.to()` | Send to Kafka topic |
| toTopicNameExtractor | `KStream.to(TopicNameExtractor)` | Dynamic topic routing |
Control Flow Operations
| KSML Operation | Kafka Streams Method | Purpose |
|---|---|---|
| branch | `KStream.split()` | Split into multiple branches |
| peek | `KStream.processValues()` | Observe records without changes |
Key Implementation Details
- Most KSML operations use `KStream.process()` or `KStream.processValues()` with custom processor suppliers rather than direct DSL methods. This enables seamless integration with KSML's Python function execution system.
- Operations automatically adapt to work with KStream, KTable, and windowed streams, mapping to the appropriate Kafka Streams method based on context.
- Stateful operations support configurable state stores through KSML's unified state management system.
- Each operation integrates with Python functions through specialized user function wrappers (`UserPredicate`, `UserKeyTransformer`, etc.).