
Operation Reference

This document provides a comprehensive reference for all operations available in KSML. Each operation is described with its parameters, behavior, and examples.

Introduction

Operations are the building blocks of stream processing in KSML. They define how data is transformed, filtered, aggregated, and otherwise processed as it flows through your application. Operations form the middle part of pipelines, taking input from the previous operation and producing output for the next operation.

Understanding the different types of operations and when to use them is crucial for building effective stream processing applications.

Operations Overview

KSML supports the stream processing operations listed below. Each operation serves a specific purpose in transforming, filtering, aggregating, or routing data:

| Operation | Purpose | Common Use Cases |
|-----------|---------|------------------|
| Stateless Transformation Operations | | |
| flatMap | Transform one record into multiple records | Split batch messages, expand arrays |
| map | Transform both key and value | Change message format, enrich data |
| mapKey | Transform only the key | Change partitioning key |
| mapValues | Transform only the value (preserves key) | Modify payload without affecting partitioning |
| selectKey | Select a new key from the value | Extract key from message content |
| transformKey | Transform key using custom function | Complex key transformations |
| transformValue | Transform value using custom function | Complex value transformations |
| Filtering Operations | | |
| filter | Keep records that match a condition | Remove unwanted messages |
| filterNot | Remove records that match a condition | Exclude specific messages |
| Format Conversion Operations | | |
| convertKey | Convert key format (e.g., JSON to Avro) | Change serialization format |
| convertValue | Convert value format (e.g., JSON to Avro) | Change serialization format |
| Grouping & Partitioning Operations | | |
| groupBy | Group by a new key | Prepare for aggregation with new key |
| groupByKey | Group by existing key | Prepare for aggregation |
| repartition | Redistribute records across partitions | Custom partitioning logic |
| Stateful Aggregation Operations | | |
| aggregate | Build custom aggregations | Complex calculations, custom state |
| count | Count records per key | Track occurrences |
| reduce | Combine records with same key | Accumulate values |
| Join Operations | | |
| join | Inner join two streams | Correlate related events |
| leftJoin | Left outer join two streams | Include all left records |
| merge | Combine multiple streams into one | Stream unification |
| outerJoin | Full outer join two streams | Include all records from both sides |
| Windowing Operations | | |
| windowBySession | Group into session windows | User session analysis |
| windowByTime | Group into fixed time windows | Time-based aggregations |
| Output Operations | | |
| forEach | Process without producing output | Side effects, external calls |
| print | Print to console | Debugging, monitoring |
| to | Send to a specific topic | Write results to Kafka |
| toTopicNameExtractor | Send to dynamically determined topic | Route to different topics |
| Control Flow Operations | | |
| branch | Split stream into multiple branches | Conditional routing |
| peek | Observe records without modification | Logging, debugging |

Choosing the Right Operation

When designing your KSML application, consider these factors:

  • State Requirements: Stateful operations (aggregations, joins) require state stores and more resources
  • Partitioning: Operations like groupBy may trigger data redistribution
  • Performance: Some operations are more computationally expensive than others
  • Error Handling: Guard against missing or malformed data in your Python functions so that failures are handled gracefully

Stateless Transformation Operations

Stateless transformation operations modify records (key, value, or both) without maintaining state between records. These operations are the most common building blocks for data processing pipelines.

map

Transforms both the key and value of each record.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to transform the key and value |

The mapper can be defined using:

  • expression: A simple expression returning a tuple (key, value)
  • code: A Python code block returning a tuple (key, value)

Example

      - type: map
        mapper: extract_fields

Full example for map:
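The sketch below outlines what such a pipeline could look like, using an inline mapper instead of a named function. The topic names, field names, and the (string, json) result type are illustrative assumptions rather than parts of a tested example.

streams:
  orders_input:
    topic: orders
    keyType: string
    valueType: json
  orders_by_customer:
    topic: orders_by_customer
    keyType: string
    valueType: json

pipelines:
  rekey_orders:
    from: orders_input
    via:
      - type: map
        mapper:
          resultType: (string, json)
          code: |
            # Assumed fields: re-key by customer_id and keep a trimmed payload
            new_key = value.get("customer_id", key) if value else key
            new_value = {"order_id": value.get("order_id"), "total": value.get("total")} if value else None
          expression: (new_key, new_value)
    to: orders_by_customer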

mapValues

Transforms the value of each record without changing the key.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to transform the value |

The mapper can be defined using:

  • expression: A simple expression
  • code: A Python code block

Example

          - type: mapValues
            mapper: add_priority_processing

Full example for mapValues:
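A minimal sketch that enriches each value while leaving the key (and therefore the partitioning) untouched. The topic and field names below are illustrative assumptions.

streams:
  orders_input:
    topic: orders
    keyType: string
    valueType: json
  orders_enriched:
    topic: orders_enriched
    keyType: string
    valueType: json

pipelines:
  enrich_orders:
    from: orders_input
    via:
      - type: mapValues
        mapper:
          resultType: json
          code: |
            # Add a derived field; the key is preserved by mapValues
            if value:
              value["priority"] = "high" if value.get("total", 0) > 100 else "normal"
          expression: value
    to: orders_enriched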

mapKey

Transforms the key of each record without modifying the value.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to transform the key |

The mapper can be defined using:

  • expression: A simple expression returning the new key
  • code: A Python code block returning the new key

Example

      - type: mapKey
        mapper: extract_region_key

Full example for mapKey:
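A minimal sketch that derives a new key from the value with an inline mapper. The topic names and the region field are illustrative assumptions.

streams:
  events_by_session:
    topic: events_by_session
    keyType: string
    valueType: json
  events_by_region:
    topic: events_by_region
    keyType: string
    valueType: json

pipelines:
  rekey_by_region:
    from: events_by_session
    via:
      - type: mapKey
        mapper:
          resultType: string
          # The new key is taken from an assumed region field in the value
          expression: value.get("region", "unknown") if value else "unknown"
    to: events_by_region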

flatMap

Transforms each record into zero or more records, useful for splitting batch messages into individual records.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to transform each record into multiple records |

The mapper must specify:

  • resultType: The result type, a list of (key, value) tuples, for example list(tuple(string, json)) as used below
  • code: Python code returning a list of tuples [(key, value), ...]

Example

      - type: flatMap
        mapper:
          resultType: list(tuple(string, json))
          code: |
            return [(f"{value['order_id']}_{i['item_id']}", {
              "order_id": value['order_id'],
              "customer_id": value['customer_id'],
              "item_id": i['item_id'],
              "quantity": i['quantity'],
              "total": i['quantity'] * i['price']
            }) for i in value['items']]

This example splits order batches containing multiple items into individual item records:

Producer - flatMap example (click to expand)
functions:
  generate_order_batch:
    type: generator
    globalCode: |
      import json, random
      counter = 0
    code: |
      global counter
      counter += 1
      cust = random.choice(["alice", "bob", "charlie"])
      batch = {
        "order_id": f"ord-{counter}",
        "customer_id": cust,
        "items": [
          {"item_id": "laptop", "quantity": 1, "price": 999.99},
          {"item_id": "mouse", "quantity": 2, "price": 29.99},
          {"item_id": "keyboard", "quantity": 1, "price": 79.99}
        ]
      }
      return cust, json.dumps(batch)
    resultType: (string, json)

producers:
  order_batch_producer:
    generator: generate_order_batch
    interval: 3s
    to:
      topic: order_batches
      keyType: string
      valueType: json
Processor - flatMap example (click to expand)
streams:
  order_batches_input:
    topic: order_batches
    keyType: string
    valueType: json

pipelines:
  main:
    from: order_batches_input
    via:
      - type: flatMap
        mapper:
          resultType: list(tuple(string, json))
          code: |
            return [(f"{value['order_id']}_{i['item_id']}", {
              "order_id": value['order_id'],
              "customer_id": value['customer_id'],
              "item_id": i['item_id'],
              "quantity": i['quantity'],
              "total": i['quantity'] * i['price']
            }) for i in value['items']]
    to:
      topic: individual_items
      keyType: string
      valueType: json

What this example does:

  • The producer generates order batches containing multiple items.
  • The processor uses flatMap to split each order batch into individual item records - transforming 1 input record into 3 output records (one per item).
  • Each output record has a unique key combining order ID and item ID, with calculated total prices per item.

selectKey

Changes the key of each record without modifying the value. This operation extracts a new key from the existing key and/or value, enabling data repartitioning and preparation for joins based on different key attributes.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to derive the new key from the key/value |

The mapper can be defined using:

  • expression: A simple expression returning the new key (can use both key and value)
  • code: A Python code block returning the new key (can use both key and value)

Example

      - type: selectKey
        mapper:
          expression: value.get("user_id")

This example demonstrates changing the key from session_id to user_id for better data organization:

Producer - selectKey example (click to expand)
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

functions:
  generate_user_event:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      uid = random.choice(["u01", "u02", "u03"])
      evt = random.choice(["login", "purchase", "view"])
      sid = f"s{random.randint(100, 999)}"
      data = {
        "user_id": uid,
        "event_type": evt,
        "session_id": sid,
        "timestamp": int(time.time())
      }
      if evt == "purchase":
        data["amount"] = round(random.uniform(10, 500), 2)
      return (sid, data)

producers:
  # Produce user events every 3 seconds
  user_event_producer:
    generator: generate_user_event
    interval: 3s
    to: user_events
Processor - selectKey example (click to expand)
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

  user_keyed_events:
    topic: user_keyed_events
    keyType: string
    valueType: json

pipelines:
  rekey_user_events:
    from: user_events
    via:
      - type: selectKey
        mapper:
          expression: value.get("user_id")
    to: user_keyed_events

What this example does:

  • The producer generates user events (login, purchase, view) with different session IDs as keys
  • The processor uses selectKey to change the key from session_id to user_id, enabling better data partitioning for user-centric analytics
  • This rekeying allows subsequent operations to group and aggregate data by user rather than by session

transformKey

Transforms the key using a custom transformer function.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | String | Yes | Name of the key transformer function |

Example

      - type: transformKey
        mapper: extract_customer_id

Full example for transformKey:
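A minimal sketch using a named key transformer. The keyTransformer function type, topic names, and the customer_id field are assumptions here, not parts of a tested example.

streams:
  purchases_input:
    topic: purchases
    keyType: string
    valueType: json
  purchases_by_customer:
    topic: purchases_by_customer
    keyType: string
    valueType: json

functions:
  extract_customer_id:
    type: keyTransformer
    code: |
      # Derive the new key from the value; fall back to the existing key
      new_key = value.get("customer_id", key) if value else key
    expression: new_key
    resultType: string

pipelines:
  rekey_purchases:
    from: purchases_input
    via:
      - type: transformKey
        mapper: extract_customer_id
    to: purchases_by_customer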

transformValue

Transforms the value using a custom transformer function.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | String | Yes | Name of the value transformer function |

Example

      - type: transformValue
        mapper: convert_temperature

Full example for transformValue:
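A minimal sketch using a named value transformer. The valueTransformer function type, topic names, and the temp_f field are assumptions.

streams:
  sensor_input:
    topic: sensor_readings
    keyType: string
    valueType: json
  sensor_converted:
    topic: sensor_readings_celsius
    keyType: string
    valueType: json

functions:
  convert_temperature:
    type: valueTransformer
    code: |
      # Convert an assumed Fahrenheit field to Celsius
      if value and "temp_f" in value:
        value["temp_c"] = round((value["temp_f"] - 32) * 5 / 9, 2)
    expression: value
    resultType: json

pipelines:
  convert_readings:
    from: sensor_input
    via:
      - type: transformValue
        mapper: convert_temperature
    to: sensor_converted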

Filtering Operations

Filtering operations selectively pass or remove records based on conditions, allowing you to control which data continues through your processing pipeline.

filter

Keeps only records that satisfy a condition.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| if | Object | Yes | Specifies the condition |

The if can be defined using:

  • expression: A simple boolean expression
  • code: A Python code block returning a boolean

Example

      - type: filter
        if: is_critical_sensor

Full example for filter:
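A minimal sketch of how the is_critical_sensor predicate and surrounding pipeline could look. The topic names and the sensor_class field are illustrative assumptions.

streams:
  sensor_input:
    topic: sensor_readings
    keyType: string
    valueType: json
  critical_sensor_readings:
    topic: critical_sensor_readings
    keyType: string
    valueType: json

functions:
  is_critical_sensor:
    type: predicate
    code: |
      # Keep only readings from sensors marked as critical
      if value is None:
        return False
      return value.get("sensor_class") == "critical"

pipelines:
  keep_critical:
    from: sensor_input
    via:
      - type: filter
        if: is_critical_sensor
    to: critical_sensor_readings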

filterNot

Excludes records that satisfy a condition (opposite of filter). Records are kept when the condition returns false.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| if | Object | Yes | Specifies the condition |

The if parameter must reference a predicate function that returns a boolean.

Example

      - type: filterNot
        if: is_inactive_product

This example filters out products with "inactive" status, keeping all other products:

Producer - filterNot example (click to expand)
functions:
  generate_product:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1
      pid = f"p{counter:03d}"
      return pid, {
        "product_id": pid,
        "name": random.choice(["laptop", "phone", "tablet"]),
        "status": random.choice(["active", "inactive", "pending"]),
        "price": round(random.uniform(50, 1500), 2)
      }
    resultType: (string, json)

producers:
  product_producer:
    generator: generate_product
    interval: 2s
    to:
      topic: product_stream
      keyType: string
      valueType: json
Processor - filterNot example (click to expand)
streams:
  product_input:
    topic: product_stream
    keyType: string
    valueType: json

functions:
  is_inactive_product:
    type: predicate
    code: |
      if value is None:
        return False

      status = value.get("status", "")
      if status == "inactive":
        print(f"Filtering out inactive product: {value.get('product_id')} - {value.get('name')}")
        return True

      return False

pipelines:
  main:
    from: product_input
    via:
      - type: filterNot
        if: is_inactive_product
      - type: peek
        forEach:
          code: |
            print(f"Active product kept: {key} - {value['name']} (status: {value['status']}, price: ${value['price']})")
    to:
      topic: active_products
      keyType: string
      valueType: json

What this example does:

  • The producer generates products with different statuses: active, inactive, and pending
  • The processor uses filterNot with a predicate function to exclude products with "inactive" status
  • Products with the other statuses (active, pending) are kept and passed through to the output topic

Format Conversion Operations

Format conversion operations change the serialization format of keys or values without altering the actual data content.

convertKey

Converts the key to a different data format.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| into | String | Yes | Target format for the key |

Example

      - type: convertKey
        into: json:windowed(string)

Full example for convertKey:

convertValue

Converts the value to a different data format.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| into | String | Yes | Target format for the value |

Example

      - type: convertValue
        into: json

Full example for convertValue:
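A minimal sketch that converts JSON values to their string representation before writing them out. The topic names are illustrative assumptions.

streams:
  events_json:
    topic: events_json
    keyType: string
    valueType: json
  events_string:
    topic: events_string
    keyType: string
    valueType: string

pipelines:
  convert_events:
    from: events_json
    via:
      # Only the serialization format changes; the data content stays the same
      - type: convertValue
        into: string
    to: events_string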

Grouping & Partitioning Operations

Grouping and partitioning operations organize data by keys and control how records are distributed across partitions, preparing data for aggregation or improving processing parallelism.

groupBy

Groups records by a new key derived from the record.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | Object | Yes | Specifies how to select the new key |

The mapper can be defined using:

  • expression: A simple expression returning the grouping key
  • code: A Python code block returning the grouping key

Example

      - type: groupBy
        name: group_by_sensor_type
        mapper:
          code: |
            if value is None:
              return "unknown"  
            if not "sensor_type" in value:
              return "unknown"
          expression: value["sensor_type"]
          resultType: string

Full example for groupBy:
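A minimal sketch that groups by sensor type and counts records per group. The topic names are assumptions, and depending on your KSML version you may need to convert the resulting aggregation back to a stream before the final to step.

streams:
  sensor_input:
    topic: sensor_readings
    keyType: string
    valueType: json

pipelines:
  count_per_sensor_type:
    from: sensor_input
    via:
      - type: groupBy
        mapper:
          resultType: string
          # Group by an assumed sensor_type field, defaulting to "unknown"
          expression: value["sensor_type"] if value and "sensor_type" in value else "unknown"
      - type: count
    to:
      topic: sensor_type_counts
      keyType: string
      valueType: long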

groupByKey

Groups records by their existing key for subsequent aggregation operations.

Parameters

None. This operation is typically followed by an aggregation operation.

Example

      - type: groupByKey

Full example for groupByKey:

repartition

Redistributes records across partitions, optionally using custom partitioning logic. This operation allows you to control data distribution for performance optimization or to meet specific processing requirements.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| numberOfPartitions | Integer | No | Number of partitions for redistribution |
| partitioner | String | No | Function name for custom partitioning logic |

Example

Note:

To test this repartition example, ensure your topics have enough partitions. The example requires at least 4 partitions, since it redistributes records across partitions 0-3. Update your docker-compose.yml:

kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 4 --replication-factor 1 --topic user_activities
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 4 --replication-factor 1 --topic repartitioned_activities
      # Repartition with custom partitioner for user-based distribution
      - type: repartition
        numberOfPartitions: 4
        partitioner: activity_partitioner
Producer - repartition example (click to expand)
functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import time
      import random
      activity_id = 0
      user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
      activity_types = ["login", "purchase", "browse", "logout", "search"]
      regions = ["north", "south", "east", "west"]
    code: |
      global activity_id

      activity_id += 1

      # Generate user activity with region as initial key
      region = random.choice(regions)
      activity = {
        "activity_id": f"activity_{activity_id:04d}",
        "user_id": random.choice(user_ids),
        "activity_type": random.choice(activity_types),
        "timestamp": int(time.time() * 1000),
        "region": region
      }

      # Log the activity generation
      log.info("Generated activity: {} for user {} in region {} - type: {}", 
               activity["activity_id"], activity["user_id"], region, activity["activity_type"])

      # Use region as key initially (will be repartitioned by user_id later)
      key = region
      value = activity
    expression: (key, value)
    resultType: (string, json)

producers:
  user_activity_producer:
    generator: generate_user_activity
    interval: 3s
    to:
      topic: user_activities
      keyType: string
      valueType: json
Processor - repartition example (click to expand)
streams:
  user_activities:
    topic: user_activities
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  repartitioned_activities:
    topic: repartitioned_activities
    keyType: string
    valueType: json

functions:
  # Custom partitioner that distributes activities by user type
  # Users ending in even numbers get partition 0-1, odd numbers get partition 2-3
  activity_partitioner:
    type: streamPartitioner
    code: |
      # Custom partitioning logic based on user_id pattern
      # This demonstrates intelligent partitioning for user-based processing

      if value and "user_id" in value:
        user_id = value["user_id"]
        # Extract user number from user_001, user_002, etc.
        user_num = int(user_id.split("_")[-1])

        # Even user numbers (002, 004) -> partitions 0-1
        # Odd user numbers (001, 003, 005) -> partitions 2-3
        if user_num % 2 == 0:
          partition = (user_num // 2) % 2        # even users spread over partitions 0-1
        else:
          partition = 2 + ((user_num // 2) % 2)  # odd users spread over partitions 2-3

        log.debug("Routing user {} (num:{}) to partition {}", 
                  user_id, user_num, partition)

        return partition

      # Default to partition 0
      return 0
    resultType: integer

pipelines:
  repartition_activities:
    from: user_activities
    via:
      # First, change key from region to user_id for user-based processing
      - type: mapKey
        mapper:
          code: |
            # Extract user_id from the activity to become the new key
            if value and "user_id" in value:
              user_key = value["user_id"]
              log.info("Changing key from region '{}' to user_id '{}'", key, user_key)
              result = user_key
            else:
              result = key
          expression: result
          resultType: string

      # Add processing metadata
      - type: transformValue
        mapper:
          code: |
            # Add repartitioning info to track the transformation
            if value:
              value["processing_info"] = f"Repartitioned by user: {value.get('user_id', 'unknown')}"
              value["original_region"] = key  # Keep track of original region
            result = value
          expression: result
          resultType: json

      # Repartition with custom partitioner for user-based distribution
      - type: repartition
        numberOfPartitions: 4
        partitioner: activity_partitioner

      # Log the repartitioned activity
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Repartitioned activity {}: {} -> user-based partitioning applied", 
                       key, value.get("processing_info", "unknown"))

    # Send to output topic to observe the repartitioning results
    to:
      topic: repartitioned_activities
      keyType: string
      valueType: json

The repartition operation demonstrates data redistribution by changing keys from regions to user IDs, then using custom partitioning logic to distribute activities based on user patterns. This ensures related user activities are processed together while optimizing partition utilization.

What the example does:

Demonstrates intelligent data redistribution for user-centric processing:

  • Changes partitioning strategy from region-based to user-based keys
  • Applies custom partitioning logic based on user ID patterns
  • Routes even user numbers (002, 004) to partitions 0-1
  • Routes odd user numbers (001, 003, 005) to partitions 2-3
  • The producer generates activities initially keyed by region, providing a realistic repartitioning scenario

Key Features:

  • Dynamic key transformation from region to user_id
  • Custom partition calculation based on user patterns
  • Guaranteed co-location of activities for the same user
  • Processing metadata tracking for observability
  • Explicit partition count handling (4 partitions total)
  • Fallback to partition 0 for edge cases

Expected Results:

When running this example, you'll see log messages like:

  • "Generated activity: activity_0001 for user user_001 in region south - type: purchase" - Producer creating activities
  • "Changing key from region 'south' to user_id 'user_001'" - Key transformation
  • "Repartitioned activity user_001: Repartitioned by user: user_001 -> user-based partitioning applied" - Successful repartitioning
  • User_001 and user_003 (odd numbers) go to partitions 2-3
  • User_002 and user_004 (even numbers) go to partitions 0-1
  • Activities for the same user are guaranteed to be processed in order

Stateful Aggregation Operations

Stateful aggregation operations maintain state between records to perform calculations like counting, summing, or building custom aggregates based on record keys.

aggregate

Aggregates records by key using a custom aggregation function.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| initializer | Object | Yes | Specifies the initial value for the aggregation |
| aggregator | Object | Yes | Specifies how to combine the current record with the aggregate |

Both initializer and aggregator can be defined using:

  • expression: A simple expression
  • code: A Python code block

Example

      - type: aggregate
        store:
          type: window
          windowSize: 1m
          retention: 10m
        initializer: initialize_sales_stats
        aggregator: aggregate_sales

Full example for aggregate:
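A minimal non-windowed sketch with an inline initializer and aggregator. The topic names, the amount field, and the aggregatedValue parameter name used for the running aggregate are assumptions; a windowed variant would add the store configuration shown in the snippet above.

streams:
  sales_input:
    topic: sales
    keyType: string
    valueType: json

pipelines:
  sales_totals:
    from: sales_input
    via:
      - type: groupByKey
      - type: aggregate
        initializer:
          resultType: json
          expression: '{"count": 0, "total": 0.0}'
        aggregator:
          resultType: json
          code: |
            # aggregatedValue holds the running aggregate for this key (assumed parameter name)
            agg = aggregatedValue if aggregatedValue else {"count": 0, "total": 0.0}
            agg["count"] += 1
            agg["total"] += value.get("amount", 0.0) if value else 0.0
          expression: agg
    to:
      topic: sales_totals
      keyType: string
      valueType: json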

count

Counts the number of records for each key.

Parameters

None.

Example

      - type: count

Full example for count:
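A minimal sketch that counts records per key. The topic names are assumptions, and depending on your KSML version you may need to convert the resulting table back to a stream before writing it to the output topic.

streams:
  page_views:
    topic: page_views
    keyType: string
    valueType: json

pipelines:
  views_per_page:
    from: page_views
    via:
      - type: groupByKey
      - type: count
    to:
      topic: page_view_counts
      keyType: string
      valueType: long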

reduce

Combines records with the same key using a reducer function.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| reducer | Object | Yes | Specifies how to combine two values |

The reducer can be defined using:

  • expression: A simple expression
  • code: A Python code block

Example

      - type: reduce
        reducer: sum_amounts

Full example for reduce:
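A minimal sketch with an inline reducer that sums values per key. The topic names and the value1/value2 parameter names for the two values being combined are assumptions.

streams:
  payments_input:
    topic: payments
    keyType: string
    valueType: long

pipelines:
  sum_per_customer:
    from: payments_input
    via:
      - type: groupByKey
      - type: reduce
        reducer:
          resultType: long
          # Combine two values for the same key into one (assumed parameter names)
          expression: value1 + value2
    to:
      topic: payment_totals
      keyType: string
      valueType: long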

Join Operations

Join operations combine data from multiple streams or tables based on matching keys, enabling you to correlate related events from different data sources.

join

Performs an inner join between two streams.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| stream | String | Yes (stream-stream joins) | The name of the stream to join with |
| table | String | Yes (stream-table joins) | The name of the table to join with |
| valueJoiner | Object | Yes | Function that defines how to combine values from both sides |
| timeDifference | Duration | No | The time difference for the join window (for stream-stream joins) |
| grace | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| foreignKeyExtractor | Object | No | Function to extract foreign key (for stream-table joins) |
| partitioner | String | No | Function name for custom partitioning of current stream |
| otherPartitioner | String | No | Function name for custom partitioning of join stream/table |

Example

      - type: join
        stream: product_purchases
        valueJoiner: correlate_click_and_purchase
        timeDifference: 30m  # Look for purchases within 30 minutes of a click
        grace: 5m  # Grace period for late events
        thisStore:
          name: clicks_join_store
          type: window
          windowSize: 60m  # Must be 2*timeDifference
          retention: 65m   # Must be 2*timeDifference + grace = 60m + 5m
          retainDuplicates: true
        otherStore:
          name: purchases_join_store
          type: window
          windowSize: 60m  # Must be 2*timeDifference
          retention: 65m   # Must be 2*timeDifference + grace = 60m + 5m
          retainDuplicates: true

Full example for join:
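A minimal stream-stream join sketch. The valueJoiner function type and the value1/value2 parameter names are assumptions, both topics must be co-partitioned on the same key, and the window state stores may need to be configured as shown in the snippet above.

streams:
  product_clicks:
    topic: product_clicks
    keyType: string
    valueType: json
  product_purchases:
    topic: product_purchases
    keyType: string
    valueType: json

functions:
  correlate_click_and_purchase:
    type: valueJoiner
    code: |
      # value1 = click event, value2 = purchase event (assumed parameter names)
      result = {"click": value1, "purchase": value2}
    expression: result
    resultType: json

pipelines:
  click_to_purchase:
    from: product_clicks
    via:
      - type: join
        stream: product_purchases
        valueJoiner: correlate_click_and_purchase
        timeDifference: 30m
        grace: 5m
    to:
      topic: click_purchase_correlations
      keyType: string
      valueType: json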

leftJoin

Performs a left join between two streams.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| stream | String | Yes (stream-stream joins) | The name of the stream to join with |
| table | String | Yes (stream-table joins) | The name of the table to join with |
| valueJoiner | Object | Yes | Function that defines how to combine values from both sides |
| timeDifference | Duration | No | The time difference for the join window (for stream-stream joins) |
| grace | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| foreignKeyExtractor | Object | No | Function to extract foreign key (for stream-table joins) |
| partitioner | String | No | Function name for custom partitioning of current stream |
| otherPartitioner | String | No | Function name for custom partitioning of join stream/table |

Example

      - type: leftJoin
        table: user_locations
        valueJoiner: join_activity_with_location

Full example for leftJoin:
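A minimal stream-table left join sketch in which the user_locations table enriches activity events. The tables section, the valueJoiner function type, the value1/value2 parameter names, and the field names are assumptions; when no table record matches, the right-hand value is expected to be None.

streams:
  user_activity:
    topic: user_activity
    keyType: string
    valueType: json

tables:
  user_locations:
    topic: user_locations
    keyType: string
    valueType: json

functions:
  join_activity_with_location:
    type: valueJoiner
    code: |
      # value1 = activity, value2 = matching location record or None (assumed parameter names)
      result = dict(value1) if value1 else {}
      result["location"] = value2.get("city") if value2 else "unknown"
    expression: result
    resultType: json

pipelines:
  enrich_activity:
    from: user_activity
    via:
      - type: leftJoin
        table: user_locations
        valueJoiner: join_activity_with_location
    to:
      topic: activity_with_location
      keyType: string
      valueType: json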

merge

Merges multiple streams with identical key and value types into a single unified stream. The merge operation combines streams without any joining logic - it simply forwards all records from all input streams to the output stream in the order they arrive.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| stream | String | Yes | The name of the stream to merge with the main stream |

Example

      - type: merge
        stream: stream_b
Producer - merge example (click to expand)
functions:
  generate_stream_a:
    type: generator
    globalCode: |
      import random
      message_id = 0
      colors = ["red", "blue", "green"]
    code: |
      global message_id
      message_id += 1

      message = {
        "id": f"stream_a_{message_id}",
        "color": random.choice(colors),
        "source": "stream_a"
      }

      log.info("Generated message from stream A: {} - color: {}", message["id"], message["color"])

      key = message["color"]
      value = message
    expression: (key, value)
    resultType: (string, json)

  generate_stream_b:
    type: generator
    globalCode: |
      import random
      message_id = 0
      colors = ["red", "blue", "green"]
    code: |
      global message_id
      message_id += 1

      message = {
        "id": f"stream_b_{message_id}",
        "color": random.choice(colors),
        "source": "stream_b"
      }

      log.info("Generated message from stream B: {} - color: {}", message["id"], message["color"])

      key = message["color"]
      value = message
    expression: (key, value)
    resultType: (string, json)

producers:
  stream_a_producer:
    generator: generate_stream_a
    interval: 3s
    to:
      topic: stream_a
      keyType: string
      valueType: json

  stream_b_producer:
    generator: generate_stream_b
    interval: 2s
    to:
      topic: stream_b
      keyType: string
      valueType: json
Processor - merge example (click to expand)
streams:
  stream_a:
    topic: stream_a
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  stream_b:
    topic: stream_b
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  merged_stream:
    topic: merged_stream
    keyType: string
    valueType: json

pipelines:
  merge_streams:
    from: stream_a
    via:
      # Merge with stream B
      - type: merge
        stream: stream_b

      # Log merged messages
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Merged message: {} from {} - color: {}", 
                       value.get("id", "unknown"), 
                       value.get("source", "unknown"), 
                       value.get("color", "unknown"))
    to:
      topic: merged_stream
      keyType: string
      valueType: json

What This Example Does:

The example demonstrates merging two independent streams (stream_a and stream_b) into a single processing pipeline. Both producers generate messages with a color key and JSON values containing an id, color, and source field. The merge operation combines both streams so that messages from either stream flow through the same downstream processing.

How the Merge Operation Works:

  • Stream Union: The merge operation creates a simple union of multiple streams - all records from all input streams are forwarded to the output
  • No Transformation: Records pass through unchanged, maintaining their original keys and values
  • Interleaved Processing: Messages from different streams are processed as they arrive, interleaved based on timing
  • Shared Pipeline: After merging, both streams share the same downstream operations (in this example, the peek operation logs all messages)

Important Notes:

  • All streams being merged must have identical key and value types
  • Records maintain their original timestamps and ordering per stream
  • No complex joining logic - this is a simple stream union operation
  • Can merge any number of streams by chaining multiple merge operations

Expected Results:

When running this example, you'll see interleaved log messages like:

  • "Generated message from stream A: stream_a_1 - color: green" - Producer A creating messages every 3 seconds
  • "Generated message from stream B: stream_b_2 - color: blue" - Producer B creating messages every 2 seconds
  • "Merged message: stream_a_1 from stream_a - color: green" - Merged pipeline processing stream A messages
  • "Merged message: stream_b_2 from stream_b - color: blue" - Same pipeline processing stream B messages

Both streams flow through the unified pipeline after merging, demonstrating how merge combines multiple data sources for shared processing.

outerJoin

Performs an outer join between two streams.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| stream | String | Yes (stream-stream joins) | The name of the stream to join with |
| table | String | Yes (stream-table joins) | The name of the table to join with |
| valueJoiner | Object | Yes | Function that defines how to combine values from both sides |
| timeDifference | Duration | No | The time difference for the join window (for stream-stream joins) |
| grace | Duration | No | Grace period for late-arriving data (for stream-stream joins) |
| foreignKeyExtractor | Object | No | Function to extract foreign key (for stream-table joins) |
| partitioner | String | No | Function name for custom partitioning of current stream |
| otherPartitioner | String | No | Function name for custom partitioning of join stream/table |

Example

      - type: outerJoin
        stream: user_logouts
        valueJoiner: analyze_user_session
        timeDifference: 10m  # Look for logouts within 10 minutes of login
        grace: 2m  # Grace period for late events
        thisStore:
          name: login_session_store
          type: window
          windowSize: 20m  # Must be 2*timeDifference
          retention: 22m   # Must be 2*timeDifference + grace
          retainDuplicates: true
        otherStore:
          name: logout_session_store
          type: window
          windowSize: 20m  # Must be 2*timeDifference
          retention: 22m   # Must be 2*timeDifference + grace
          retainDuplicates: true

Full example for outerJoin:

Windowing Operations

Windowing operations group records into time-based windows, enabling temporal aggregations and time-bounded processing.

windowByTime

Groups records into time windows.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| windowType | String | No | The type of window (tumbling, hopping, or sliding) |
| duration | Duration | Yes | The duration of the window |
| advanceBy | Duration | No | How often to advance the window (hopping windows only) |
| grace | Duration | No | Grace period for late-arriving data |

Example

      - type: windowByTime
        windowType: tumbling
        duration: 5m
        grace: 30s

Full example for windowByTime:
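A minimal sketch of a tumbling-window count per key. The topic names are assumptions; the convertKey step reflects that the key becomes a windowed key after a windowed aggregation, and depending on your KSML version you may need an extra step to convert the aggregation result back to a stream before writing it out.

streams:
  click_events:
    topic: click_events
    keyType: string
    valueType: json

pipelines:
  clicks_per_minute:
    from: click_events
    via:
      - type: groupByKey
      - type: windowByTime
        windowType: tumbling
        duration: 1m
        grace: 30s
      - type: count
      - type: convertKey
        into: json:windowed(string)
    to:
      topic: clicks_per_minute
      keyType: json
      valueType: long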

windowBySession

Groups records into session windows, where events whose timestamps fall within the inactivityGap of each other are treated as part of the same session.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| inactivityGap | Duration | Yes | The maximum gap between events before they are assigned to different sessions |
| grace | Duration | No | Grace period for late-arriving data |

Example

      - type: windowBySession
        inactivityGap: 2m  # Close session after 2 minutes of inactivity
        grace: 30s

Full example for windowBySession:
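A minimal sketch that counts clicks per user session. The topic names are assumptions; as with windowByTime, the key becomes a windowed key after aggregation, and additional conversion of the result may be required depending on your KSML version.

streams:
  user_clicks:
    topic: user_clicks
    keyType: string
    valueType: json

pipelines:
  clicks_per_session:
    from: user_clicks
    via:
      - type: groupByKey
      - type: windowBySession
        inactivityGap: 2m
        grace: 30s
      - type: count
      - type: convertKey
        into: json:windowed(string)
    to:
      topic: session_click_counts
      keyType: json
      valueType: long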

Output Operations

Output operations represent the end of a processing pipeline, sending records to topics or performing terminal actions like logging.

to

Sends records to a specific Kafka topic.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| topic | String | Yes | The name of the target topic |
| keyType | String | No | The data type of the key |
| valueType | String | No | The data type of the value |
| partitioner | String | No | Function name for custom partitioning logic |

Example

    to: output_stream

Example with Custom Partitioner

    to:
      topic: partitioned_orders
      keyType: string
      valueType: json
      partitioner: priority_region_partitioner

Full example for to:

Full example for to with partitioner:
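A minimal sketch combining to with a custom streamPartitioner. The topic names, the priority field, and the partition assignments are assumptions; the target topic needs at least two partitions for this routing to be visible.

streams:
  orders_input:
    topic: orders
    keyType: string
    valueType: json

functions:
  priority_region_partitioner:
    type: streamPartitioner
    code: |
      # Assumed routing: high-priority orders go to partition 0, everything else to partition 1
      if value and value.get("priority") == "high":
        return 0
      return 1
    resultType: integer

pipelines:
  route_orders:
    from: orders_input
    to:
      topic: partitioned_orders
      keyType: string
      valueType: json
      partitioner: priority_region_partitioner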

toTopicNameExtractor

Sends records to topics determined dynamically based on the record content. This operation enables content-based routing, allowing you to distribute messages to different topics based on their attributes, priorities, or business logic.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| topicNameExtractor | String | Yes | Name of the function that determines the topic name |
| partitioner | String | No | Function name for custom partitioning logic |

Example

    toTopicNameExtractor:
      topicNameExtractor: route_by_severity

This example demonstrates routing system events to different topics based on severity level:

Producer - toTopicNameExtractor example (click to expand)
streams:
  system_events:
    topic: system_events
    keyType: string
    valueType: json

functions:
  generate_system_event:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      sev = random.choice(["INFO", "WARNING", "ERROR", "CRITICAL"])
      msgs = {
        "CRITICAL": ["System failure", "Service down"],
        "ERROR": ["Request timeout", "DB failed"],
        "WARNING": ["High CPU", "Disk low"],
        "INFO": ["Service started", "Health OK"]
      }
      eid = f"e{random.randint(1000, 9999)}"
      return eid, {
        "event_id": eid,
        "severity": sev,
        "component": random.choice(["api", "db", "cache"]),
        "message": random.choice(msgs[sev]),
        "timestamp": int(time.time()),
        "cpu": random.uniform(10 if sev=="INFO" else 70, 100)
      }

producers:
  # Produce system events every 2 seconds
  system_event_producer:
    generator: generate_system_event
    interval: 2s
    to: system_events
Processor - toTopicNameExtractor example (click to expand)
streams:
  system_events:
    topic: system_events
    keyType: string
    valueType: json

  critical_alerts:
    topic: critical_alerts
    keyType: string
    valueType: json

  error_logs:
    topic: error_logs
    keyType: string
    valueType: json

  warning_logs:
    topic: warning_logs
    keyType: string
    valueType: json

  info_logs:
    topic: info_logs
    keyType: string
    valueType: json

functions:
  route_by_severity:
    type: topicNameExtractor
    code: |
      if value is None: return "info_logs"
      sev = value.get("severity", "INFO")
      if sev == "CRITICAL":
        log.error("CRITICAL: {}", value.get("message"))
      return {"CRITICAL": "critical_alerts", "ERROR": "error_logs", 
              "WARNING": "warning_logs"}.get(sev, "info_logs")
    resultType: string

pipelines:
  route_system_events:
    from: system_events
    toTopicNameExtractor:
      topicNameExtractor: route_by_severity

What this example does:

  • The producer generates system events with different severity levels (INFO, WARNING, ERROR, CRITICAL) from various system components
  • The processor uses toTopicNameExtractor with a custom function to route events to different topics based on severity
  • Critical events are logged and sent to critical_alerts topic, while other severities go to their respective log topics
  • This pattern enables priority-based processing and separate handling of critical system issues

forEach

Processes each record with a side effect, typically used for logging or external actions. This is a terminal operation that does not forward records.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| forEach | Object | Yes | Specifies the action to perform on each record |

The forEach can be defined using:

  • code: A Python code block performing the side effect

Example

        forEach:
          code: |
            log_message(key, value)

Full example for forEach:
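A minimal sketch of a pipeline that ends in forEach. The topic name is an assumption; log is the logger that KSML exposes to Python functions, as used in the other examples, and nothing is forwarded after this terminal operation.

streams:
  alert_events:
    topic: alert_events
    keyType: string
    valueType: json

pipelines:
  log_alerts:
    from: alert_events
    forEach:
      code: |
        # Terminal side effect: log every record, no output is produced
        log.info("Alert received: key={}, value={}", key, value)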

print

Prints each record to stdout for debugging purposes. This operation can use a custom mapper function to format the output, providing colored indicators and structured logging.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mapper | String | No | Name of keyValuePrinter function to format output |
| prefix | String | No | Optional prefix for the printed output |

Example

    print:
      mapper: format_debug_output
      prefix: "DEBUG_CONSOLE: "

This example demonstrates printing debug messages with color-coded log levels and custom formatting:

Producer - print example (click to expand)
streams:
  debug_messages:
    topic: debug_messages
    keyType: string
    valueType: json

functions:
  generate_debug_message:
    type: generator
    resultType: (string,json)
    code: |
      import random, time
      lvl = random.choice(["INFO", "WARNING", "ERROR", "DEBUG"])
      msgs = {
        "ERROR": ["DB timeout", "Payment failed"],
        "WARNING": ["High CPU", "Rate limit near"],
        "DEBUG": ["Auth check", "Loading config"],
        "INFO": ["Service started", "Request done"]
      }
      rid = f"r{random.randint(100, 999)}"
      return rid, {
        "timestamp": int(time.time()),
        "level": lvl,
        "component": random.choice(["UserSvc", "PaySvc"]),
        "message": random.choice(msgs[lvl]),
        "request_id": rid,
        "thread_id": f"t{random.randint(1, 5)}"
      }

producers:
  # Produce debug messages every 2 seconds
  debug_message_producer:
    generator: generate_debug_message
    interval: 2s
    to: debug_messages
Processor - print example (click to expand)
streams:
  debug_messages:
    topic: debug_messages  
    keyType: string
    valueType: json

functions:
  format_debug_output:
    type: keyValuePrinter
    code: |
      if value is None: return f"[NULL] {key}"
      lvl = value.get("level", "?")
      colors = {"ERROR": "red", "WARNING": "yellow", "INFO": "green", "DEBUG": "blue"}
      return f"{colors.get(lvl, 'white')} [{lvl}] {value.get('component')} | {value.get('message')} | {value.get('request_id')}"
    resultType: string

pipelines:
  print_debug_messages:
    from: debug_messages
    print:
      mapper: format_debug_output
      prefix: "DEBUG_CONSOLE: "

What this example does:

  • The producer generates different types of debug messages (INFO, WARNING, ERROR, DEBUG) from various system components
  • The processor uses print with a custom keyValuePrinter function to format each message with color indicators ("red" for ERROR, "yellow" for WARNING, "green" for INFO, "blue" for DEBUG)
  • The formatted output includes component name, message text, request ID, and thread information for comprehensive debugging

Control Flow Operations

Control flow operations manage the flow of data through your processing pipeline, allowing for branching logic and record observation.

branch

Splits a stream into multiple substreams based on conditions.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| branches | Array | Yes | List of conditions and handling pipeline for each branch |

The branches tag itself does not exist in the KSML language; it represents the list of branch definitions used here, each of which consists of two elements:

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| if | Predicate | Yes | A condition which can evaluate to True or False. When True, the message is sent down the branch's pipeline |
| pipeline | Pipeline | Yes | A pipeline that contains a list of processing steps to send the message through |

Example

    branch:
      # Branch 1: Priority orders (premium customers, high value)
      - if: is_priority
        via:
          - type: mapValues
            mapper: add_priority_processing
          - type: peek
            forEach:
              code: |
                log.info("PRIORITY: Order {} - ${} premium order", 
                       value.get("order_id"), value.get("total_amount"))
        to: priority_orders

      # Branch 2: Regional orders (US/EU, not priority)
      - if: is_regional
        via:
          - type: mapValues
            mapper: add_regional_processing
          - type: peek
            forEach:
              code: |
                log.info("REGIONAL: Order {} from {}", 
                       value.get("order_id"), value.get("region"))
        to: regional_orders

      # Branch 3: International orders  
      - if: is_international
        via:
          - type: mapValues
            mapper: add_international_processing
          - type: peek
            forEach:
              code: |
                log.info("INTERNATIONAL: Order {} from {} (customs required)", 
                       value.get("order_id"), value.get("region"))
        to: international_orders

Full example for branch:
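For reference, the predicates used in the example above could be defined along these lines. The field names, regions, and thresholds are illustrative assumptions.

functions:
  is_priority:
    type: predicate
    code: |
      # Assumed rule: premium customers or high-value orders get priority handling
      if value is None:
        return False
      return value.get("customer_type") == "premium" or value.get("total_amount", 0) > 1000

  is_regional:
    type: predicate
    expression: value is not None and value.get("region") in ["US", "EU"]

  is_international:
    type: predicate
    expression: value is not None and value.get("region") not in ["US", "EU"]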

peek

Performs a side effect on each record without changing it.

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| forEach | Object | Yes | Specifies the action to perform on each record |

The forEach can be defined using:

  • expression: A simple expression (rarely used for peek)
  • code: A Python code block performing the side effect

Example

          - type: peek
            forEach:
              code: |
                log.info("PRIORITY: Order {} - ${} premium order", 
                       value.get("order_id"), value.get("total_amount"))

Full example for peek:

Combining Operations

Operations can be combined in various ways to create complex processing pipelines.

Sequential Operations

Operations are executed in sequence, with each operation processing the output of the previous operation.

pipelines:
  my_pipeline:
    from: input_stream
    via:
      - type: filter
        if:
          expression: value.get("amount") > 0
      - type: transformValue
        mapper:
          code: enrich_transaction(value)
      - type: peek
        forEach:
          code: |
            log.info("Processed transaction: {}", value)
    to: output_stream

Branching and Merging

You can create complex topologies by branching streams and merging them back together.

pipelines:
  branch_pipeline:
    from: input_stream
    branch:
      - if:
          expression: value.get("type") == "A"
        as: type_a_stream
      - if:
          expression: value.get("type") == "B"
        as: type_b_stream

  process_a_pipeline:
    from: type_a_stream
    via:
      - type: mapValues
        mapper:
          code: process_type_a(value)
    to: merged_stream

  process_b_pipeline:
    from: type_b_stream
    via:
      - type: mapValues
        mapper:
          code: process_type_b(value)
    to: merged_stream

Best Practices

  • Chain operations thoughtfully: Consider the performance implications of chaining multiple operations.
  • Use stateless operations when possible: Stateless operations are generally more efficient than stateful ones.
  • Be careful with window sizes: Large windows can consume significant memory.
  • Handle errors gracefully: Validate input and guard against missing fields in your Python functions to prevent pipeline failures.
  • Monitor performance: Keep an eye on throughput and latency, especially for stateful operations.

How KSML Operations Relate to Kafka Streams

KSML operations are YAML-based wrappers around Kafka Streams topology operations. Understanding this relationship helps you leverage Kafka Streams documentation and concepts:

Direct Mappings

Stateless Transformation Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| filter | KStream.processValues() / KTable.filter() | Filter records based on conditions |
| filterNot | KStream.processValues() / KTable.filterNot() | Filter out matching records |
| flatMap | KStream.process() | Transform one record to multiple records |
| map | KStream.process() | Transform both key and value |
| mapKey | KStream.process() | Transform only the key |
| mapValues | KStream.processValues() / KTable.transformValues() | Transform only the value |
| selectKey | KStream.process() | Select new key from record content |
| transformKey | KStream.process() | Transform key using custom function |
| transformValue | KStream.processValues() | Transform value using custom function |

Format Conversion Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| convertKey | KStream.processValues() | Convert key data format |
| convertValue | KStream.processValues() | Convert value data format |

Grouping & Partitioning Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| groupBy | KStream.groupBy() / KTable.groupBy() | Group by new key |
| groupByKey | KStream.groupByKey() | Group by existing key |
| repartition | KStream.repartition() | Redistribute across partitions |

Stateful Aggregation Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| aggregate | KGroupedStream.aggregate() / KGroupedTable.aggregate() | Custom aggregation logic |
| count | KGroupedStream.count() / KGroupedTable.count() | Count records per key |
| reduce | KGroupedStream.reduce() / KGroupedTable.reduce() | Reduce to single value per key |

Join Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| join | KStream.join() / KTable.join() | Inner join streams/tables |
| leftJoin | KStream.leftJoin() / KTable.leftJoin() | Left outer join streams/tables |
| merge | KStream.merge() | Merge multiple streams into one |
| outerJoin | KStream.outerJoin() / KTable.outerJoin() | Full outer join streams/tables |

Windowing Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| windowBySession | KGroupedStream.windowedBy(SessionWindows) | Session-based windowing |
| windowByTime | KGroupedStream.windowedBy(TimeWindows) | Time-based windowing |

Output Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| forEach | KStream.processValues() | Side effects without output |
| print | KStream.processValues() | Print to stdout/file |
| to | KStream.to() | Send to Kafka topic |
| toTopicNameExtractor | KStream.to(TopicNameExtractor) | Dynamic topic routing |

Control Flow Operations

| KSML Operation | Kafka Streams Method | Purpose |
|----------------|----------------------|---------|
| branch | KStream.split() | Split into multiple branches |
| peek | KStream.processValues() | Observe records without changes |

Key Implementation Details

  • Most KSML operations use KStream.process() or KStream.processValues() with custom processor suppliers rather than direct DSL methods. This enables seamless integration with KSML's Python function execution system.
  • Operations automatically adapt to work with KStream, KTable, and windowed streams, mapping to the appropriate Kafka Streams method based on context.
  • Stateful operations support configurable state stores through KSML's unified state management system.
  • Each operation integrates with Python functions through specialized user function wrappers (UserPredicate, UserKeyTransformer, etc.).