Function Reference

KSML functions let you implement custom stream-processing logic in Python. They make it easier for data scientists, analysts, and developers to process streaming data without needing Java or the Kafka Streams API.

Functions extend built-in operations, enabling custom business logic, transformations, and processing within the KSML runtime, combining the power of Kafka Streams with the simplicity of Python.

Function Definition Structure

Functions are defined in the functions section of your KSML definition file. Each function has the following properties:

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| type | String | Yes | The type of function (predicate, aggregator, valueJoiner, etc.) |
| parameters | Array | No | Additional custom parameters to add to the function's built-in parameters (see note below) |
| globalCode | String | No | Python code executed once upon startup |
| code | String | No | Python code implementing the function |
| expression | String | No | An expression that the function will return as value |
| resultType | Data type | Sometimes | The data type returned by the function; required when it cannot be derived from the function type |
| stores | Array | No | List of state stores the function can access |

Note about parameters: Every function type has built-in parameters that are automatically provided by KSML (e.g., key and value for most function types). The parameters property is only needed when you want to add custom parameters beyond these built-in ones. These additional parameters can then be passed when calling the function from Python code.
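
For example, a function that lists a store under stores can read and write it by name from its Python code. The following is a minimal sketch, assuming a keyValue state store named message_count_store is defined in the top-level stores section of the definition file:

functions:
  count_messages:
    type: valueTransformer
    stores:
      - message_count_store  # assumed to be defined in the stores section
    code: |
      # Read the running count for this key, increment it, and write it back
      count = (message_count_store.get(key) or 0) + 1
      message_count_store.put(key, count)
      return {"value": value, "message_count": count}
    resultType: json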

Writing KSML Functions

Example KSML Function Definition
functions:
  # Example of a complete function definition with all components
  process_sensor_data:
    type: valueTransformer
    globalCode: |
      # This code runs once when the application starts
      import json
      import time

      # Initialize global variables
      sensor_threshold = 25.0
      alert_count = 0

    code: |
      # This code runs for each message
      global alert_count

      # Process the sensor value
      if value is None:
        return None

      temperature = value.get("temperature", 0)

      # Convert Celsius to Fahrenheit
      temperature_f = (temperature * 9/5) + 32

      # Check for alerts
      is_alert = temperature > sensor_threshold
      if is_alert:
        alert_count += 1
        log.warn("High temperature detected: {}°C", temperature)

      # Return enriched data
      result = {
        "original_temp_c": temperature,
        "temp_fahrenheit": temperature_f,
        "is_alert": is_alert,
        "total_alerts": alert_count,
        "processed_at": int(time.time() * 1000)
      }

      return result

    resultType: json

  # Example of a simple expression-based function
  is_high_priority:
    type: predicate
    expression: value.get("priority", 0) > 7
    resultType: boolean

KSML functions are defined in the functions section of your KSML definition file. A typical function definition includes:

  • Type: Specifies the function's purpose and behavior
  • Parameters: Input parameters the function accepts (defined by the function type)
  • GlobalCode: Python code executed only once upon application start
  • Code: Python code implementing the function's logic
  • Expression: Shorthand for simple return expressions
  • ResultType: The expected return type of the function

Function Definition Formats

KSML supports two formats for defining functions:

Expression Format

For simple, one-line functions, the expression property alone is enough; the code block is optional:

functions:
  is_valid:
    type: predicate
    expression: value.get("status") == "ACTIVE"

Code Block Format

For more complex functions:

functions:
  process_transaction:
    type: keyValueMapper
    code: |
      import time

      result = {}

      # Copy basic fields
      result["transaction_id"] = value.get("id")
      result["amount"] = value.get("amount", 0)

      # Calculate fee
      amount = value.get("amount", 0)
      if amount > 1000:
        result["fee"] = amount * 0.02
      else:
        result["fee"] = amount * 0.03

      # Add timestamp
      result["processed_at"] = int(time.time() * 1000)

      return result
    resultType: struct

Function Parameters

Built-in vs Custom Parameters

Every function type in KSML has built-in parameters that are automatically provided by KSML. These are implicitly available in your function code without needing to declare them:

Most function types (like forEach, predicate, valueTransformer) automatically receive:

  • key - The record key
  • value - The record value

Some specialized types have different built-in parameters:

  • aggregator: receives key, value, and aggregatedValue
  • merger: receives key, value1, and value2
  • initializer: receives no parameters

Adding Custom Parameters

The parameters property allows you to add custom parameters beyond the built-in ones. This is useful when:

  1. Creating reusable functions that can behave differently based on configuration
  2. Calling functions from Python code with specific arguments
  3. Using the generic function type which has no built-in parameters

Example WITHOUT custom parameters:

functions:
  simple_logger:
    type: forEach
    # Only uses built-in key and value parameters
    code: |
      log.info("Processing: key={}, value={}", key, value)

Example WITH custom parameters:

functions:
  configurable_logger:
    type: forEach
    parameters: # ADDS 'prefix' to the built-in key and value
      - name: prefix
        type: string
    code: |
      log.info("{}: key={}, value={}", prefix, key, value)

When calling this function from Python:

# The custom parameter is passed along with built-in ones
configurable_logger(key, value, prefix="DEBUG")

Parameter Definition Structure

When defining custom parameters:

parameters:
  - name: parameter_name   # Name of the parameter
    type: parameter_type   # Data type (string, int, double, etc.)

Important: The parameters property adds to the built-in parameters - it doesn't replace them. Built-in parameters like key and value are still available in your function code.
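
For illustration, here is a minimal sketch of a generic function; the function and parameter names are hypothetical. Because the generic type has no built-in parameters, every parameter must be declared explicitly:

functions:
  calculate_fee:
    type: generic
    parameters:
      - name: amount
        type: double
    expression: amount * 0.02 if amount > 1000 else amount * 0.03
    resultType: double

Other functions can then call it from their Python code, for example fee = calculate_fee(amount=value.get("amount", 0)).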

Function Types Overview

Below is an overview of all 21 function types in KSML.

Functions for stateless operations

| Function Type | Purpose | Used In |
|---------------|---------|---------|
| forEach | Process each message for side effects | peek |
| keyTransformer | Convert a key to another type or value | mapKey, selectKey, toStream, transformKey |
| keyValueToKeyValueListTransformer | Convert key and value to a list of key/values | flatMap, transformKeyValueToKeyValueList |
| keyValueToValueListTransformer | Convert key and value to a list of values | flatMapValues, transformKeyValueToValueList |
| keyValueTransformer | Convert key and value to another key and value | map, transformKeyValue |
| predicate | Return true/false based on message content | filter, branch |
| valueTransformer | Convert value to another type or value | mapValue, mapValues, transformValue |

Functions for stateful operations

| Function Type | Purpose | Used In |
|---------------|---------|---------|
| aggregator | Incrementally build aggregated results | aggregate |
| initializer | Provide initial values for aggregations | aggregate |
| merger | Merge two aggregation results into one | aggregate |
| reducer | Combine two values into one | reduce |

Special Purpose Functions

| Function Type | Purpose | Used In |
|---------------|---------|---------|
| foreignKeyExtractor | Extract a key from a join table's record | join, leftJoin |
| generator | Function used in producers to generate a message | producer |
| generic | Generic custom function | called from Python code |
| keyValueMapper | Convert key and value into a single output value | groupBy, join, leftJoin |
| keyValuePrinter | Output key and value | print |
| metadataTransformer | Convert Kafka headers and timestamps | transformMetadata |
| valueJoiner | Combine data from multiple streams | join, leftJoin, outerJoin |

Stream Related Functions

| Function Type | Purpose | Used In |
|---------------|---------|---------|
| streamPartitioner | Determine which partition to send records to | to |
| timestampExtractor | Extract timestamps from messages | stream, table, globalTable |
| topicNameExtractor | Derive a target topic name from key and value | toTopicNameExtractor |

Functions for stateless operations

forEach

Processes each message for side effects like logging, without changing the message.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

None (the function is called for its side effects)

Example

functions:
  log_message:
    type: forEach
    code: |
      # Log each message for monitoring; forEach leaves the stream unchanged
      log.info("Processing message: key={}, value={}", key, value)

keyTransformer

Transforms a key/value into a new key, which then gets combined with the original value as a new message on the output stream.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

New key for the output message

Example

Function Definition:

functions:
  extract_region_key:
    type: keyTransformer
    code: |
      if value is None: return "unknown"
      return value.get("region", "unknown")
    resultType: string

This function extracts the region from transaction data to use as the new message key, enabling region-based partitioning.

Complete Working Example:

Producer - keyTransformer example:
functions:
  generate_region_data:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1
      tid = f"txn_{counter:05d}"
      return tid, {
        "transaction_id": tid,
        "region": random.choice(["us-east", "us-west", "europe", "asia"]),
        "amount": round(random.uniform(10.0, 1000.0), 2),
        "customer_id": f"c{random.randint(1, 100):03d}",
        "timestamp": counter * 1000
      }
    resultType: (string, json)

producers:
  region_data_producer:
    generator: generate_region_data
    interval: 2s
    to:
      topic: transaction_events
      keyType: string
      valueType: json
Processor - keyTransformer example:
streams:
  transaction_input:
    topic: transaction_events
    keyType: string
    valueType: json
  region_output:
    topic: transactions_by_region
    keyType: string
    valueType: json

functions:
  extract_region_key:
    type: keyTransformer
    code: |
      if value is None: return "unknown"
      return value.get("region", "unknown")
    resultType: string

  log_repartitioned:
    type: forEach
    code: |
      log.info("Repartitioned - Key: {}, Region: {}, Amount: ${}", 
               key, value.get("region"), value.get("amount"))

pipelines:
  region_repartitioning:
    from: transaction_input
    via:
      - type: mapKey
        mapper: extract_region_key
      - type: peek
        forEach: log_repartitioned
    to: region_output

Additional Example:

Full example for keyTransformer: Stream Table Join Tutorial

keyValueToKeyValueListTransformer

Takes one message and converts it into a list of output messages, which then get sent to the output stream. Unlike keyValueToValueListTransformer, this function can create new keys for each output message, enabling data reshaping and repartitioning.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

A list of key-value pairs [(key1, value1), (key2, value2), ...]

Example

  split_batch_orders:
    type: keyValueToKeyValueListTransformer
    code: |
      # Split batch orders into individual orders with unique keys
      # This transforms one batch record into multiple individual order records
      if value is None or "orders" not in value:
        return []

      batch_id = key
      orders = value.get("orders", [])
      individual_records = []

      for i, order in enumerate(orders):
        # Create unique key for each individual order
        order_key = f"{batch_id}_order_{i+1}"

        # Create individual order record
        order_value = {
          "order_id": order_key,
          "batch_id": batch_id,
          "product": order.get("product"),
          "quantity": order.get("quantity", 1),
          "customer_email": order.get("customer_email"),
          "processing_timestamp": value.get("timestamp")
        }

        individual_records.append((order_key, order_value))

      log.info("Split batch {} into {} individual orders", batch_id, len(individual_records))
      return individual_records
    resultType: list(tuple(string, json))

This example demonstrates splitting batch orders into individual orders with unique keys, useful for processing bulk data into individual records.

Producer - keyValueToKeyValueListTransformer example:
functions:
  generate_batch_orders:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1
      bid = f"b{counter:03d}"
      prods = ["laptop", "phone", "tablet"]
      custs = ["alice@co", "bob@co", "charlie@co"]
      orders = [{"product": random.choice(prods), "quantity": random.randint(1,5),
                 "customer_email": random.choice(custs)} for _ in range(random.randint(2,4))]
      return bid, {"batch_id": bid, "orders": orders, "total": len(orders), "timestamp": counter * 1000}
    resultType: (string, json)

producers:
  batch_order_producer:
    generator: generate_batch_orders
    interval: 3s
    to:
      topic: batch_orders
      keyType: string
      valueType: json
Processor - keyValueToKeyValueListTransformer example:
streams:
  batch_orders_input:
    topic: batch_orders
    keyType: string
    valueType: json
  individual_orders_output:
    topic: individual_orders
    keyType: string
    valueType: json

functions:
  split_batch_orders:
    type: keyValueToKeyValueListTransformer
    code: |
      # Split batch orders into individual orders with unique keys
      # This transforms one batch record into multiple individual order records
      if value is None or "orders" not in value:
        return []

      batch_id = key
      orders = value.get("orders", [])
      individual_records = []

      for i, order in enumerate(orders):
        # Create unique key for each individual order
        order_key = f"{batch_id}_order_{i+1}"

        # Create individual order record
        order_value = {
          "order_id": order_key,
          "batch_id": batch_id,
          "product": order.get("product"),
          "quantity": order.get("quantity", 1),
          "customer_email": order.get("customer_email"),
          "processing_timestamp": value.get("timestamp")
        }

        individual_records.append((order_key, order_value))

      log.info("Split batch {} into {} individual orders", batch_id, len(individual_records))
      return individual_records
    resultType: list(tuple(string, json))

pipelines:
  split_batch_processing:
    from: batch_orders_input
    via:
      - type: transformKeyValueToKeyValueList
        mapper: split_batch_orders
    to: individual_orders_output

keyValueToValueListTransformer

Takes one message and converts it into a list of output values, which then get combined with the original key and sent to the output stream.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

A list of values [value1, value2, ...] that will be combined with the original key

Example

  explode_order_items:
    type: keyValueToValueListTransformer
    code: |
      # Split order into individual item records
      # Key remains the same (order_id), but each item becomes a separate value
      if value is None or "items" not in value:
        return []

      items = value.get("items", [])
      item_records = []

      for item in items:
        # Create individual item record with order context
        item_record = {
          "order_id": value.get("order_id"),
          "customer_id": value.get("customer_id"),
          "product": item.get("product"),
          "quantity": item.get("quantity"),
          "unit_price": item.get("price"),
          "total_price": item.get("price", 0) * item.get("quantity", 0),
          "order_total": value.get("order_total")
        }
        item_records.append(item_record)

      log.info("Exploded order {} into {} item records", 
               value.get("order_id"), len(item_records))

      return item_records
    resultType: list(json)
Producer - keyValueToValueListTransformer example:
functions:
  generate_order_data:
    type: generator
    globalCode: |
      import random
      counter = 0
      products = ["laptop", "phone", "tablet", "headphones", "speaker"]
    code: |
      global counter, products

      # Generate order ID as key  
      order_id = f"order_{counter:03d}"
      counter += 1

      # Generate order with multiple items
      num_items = random.randint(2, 5)
      items = []

      for i in range(num_items):
        item = {
          "product": random.choice(products),
          "quantity": random.randint(1, 3),
          "price": round(random.uniform(50.0, 500.0), 2)
        }
        items.append(item)

      # Create order value with items array
      value = {
        "order_id": order_id,
        "customer_id": f"cust_{random.randint(1, 50):03d}",
        "items": items,
        "order_total": sum(item["price"] * item["quantity"] for item in items)
      }

    expression: (order_id, value)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_order_data
    interval: 3s
    to:
      topic: customer_orders
      keyType: string
      valueType: json
Processor - keyValueToValueListTransformer example:
streams:
  orders_input:
    topic: customer_orders
    keyType: string
    valueType: json
  items_output:
    topic: individual_items
    keyType: string
    valueType: json

functions:
  explode_order_items:
    type: keyValueToValueListTransformer
    code: |
      # Split order into individual item records
      # Key remains the same (order_id), but each item becomes a separate value
      if value is None or "items" not in value:
        return []

      items = value.get("items", [])
      item_records = []

      for item in items:
        # Create individual item record with order context
        item_record = {
          "order_id": value.get("order_id"),
          "customer_id": value.get("customer_id"),
          "product": item.get("product"),
          "quantity": item.get("quantity"),
          "unit_price": item.get("price"),
          "total_price": item.get("price", 0) * item.get("quantity", 0),
          "order_total": value.get("order_total")
        }
        item_records.append(item_record)

      log.info("Exploded order {} into {} item records", 
               value.get("order_id"), len(item_records))

      return item_records
    resultType: list(json)

pipelines:
  explode_orders:
    from: orders_input
    via:
      - type: transformKeyValueToValueList
        mapper: explode_order_items
    to: items_output

keyValueTransformer

Takes one message and converts it into another message, which may have different key/value types.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

A tuple of (new_key, new_value)

Example

  create_external_request:
    type: keyValueTransformer
    code: |
      import time

      # Extract fields from JSON order event
      if not value:
        return None

      order_id = value.get("order_id")
      customer_id = value.get("customer_id")
      status = value.get("status")
      amount = value.get("amount")
      timestamp = value.get("timestamp")
      sequence_number = value.get("sequence_number")
      order_details = value.get("order_details", {})
      customer_info = value.get("customer_info", {})
      fulfillment = value.get("fulfillment", {})
      business_context = value.get("business_context", {})
      metadata = value.get("metadata", {})

      # Create comprehensive request for external payment processing system
      request_id = f"REQ_{order_id}_{timestamp}"

      external_request = {
        "request_id": request_id,
        "request_type": "PAYMENT_PROCESSING",
        "original_order": {
          "order_id": order_id,
          "customer_id": customer_id,
          "amount": amount,
          "timestamp": timestamp,
          "sequence_number": sequence_number
        },
        "payment_details": {
          "amount": amount,
          "currency": order_details.get("currency", "USD"),
          "payment_method": order_details.get("payment_method"),
          "customer_tier": customer_info.get("customer_tier"),
          "loyalty_points": customer_info.get("loyalty_points")
        },
        "processing_context": {
          "priority": business_context.get("priority", "normal"),
          "high_value": business_context.get("high_value", False),
          "customer_previous_orders": customer_info.get("previous_orders", 0),
          "order_source": order_details.get("order_source")
        },
        "async_metadata": {
          "correlation_id": metadata.get("correlation_id", f"corr_{request_id}"),
          "created_at": int(time.time() * 1000),
          "timeout_ms": 30000,  # 30 second timeout
          "retry_count": 0,
          "expected_response_topic": "external_responses"
        },
        "external_system_info": {
          "target_system": "payment_processor",
          "api_version": "v2.1",
          "request_format": "async_json",
          "callback_required": True
        }
      }

      log.info("Created external payment request for order {}: amount=${:.2f}, priority={}", 
               order_id, amount, business_context.get("priority", "normal"))

      return (request_id, external_request)

    resultType: (string, json)
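
A keyValueTransformer is typically wired into a pipeline with the map or transformKeyValue operation. The following is a minimal sketch, assuming streams orders_input and external_requests are defined elsewhere:

pipelines:
  create_payment_requests:
    from: orders_input
    via:
      - type: transformKeyValue
        mapper: create_external_request
    to: external_requests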


predicate

Returns true or false based on message content. Used for filtering and branching operations.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

Boolean (true or false)

Example

  is_critical_sensor:
    type: predicate
    code: |
      # Guard against missing values
      if value is None:
        return False

      sensors = value.get('sensors', {})
      location = sensors.get('location')
      temperature = sensors.get('temperature', 0)

      # Only server rooms and data centers are monitored
      if location not in ['server_room', 'data_center']:
        return False

      # Apply a location-specific temperature threshold
      if location == 'server_room' and temperature > 20:
        return True
      if location == 'data_center' and temperature > 30:
        return True

      return False
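
Predicates are referenced by the filter and branch operations. The following is a minimal filter sketch, assuming streams sensor_input and critical_sensors are defined elsewhere:

pipelines:
  filter_critical:
    from: sensor_input
    via:
      - type: filter
        if: is_critical_sensor
    to: critical_sensors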


valueTransformer

Transforms a key/value into a new value, which is combined with the original key and sent to the output stream.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

New value for the output message

Example

  convert_temperature:
    type: valueTransformer
    code: |
      result = {
        "device_id": value.get('device_id'),
        "temperature_c": round((value.get('temperature') - 32) * 5/9, 2) if value.get('temperature') else None,
        "humidity": value.get('humidity'),
        "timestamp": value.get('timestamp')
      }
    expression: result
    resultType: json
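
valueTransformers are applied with the transformValue operation (or its aliases mapValue and mapValues). The following is a minimal sketch, assuming streams fahrenheit_input and celsius_output are defined elsewhere:

pipelines:
  convert_temperatures:
    from: fahrenheit_input
    via:
      - type: transformValue
        mapper: convert_temperature
    to: celsius_output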


Functions for stateful operations

aggregator

Incrementally builds aggregated results from multiple messages.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |
| aggregatedValue | Any | The current aggregated value (can be None) |

Return Value

New aggregated value

Example

  update_stats:
    type: aggregator
    code: |
      # Update payment statistics
      if value and aggregatedValue:
        amount = value.get("amount", 0.0)
        aggregatedValue["count"] = aggregatedValue.get("count", 0) + 1
        aggregatedValue["total_amount"] = aggregatedValue.get("total_amount", 0.0) + amount

        current_min = aggregatedValue.get("min_amount")
        current_max = aggregatedValue.get("max_amount")

        if current_min is None or amount < current_min:
          aggregatedValue["min_amount"] = amount
        if current_max is None or amount > current_max:
          aggregatedValue["max_amount"] = amount

        aggregatedValue["average_amount"] = round(aggregatedValue["total_amount"] / aggregatedValue["count"], 2)
    expression: aggregatedValue
    resultType: json
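
Aggregators run inside the aggregate operation, paired with an initializer such as init_stats from the next section. The following is a minimal sketch, assuming streams payments_input and payment_stats are defined elsewhere (store configuration omitted for brevity):

pipelines:
  payment_statistics:
    from: payments_input
    via:
      - type: groupByKey
      - type: aggregate
        initializer: init_stats
        aggregator: update_stats
      - type: toStream
    to: payment_stats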


initializer

Provides initial values for aggregations.

Parameters

None

Return Value

Initial value for aggregation

Example

  init_stats:
    type: initializer
    code: |
      # Initialize statistics
      stats = {
        "count": 0,
        "total_amount": 0.0,
        "min_amount": None,
        "max_amount": None
      }
    expression: stats
    resultType: json


merger

Merges two aggregation results into one. Used in aggregation operations to combine partial results.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value1 | Any | The value of the first aggregation |
| value2 | Any | The value of the second aggregation |

Return Value

The merged aggregation result

Example

  merge_counts:
    type: merger
    code: |
      # This function is called when two session windows need to be merged
      # For example, when a late-arriving event connects two previously separate sessions

      count1 = value1 or 0
      count2 = value2 or 0
      merged_total = count1 + count2

      log.info("Merging sessions: {} + {} = {}", count1, count2, merged_total)
      return merged_total
    resultType: int
Producer - merger example:
functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import random, time
      counter = 0
      last_act = {}
      base_t = int(time.time() * 1000)
    code: |
      global counter, last_act, base_t
      uid = random.choice(["alice", "bob", "charlie"])
      t = base_t + (counter * 2000)
      if uid in last_act and random.random() < 0.15:
        t = last_act[uid] + 660000  # 11min gap
      last_act[uid] = t
      counter += 1
      return uid, {
        "user_id": uid,
        "page": random.choice(["home", "products", "profile"]),
        "timestamp": t,
        "event_type": "page_view"
      }
    resultType: (string, json)

producers:
  activity_producer:
    generator: generate_user_activity
    interval: 2s
    to:
      topic: user_activity
      keyType: string
      valueType: json
Processor - merger example:
streams:
  user_activity_input:
    topic: user_activity
    keyType: string
    valueType: json

functions:
  # Initialize session counter
  init_counter:
    type: initializer
    expression: 0
    resultType: int

  # Count events in session
  count_events:
    type: aggregator
    code: |
      # Increment the counter for each event
      return (aggregatedValue or 0) + 1
    resultType: int

  # Merge session counters when sessions are combined
  merge_counts:
    type: merger
    code: |
      # This function is called when two session windows need to be merged
      # For example, when a late-arriving event connects two previously separate sessions

      count1 = value1 or 0
      count2 = value2 or 0
      merged_total = count1 + count2

      log.info("Merging sessions: {} + {} = {}", count1, count2, merged_total)
      return merged_total
    resultType: int

pipelines:
  session_counting:
    from: user_activity_input
    via:
      - type: groupByKey
      - type: windowBySession
        inactivityGap: 10m  # Session ends after 10 minutes of inactivity
        grace: 2m          # Allow 2 minutes for late events
      - type: aggregate
        initializer: init_counter
        aggregator: count_events
        merger: merge_counts  # Required for session windows - merges overlapping sessions
        store:
          type: session
          retention: 1h
      - type: toStream
      - type: peek
        forEach:
          code: |
            log.info("User {} session: {} events", key, value)
      # Convert windowed key to string for output serialization
      - type: mapKey
        mapper:
          code: |
            # Convert windowed key to readable string format
            # The windowed key is a dictionary with 'key', 'start', 'end' fields
            return f"user_{key['key']}_window_{key['start']}_{key['end']}"
          resultType: string
    to:
      topic: session_counts
      keyType: string  # Now using simple string keys after transformation
      valueType: int

The merger function is specifically designed for session window aggregations where late-arriving events can merge previously separate sessions. This example demonstrates user activity tracking with session-based counting.

What the example does:

Simulates user activity tracking with session windows and merging:

  • Groups events into 10-min inactivity sessions
  • Counts events per user
  • Merges sessions when late events connect them
  • Producer simulates gaps to trigger merging

Key Features:

  • Automatic session windowing & late data handling
  • Type-safe merger (integers)
  • Windowed key transformation for output
  • Merge logic adds event counts

Expected Results:

When running this example, you'll see log messages like:

  • "Merging sessions: 3 + 2 = 5" - Shows the merger function combining session counts
  • "User alice session: 4 events" - Displays final session results after merging
  • Session windows spanning different time periods for each user

reducer

Combines two values into one.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| value1 | Any | The first value to combine |
| value2 | Any | The second value to combine |

Return Value

Combined value

Example

  sum_amounts:
    type: reducer
    code: |
      # Sum two transaction amounts (in cents)
      total = value1 + value2
    expression: total
    resultType: long
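
Reducers are referenced by the reduce operation on a grouped stream. The following is a minimal sketch, assuming streams amounts_input and amount_totals are defined elsewhere:

pipelines:
  sum_amounts_per_key:
    from: amounts_input
    via:
      - type: groupByKey
      - type: reduce
        reducer: sum_amounts
      - type: toStream
    to: amount_totals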


Special Purpose Functions

foreignKeyExtractor

Extracts a key from a join table's record. Used during join operations to determine which records to join.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| value | Any | The value of the record to get a key from |

Return Value

The key to look up in the table being joined with

Example

  extract_customer_id:
    type: foreignKeyExtractor
    code: |
      # Extract the foreign key (customer_id) from the order value
      # This key will be used to look up the customer in the customers table
      if value is None:
        return None

      customer_id = value.get("customer_id")
      log.debug("Extracting customer_id: {} from order", customer_id)
      return customer_id
    resultType: string
Producer - foreignKeyExtractor example:
functions:
  generate_orders:
    type: generator
    globalCode: |
      import random, time
      order_counter = 1
    code: |
      global order_counter
      oid = f"ord_{order_counter:03d}"
      order_counter += 1
      return oid, {
        "order_id": oid,
        "customer_id": random.choice(["c001", "c002", "c003"]),
        "product": random.choice(["laptop", "phone"]),
        "amount": random.randint(100, 1000),
        "timestamp": int(time.time() * 1000)
      }
    resultType: (string, json)

  generate_customers:
    type: generator
    globalCode: |
      import time, random
      custs = [
        {"id": "c001", "name": "Alice", "tier": "gold", "email": "alice@example.com"},
        {"id": "c002", "name": "Bob", "tier": "silver", "email": "bob@example.com"},
        {"id": "c003", "name": "Charlie", "tier": "bronze", "email": "charlie@example.com"}
      ]
    code: |
      global custs
      c = random.choice(custs)
      return c["id"], {
        "customer_id": c["id"],
        "name": c["name"],
        "tier": c["tier"],
        "email": c["email"],
        "created_at": int(time.time() * 1000)
      }
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_orders
    interval: 3s
    to:
      topic: orders
      keyType: string
      valueType: json

  customer_producer:
    generator: generate_customers
    interval: 5s  # Create customers periodically
    to:
      topic: customers
      keyType: string
      valueType: json
Processor - foreignKeyExtractor example:
streams:
  orders_input:
    topic: orders
    keyType: string
    valueType: json

tables:
  customers_table:
    topic: customers
    keyType: string
    valueType: json

functions:
  # Extract customer_id from order value to join with customers table
  extract_customer_id:
    type: foreignKeyExtractor
    code: |
      # Extract the foreign key (customer_id) from the order value
      # This key will be used to look up the customer in the customers table
      if value is None:
        return None

      customer_id = value.get("customer_id")
      log.debug("Extracting customer_id: {} from order", customer_id)
      return customer_id
    resultType: string

  # Join order with customer data
  join_order_customer:
    type: valueJoiner
    code: |
      # value1 is the order, value2 is the customer
      order = value1
      customer = value2

      if order is None:
        return None

      # Create enriched order with customer information
      enriched_order = {
        "order_id": order.get("order_id"),
        "product": order.get("product"),
        "amount": order.get("amount"),
        "timestamp": order.get("timestamp"),
        "customer": {
          "customer_id": order.get("customer_id"),
          "name": customer.get("name", "Unknown") if customer else "Unknown",
          "email": customer.get("email", "Unknown") if customer else "Unknown",
          "tier": customer.get("tier", "Unknown") if customer else "Unknown"
        }
      }

      log.info("Joined order {} with customer {}", 
               order.get("order_id"), 
               customer.get("name") if customer else "Unknown")

      return enriched_order
    resultType: json

pipelines:
  enrich_orders:
    from: orders_input
    via:
      # Join orders table with customers table using foreignKeyExtractor
      - type: join
        table: customers_table
        foreignKeyExtractor: extract_customer_id  # Extracts customer_id from order
        valueJoiner: join_order_customer
      - type: peek
        forEach:
          code: |
            log.info("Enriched order: {} for customer: {}", 
                     value.get("order_id"), 
                     value.get("customer", {}).get("name"))
    to:
      topic: enriched_orders
      keyType: string  # Still keyed by order_id
      valueType: json

The foreignKeyExtractor enables table joins where the join key is embedded within the record value rather than being the record key. This example demonstrates order enrichment by joining with customer data using a foreign key relationship.

What the example does:

This example simulates an e-commerce system enriching orders with customer data:

  • Orders keyed by order_id, referencing customer_id
  • Customer details looked up by customer_id
  • Foreign key extracted from orders
  • Orders joined with customers to produce enriched records

Key Features:

  • Foreign key join pattern
  • KSML table join with key extraction
  • Data enrichment from multiple sources
  • Preserves original order keys

Expected Results:

When running this example, you'll see log messages like:

  • "Joined order order_001 with customer Alice Johnson" - Shows successful order-customer joins
  • "Enriched order: order_003 for customer: Bob Smith" - Displays enriched results with customer names
  • Orders enriched with customer tier, email, and name information

Use Cases:

This pattern is commonly used for:

  • Order enrichment with customer details
  • Transaction enrichment with account information
  • Event enrichment with user profiles
  • Any scenario where records contain foreign key references

generator

Function used in producers to generate messages. It takes no input parameters and produces key-value pairs.

Parameters

None

Return Value

A tuple of (key, value) representing the generated message

Example

  generate_tutorial_data:
    type: generator
    globalCode: |
      import random
      sensor_id = 0
      locations = ["server_room", "warehouse", "data_center"]
    code: |
      global sensor_id, locations
      key = "sensor" + str(sensor_id)
      sensor_id = (sensor_id + 1) % 5
      location = random.choice(locations)
      sensors = {"temperature": random.randrange(150), "humidity": random.randrange(90), "location": location}
      value = {"sensors": sensors}
    expression: (key, value)
    resultType: (string, json)
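
Generators are wired up in the producers section, which controls how often the function is called and which topic receives the messages. The following sketch uses the function above; the topic name is illustrative:

producers:
  tutorial_data_producer:
    generator: generate_tutorial_data
    interval: 1s
    to:
      topic: sensor_readings
      keyType: string
      valueType: json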


keyValueMapper

Converts a key and value into a single output value, typically used to derive a new grouping or join key.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

A single value derived from the key and value (for example, a new grouping or join key)

Example

  extract_product_id:
    type: keyValueMapper
    code: |
      # Map from order (key, value) to product_id for join
      product_id = value.get("product_id") if value else None
    expression: product_id
    resultType: string
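
A keyValueMapper like this is typically passed to groupBy to re-key the stream before an aggregation. The following is a minimal sketch, assuming streams orders_input and product_counts are defined elsewhere (store configuration omitted for brevity):

pipelines:
  count_orders_per_product:
    from: orders_input
    via:
      - type: groupBy
        mapper: extract_product_id
      - type: count
      - type: toStream
    to: product_counts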


keyValuePrinter

Converts a message to a string for output to a file or stdout.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |

Return Value

String to be written to file or stdout

Example

The keyValuePrinter formats records for human-readable output to stdout or files. This example shows converting sales data into formatted reports for monitoring and debugging.

Producer - keyValuePrinter example:
functions:
  generate_sales_data:
    type: generator
    globalCode: |
      import random
      counter = 1
      products = ["laptop", "mouse", "keyboard", "monitor", "webcam"]
      customers = ["alice", "bob", "charlie", "diana"]

    code: |
      global counter, products, customers

      sale_id = f"sale_{counter:03d}"
      counter += 1

      sale_data = {
        "sale_id": sale_id,
        "product": random.choice(products),
        "customer": random.choice(customers),
        "amount": round(random.uniform(19.99, 299.99), 2),
        "quantity": random.randint(1, 3),
        "region": random.choice(["north", "south", "east", "west"])
      }

    expression: (sale_id, sale_data)
    resultType: (string, json)

producers:
  sales_producer:
    generator: generate_sales_data
    interval: 3s
    to:
      topic: sales_data
      keyType: string
      valueType: json
Processor - keyValuePrinter example:
streams:
  sales_input:
    topic: sales_data
    keyType: string
    valueType: json

functions:
  format_sales_report:
    type: keyValuePrinter
    code: |
      if value is None:
        return f"ERROR: Sale {key} has no data"

      # Extract sale information
      product = value.get("product", "Unknown")
      customer = value.get("customer", "Unknown")
      amount = value.get("amount", 0)
      quantity = value.get("quantity", 0)
      region = value.get("region", "Unknown")

      # Create formatted sales report
      return f"SALE REPORT | ID: {key} | Customer: {customer} | Product: {product} | Qty: {quantity} | Amount: ${amount:.2f} | Region: {region}"
    resultType: string

pipelines:
  print_sales:
    from: sales_input
    print:
      mapper: format_sales_report

What the example does:

Demonstrates formatted business reporting with KSML:

  • Sales Data Processing: Converts raw sales records into reports
  • Custom Formatting: Transforms JSON into readable output
  • Print Operation: Outputs formatted data via print
  • Real-time Monitoring: Enables instant visibility of transactions

Key Features:

  • Python string formatting
  • Null/error handling
  • keyValuePrinter function
  • Field extraction from JSON

Expected Results:

When running this example, you'll see formatted output like:

  • SALE REPORT | ID: sale_001 | Customer: alice | Product: laptop | Qty: 2 | Amount: $1299.99 | Region: north
  • SALE REPORT | ID: sale_002 | Customer: bob | Product: mouse | Qty: 1 | Amount: $29.99 | Region: south

metadataTransformer

Transforms a message's metadata (headers and timestamp).

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |
| metadata | Object | Contains the headers and timestamp of the message |

Return Value

Modified metadata for the output message

Example

  enrich_event_metadata:
    type: metadataTransformer
    code: |
      import time

      # Get processing timestamp
      process_time = int(time.time() * 1000)

      # Determine event severity based on status code
      status_code = value.get("status_code", 200) if value else 200
      severity = "critical" if status_code >= 500 else "warning" if status_code >= 400 else "info"

      # Add processing headers
      new_headers = [
        {"key": "processed_timestamp", "value": str(process_time)},
        {"key": "event_severity", "value": severity},
        {"key": "processor_id", "value": "ksml-metadata-enricher"}
      ]

      # Preserve existing headers and add new ones
      existing_headers = metadata.get("headers", [])
      metadata["headers"] = existing_headers + new_headers

      log.info("Enriched event {} with {} additional headers", key, len(new_headers))
      return metadata
Producer - metadataTransformer example:
functions:
  generate_api_events:
    type: generator
    globalCode: |
      import random
      counter = 0
      endpoints = ["/api/users", "/api/orders", "/api/products", "/api/health"]
      methods = ["GET", "POST", "PUT", "DELETE"]

    code: |
      global counter, endpoints, methods

      # Generate event ID as key
      event_id = f"evt_{counter:04d}"
      counter += 1

      # Generate API event data
      event_data = {
        "event_id": event_id,
        "endpoint": random.choice(endpoints),
        "method": random.choice(methods),
        "user_id": f"user_{random.randint(1, 50):03d}",
        "status_code": random.choices([200, 201, 400, 404, 500], weights=[60, 15, 15, 5, 5])[0],
        "response_time_ms": random.randint(50, 1500),
        "timestamp": "auto"
      }

    expression: (event_id, event_data)
    resultType: (string, json)

producers:
  api_event_producer:
    generator: generate_api_events
    interval: 2s
    to:
      topic: api_events
      keyType: string
      valueType: json
Processor - metadataTransformer example:
streams:
  api_events_input:
    topic: api_events
    keyType: string
    valueType: json

functions:
  enrich_event_metadata:
    type: metadataTransformer
    code: |
      import time

      # Get processing timestamp
      process_time = int(time.time() * 1000)

      # Determine event severity based on status code
      status_code = value.get("status_code", 200) if value else 200
      severity = "critical" if status_code >= 500 else "warning" if status_code >= 400 else "info"

      # Add processing headers
      new_headers = [
        {"key": "processed_timestamp", "value": str(process_time)},
        {"key": "event_severity", "value": severity},
        {"key": "processor_id", "value": "ksml-metadata-enricher"}
      ]

      # Preserve existing headers and add new ones
      existing_headers = metadata.get("headers", [])
      metadata["headers"] = existing_headers + new_headers

      log.info("Enriched event {} with {} additional headers", key, len(new_headers))
      return metadata

pipelines:
  enrich_api_events:
    from: api_events_input
    via:
      - type: transformMetadata
        mapper: enrich_event_metadata
      - type: peek
        forEach:
          code: |
            log.info("API Event: {} {} - Status: {} - Response Time: {}ms", 
                     value.get("method"), value.get("endpoint"),
                     value.get("status_code"), value.get("response_time_ms"))
    print:
      mapper:
        code: |
          method = value.get('method', 'UNKNOWN')
          endpoint = value.get('endpoint', '/unknown')
          status = value.get('status_code', 0)
          return f"ENRICHED EVENT | {key} | {method} {endpoint} | Status: {status} | Headers processed"
        resultType: string

This example:

  • Shows metadata enrichment in stream processing
  • Generates realistic API events (endpoints and status codes)
  • Enriches headers with timestamps, severity, and a processor ID
  • Classifies events (critical/warning/info) by status code
  • Logs enrichment for monitoring/debugging
  • Uses Python time for timestamps
  • Builds headers conditionally and extensibly
  • Uses transformMetadata with a mapper parameter
  • Formats output via print

Expected Results:

When running this example, you'll see enriched events with additional headers:

  • ENRICHED EVENT | evt_0001 | POST /api/users | Status: 200 | Headers processed
  • ENRICHED EVENT | evt_0002 | DELETE /api/health | Status: 400 | Headers processed
  • Log messages showing: "Enriched event evt_0001 with 3 additional headers"

streamPartitioner

Determines which partition a record should be sent to when writing to a Kafka topic. This allows custom partitioning logic based on record content, ensuring related records go to the same partition for ordered processing.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| topic | String | The name of the topic the record is being sent to |
| key | Any | The key of the record being partitioned |
| value | Any | The value of the record being partitioned |
| numPartitions | Integer | The total number of partitions available in the target topic |

Return Value

An integer representing the partition number (0 to numPartitions-1) where the record should be sent

Example

Note:

To test this streamPartitioner example, ensure your topics have sufficient partitions. The example requires at least 9 partitions, since it routes records to partitions 0-8. Update your docker-compose.yml:

kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 9 --replication-factor 1 --topic order_events
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 9 --replication-factor 1 --topic partitioned_orders
functions:
  # StreamPartitioner function determines which partition to send each record to
  # Returns an integer representing the target partition number
  priority_region_partitioner:
    type: streamPartitioner
    code: |
      # Custom partitioning logic based on priority and region
      # This ensures orders with same priority+region go to same partition
      # for ordered processing and improved locality

      if value:
        priority = value.get("priority", "standard")
        region = value.get("region", "UNKNOWN")

        # Map priority to a base partition range
        priority_map = {
          "express": 0,    # Express gets partitions 0-2
          "standard": 3,   # Standard gets partitions 3-5
          "economy": 6     # Economy gets partitions 6-8
        }

        # Map region to offset within priority range
        region_map = {
          "NORTH": 0,
          "SOUTH": 1,
          "EAST": 2,
          "WEST": 0,
          "CENTRAL": 1
        }

        base_partition = priority_map.get(priority, 3)
        region_offset = region_map.get(region, 0)

        # Calculate target partition (assuming 9 partitions total)
        partition = (base_partition + region_offset) % 9

        log.debug("Routing order {} to partition {}: priority={}, region={}", 
                  value.get("order_id"), partition, priority, region)

        return partition

      # Default to partition 0 if no value
      return 0
    resultType: integer
Producer - streamPartitioner example:
functions:
  generate_order_event:
    type: generator
    globalCode: |
      import time
      import random
      order_id = 0
      regions = ["NORTH", "SOUTH", "EAST", "WEST", "CENTRAL"]
      priorities = ["express", "standard", "economy"]
    code: |
      global order_id

      order_id += 1

      # Generate order details with priority and region
      order = {
        "order_id": f"ORD-{order_id:04d}",
        "amount": round(random.uniform(10.0, 1000.0), 2),
        "priority": random.choice(priorities),
        "region": random.choice(regions),
        "customer_id": f"CUST-{random.randint(1000, 9999)}",
        "timestamp": int(time.time() * 1000)
      }

      # Log the order generation
      log.info("Generated order: {} with priority={}, region={}", 
               order["order_id"], order["priority"], order["region"])

      # Use order_id as key, order details as value
      key = order["order_id"]
      value = order
    expression: (key, value)
    resultType: (string, json)

producers:
  order_events_producer:
    generator: generate_order_event
    interval: 2s
    to:
      topic: order_events
      keyType: string
      valueType: json
Processor - streamPartitioner example:
streams:
  order_events:
    topic: order_events
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  partitioned_orders:
    topic: partitioned_orders
    keyType: string
    valueType: json

functions:
  # StreamPartitioner function determines which partition to send each record to
  # Returns an integer representing the target partition number
  priority_region_partitioner:
    type: streamPartitioner
    code: |
      # Custom partitioning logic based on priority and region
      # This ensures orders with same priority+region go to same partition
      # for ordered processing and improved locality

      if value:
        priority = value.get("priority", "standard")
        region = value.get("region", "UNKNOWN")

        # Map priority to a base partition range
        priority_map = {
          "express": 0,    # Express gets partitions 0-2
          "standard": 3,   # Standard gets partitions 3-5
          "economy": 6     # Economy gets partitions 6-8
        }

        # Map region to offset within priority range
        region_map = {
          "NORTH": 0,
          "SOUTH": 1,
          "EAST": 2,
          "WEST": 0,
          "CENTRAL": 1
        }

        base_partition = priority_map.get(priority, 3)
        region_offset = region_map.get(region, 0)

        # Calculate target partition (assuming 9 partitions total)
        partition = (base_partition + region_offset) % 9

        log.debug("Routing order {} to partition {}: priority={}, region={}", 
                  value.get("order_id"), partition, priority, region)

        return partition

      # Default to partition 0 if no value
      return 0
    resultType: integer

pipelines:
  partition_orders:
    from: order_events
    via:
      # Transform to add routing metadata
      - type: transformValue
        mapper:
          code: |
            # Add partition routing info to the order
            if value:
              # Calculate which partition this will go to
              priority = value.get("priority", "standard")
              region = value.get("region", "UNKNOWN")
              value["routing_info"] = f"Priority: {priority}, Region: {region}"
            result = value
          expression: result
          resultType: json

      # Peek to log the routing decision
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Processing order {}: {} -> will be partitioned by priority/region", 
                       key, value.get("routing_info", "unknown"))

    # Use the streamPartitioner function when writing to output topic
    to:
      topic: partitioned_orders
      keyType: string
      valueType: json
      partitioner: priority_region_partitioner

The streamPartitioner function provides custom control over how records are distributed across topic partitions. This example demonstrates intelligent order routing based on business priorities and geographic regions.

What the example does:

Implements a sophisticated partitioning strategy for order processing:

  • Routes orders to specific partitions based on priority (express/standard/economy)
  • Further segments by geographic region within each priority tier
  • Ensures orders with same priority+region go to same partition
  • Producer generates realistic order events with various priorities/regions

Key Features:

  • Custom partition calculation based on multiple fields
  • Guaranteed ordering for related records (same priority+region)
  • Improved data locality and processing efficiency
  • Explicit partition count handling (9 partitions total)
  • Fallback to partition 0 for edge cases

Expected Results:

When running this example, you'll see log messages like:

  • "Generated order: ORD-0001 with priority=economy, region=CENTRAL" - Producer creating orders
  • "Processing order ORD-0001: Priority: economy, Region: CENTRAL -> will be partitioned by priority/region" - Routing decision
  • Express orders (priority=express) go to partitions 0-2
  • Standard orders (priority=standard) go to partitions 3-5
  • Economy orders (priority=economy) go to partitions 6-8
  • Within each priority range, regions determine the exact partition

valueJoiner

Combines values from two streams or tables during join operations, creating enriched records that contain data from both sources. The function must handle cases where one or both values might be null, depending on the join type (inner, left, outer). This function has access to the join key for context-aware value combination.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The join key used to match records |
| value1 | Any | The value from the first stream/table |
| value2 | Any | The value from the second stream/table |

Return Value

Combined value

Example

  join_order_with_product:
    type: valueJoiner
    code: |
      # Combine order and product information
      result = {}

      # Add order details
      if value1 is not None:
        result.update(value1)

      # Add product details
      if value2 is not None:
        result["product_details"] = value2
        # Calculate total price
        quantity = value1.get("quantity", 0) if value1 else 0
        price = value2.get("price", 0) if value2 else 0
        result["total_price"] = quantity * price

      new_value = result
    expression: new_value
    resultType: json
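
valueJoiners are passed to join operations. The following is a minimal stream-table join sketch, assuming a stream orders_input and a table products_table are defined elsewhere:

pipelines:
  enrich_orders:
    from: orders_input
    via:
      - type: join
        table: products_table
        valueJoiner: join_order_with_product
    to: enriched_orders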


timestampExtractor

Extracts timestamps from messages for time-based operations.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| record | Object | The ConsumerRecord containing key, value, timestamp, and metadata |
| previousTimestamp | Long | The previous timestamp (can be used as a fallback) |

Return Value

Timestamp in milliseconds (long)

Example

    timestampExtractor:
      code: |
        # Extract custom timestamp from message value
        try:
          # Try different ways to get the value data
          value_data = None

          # Method 1: If record is a ConsumerRecord with .value() method
          if hasattr(record, 'value') and callable(record.value):
            value_data = record.value()
          # Method 2: If record is the value directly (dict)
          elif isinstance(record, dict):
            value_data = record

          # Extract custom timestamp from event data
          if value_data and "event_timestamp" in value_data:
            event_time = value_data.get("event_timestamp")
            if event_time and event_time > 0:
              log.info("Using event timestamp: {} for {}", event_time, value_data.get("event_id"))
              return event_time

        except Exception as e:
          log.warn("Error extracting custom timestamp: {}", str(e))

        # Fall back to the current processing time
        import time
        current_time = int(time.time() * 1000)
        log.info("Using current time as fallback: {}", current_time)
        return current_time
Producer - timestampExtractor example:
functions:
  generate_events_with_timestamps:
    type: generator
    globalCode: |
      import random, time
      counter = 0
      base_t = int(time.time() * 1000)
    code: |
      global counter, base_t
      counter += 1
      eid = f"e{counter:04d}"
      offset = random.randint(-300, 60)  # -5min to +1min
      return eid, {
        "event_id": eid,
        "event_timestamp": base_t + (counter * 1000) + (offset * 1000),
        "event_type": random.choice(["action", "system", "data"]),
        "user_id": f"u{random.randint(1, 100):03d}",
        "delay": abs(offset) if offset < 0 else 0
      }
    resultType: (string, json)

producers:
  event_producer:
    generator: generate_events_with_timestamps
    interval: 1s
    to:
      topic: timestamped_events
      keyType: string
      valueType: json
Processor - timestampExtractor example
streams:
  events_input:
    topic: timestamped_events
    keyType: string
    valueType: json
    # Configure custom timestamp extraction
    timestampExtractor:
      code: |
        # Extract custom timestamp from message value
        try:
          # Try different ways to get the value data
          value_data = None

          # Method 1: If record is a ConsumerRecord with .value() method
          if hasattr(record, 'value') and callable(record.value):
            value_data = record.value()
          # Method 2: If record is the value directly (dict)
          elif isinstance(record, dict):
            value_data = record

          # Extract custom timestamp from event data
          if value_data and "event_timestamp" in value_data:
            event_time = value_data.get("event_timestamp")
            if event_time and event_time > 0:
              log.info("Using event timestamp: {} for {}", event_time, value_data.get("event_id"))
              return event_time

        except Exception as e:
          log.warn("Error extracting custom timestamp: {}", str(e))

        # Fallback to record timestamp or current time
        import time
        current_time = int(time.time() * 1000)
        log.info("Using current time as fallback: {}", current_time)
        return current_time

  ordered_events:
    topic: time_ordered_events
    keyType: string
    valueType: json

functions:
  log_timestamp_info:
    type: forEach
    code: |
      event_time = value.get("event_timestamp") if value else 0
      delay = value.get("delay", 0) if value else 0

      log.info("Event processed in time order: {} (event_time={}, delay={}s)", 
               key, event_time, delay)

pipelines:
  process_with_event_time:
    from: events_input
    via:
      - type: peek
        forEach: log_timestamp_info
    to: ordered_events

What the example does:

Demonstrates custom timestamp extraction for event-time processing:

  • Creates events with custom timestamps that simulate out-of-order and delayed processing scenarios
  • Extracts event-time timestamps from message content rather than using record timestamps
  • Processes events based on their event time rather than arrival time
  • Provides robust fallback mechanisms for missing or invalid timestamps
  • Uses custom timestampExtractor function in stream definition
  • Uses Python time manipulation and timestamp handling
  • Supports both ConsumerRecord and direct value access patterns

Expected Results:

When running this example, you'll see events processed in time order:

  • Event processed in time order: event_0001 (event_time=1755974335641, delay=155s)
  • Event processed in time order: event_0002 (event_time=1755974539885, delay=41s)
  • Log messages showing: "Using event timestamp: 1755974601885 for event_0015"

topicNameExtractor

Dynamically determines the target topic for message routing based on record content. This enables intelligent message distribution, multi-tenancy support, and content-based routing patterns without requiring separate processing pipelines. The function has access to record context for advanced routing decisions.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| key | Any | The key of the record being processed |
| value | Any | The value of the record being processed |
| recordContext | Object | Record metadata and processing context |

Return Value

String representing the topic name to send the message to

Example

  route_by_sensor_type:
    type: topicNameExtractor
    code: |
      if value is None:
        return "unknown_sensor_data"

      sensor_type = value.get("sensor_type", "unknown")
      alert_level = value.get("alert_level", "normal")

      # Route critical alerts to dedicated topic regardless of sensor type
      if alert_level == "critical":
        log.warn("Critical alert from sensor {}: {} reading = {}", 
                 value.get("sensor_id"), sensor_type, value.get("reading"))
        return "critical_sensor_alerts"

      # Route by sensor type for normal and warning levels
      if sensor_type == "temperature":
        return "temperature_sensors"
      elif sensor_type == "humidity":
        return "humidity_sensors" 
      elif sensor_type == "pressure":
        return "pressure_sensors"
      else:
        return "unknown_sensor_data"
Producer - topicNameExtractor example
functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      counter = 0
      sensor_types = ["temperature", "humidity", "pressure"]
      locations = ["factory_a", "factory_b", "warehouse"]
    code: |
      global counter, sensor_types, locations

      # Generate sensor ID as key
      sensor_id = f"sensor_{counter % 10:02d}"
      counter += 1

      # Generate sensor reading
      sensor_type = random.choice(sensor_types)

      value = {
        "sensor_id": sensor_id,
        "sensor_type": sensor_type,
        "location": random.choice(locations),
        "reading": round(random.uniform(10.0, 100.0), 2),
        "timestamp": counter * 1000,
        "alert_level": random.choices(["normal", "warning", "critical"], [80, 15, 5])[0]
      }

    expression: (sensor_id, value)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    to:
      topic: mixed_sensor_data
      keyType: string
      valueType: json
Processor - topicNameExtractor example
streams:
  mixed_sensors:
    topic: mixed_sensor_data
    keyType: string
    valueType: json
  temperature_data:
    topic: temperature_sensors
    keyType: string
    valueType: json
  humidity_data:
    topic: humidity_sensors
    keyType: string
    valueType: json
  pressure_data:
    topic: pressure_sensors
    keyType: string
    valueType: json
  critical_alerts:
    topic: critical_sensor_alerts
    keyType: string
    valueType: json

functions:
  route_by_sensor_type:
    type: topicNameExtractor
    code: |
      if value is None:
        return "unknown_sensor_data"

      sensor_type = value.get("sensor_type", "unknown")
      alert_level = value.get("alert_level", "normal")

      # Route critical alerts to dedicated topic regardless of sensor type
      if alert_level == "critical":
        log.warn("Critical alert from sensor {}: {} reading = {}", 
                 value.get("sensor_id"), sensor_type, value.get("reading"))
        return "critical_sensor_alerts"

      # Route by sensor type for normal and warning levels
      if sensor_type == "temperature":
        return "temperature_sensors"
      elif sensor_type == "humidity":
        return "humidity_sensors" 
      elif sensor_type == "pressure":
        return "pressure_sensors"
      else:
        return "unknown_sensor_data"

  log_routing:
    type: forEach
    code: |
      log.info("Sensor data: {} type={} level={} reading={}", 
               key, value.get("sensor_type"), 
               value.get("alert_level"), value.get("reading"))

pipelines:
  route_sensor_data:
    from: mixed_sensors
    via:
      - type: peek
        forEach: log_routing
    toTopicNameExtractor:
      topicNameExtractor: route_by_sensor_type

What the example does:

Demonstrates dynamic topic routing based on message content:

  • Creates mixed sensor data (temperature, humidity, pressure) with varying alert levels
  • Routes messages to different topics based on sensor type and priority
  • Prioritizes critical alerts to a dedicated topic regardless of sensor type
  • Distributes normal/warning messages to type-specific topics

Key Technical Features:

  • topicNameExtractor function for dynamic topic selection
  • Priority-based routing with alert level evaluation
  • Fallback topic handling for unknown sensor types
  • toTopicNameExtractor operation instead of static to operation
  • Integration with logging for routing visibility

Expected Results:

When running this example, you'll see messages routed to different topics:

  • Critical alerts: "Critical alert from sensor sensor_05: pressure reading = 78.26" → critical_sensor_alerts topic
  • Normal temperature readings → temperature_sensors topic
  • Normal humidity readings → humidity_sensors topic
  • Normal pressure readings → pressure_sensors topic
  • Unknown sensor types → unknown_sensor_data topic

Other Functions

generic

Generic custom function that can be used for any purpose. It can accept custom parameters and return any type of value.

Parameters

User-defined parameters

Return Value

Any value, depending on the function's purpose

Example

  calculate_price:
    type: generic
    parameters:
      - name: base_price
        type: double
      - name: discount_rate
        type: double
      - name: tax_rate
        type: double
    code: |
      # Calculate discounted price
      discount_amount = base_price * (discount_rate / 100)
      discounted_price = base_price - discount_amount

      # Calculate tax on discounted price  
      tax_amount = discounted_price * (tax_rate / 100)
      final_price = discounted_price + tax_amount

      return {
        "original_price": base_price,
        "discount_amount": discount_amount,
        "discounted_price": discounted_price,
        "tax_amount": tax_amount,
        "final_price": final_price,
        "total_savings": discount_amount
      }
    resultType: json
Producer - generic example
functions:
  generate_product_data:
    type: generator
    globalCode: |
      import random
      counter = 0
      products = ["laptop", "phone", "tablet", "headphones"]
      categories = ["electronics", "accessories", "computing"]
    code: |
      global counter, products, categories
      product_id = f"prod_{counter:03d}"
      counter += 1

      product_data = {
        "product_id": product_id,
        "name": random.choice(products),
        "category": random.choice(categories),
        "base_price": round(random.uniform(50.0, 1000.0), 2),
        "discount_rate": random.choice([0, 5, 10, 15, 20]),
        "quantity": random.randint(1, 100)
      }
    expression: (product_id, product_data)
    resultType: (string, json)

producers:
  product_producer:
    generator: generate_product_data
    interval: 2s
    to:
      topic: raw_products
      keyType: string
      valueType: json
Processor - generic example
streams:
  raw_products:
    topic: raw_products
    keyType: string
    valueType: json

functions:
  calculate_price:
    type: generic
    parameters:
      - name: base_price
        type: double
      - name: discount_rate
        type: double
      - name: tax_rate
        type: double
    code: |
      # Calculate discounted price
      discount_amount = base_price * (discount_rate / 100)
      discounted_price = base_price - discount_amount

      # Calculate tax on discounted price  
      tax_amount = discounted_price * (tax_rate / 100)
      final_price = discounted_price + tax_amount

      return {
        "original_price": base_price,
        "discount_amount": discount_amount,
        "discounted_price": discounted_price,
        "tax_amount": tax_amount,
        "final_price": final_price,
        "total_savings": discount_amount
      }
    resultType: json

  enrich_product:
    type: valueTransformer
    code: |
      if value and "base_price" in value and "discount_rate" in value:
        # Call our generic function with custom tax rate
        price_info = calculate_price(
          base_price=value["base_price"],
          discount_rate=value["discount_rate"], 
          tax_rate=8.5
        )

        # Add calculated pricing to the product data
        value["pricing"] = price_info
        value["processed_at"] = "2024-01-01T00:00:00Z"

      return value
    resultType: json

pipelines:
  process_products:
    from: raw_products
    via:
      - type: transformValue
        mapper: enrich_product
      - type: peek
        forEach:
          code: |
            original = value["pricing"]["original_price"]
            final = value["pricing"]["final_price"]
            savings = value["pricing"]["total_savings"]
            log.info("Processed product: {} - Original: ${:.2f}, Final: ${:.2f}, Saved: ${:.2f}".format(
                     key, original, final, savings))
    to:
      topic: enriched_products
      keyType: string
      valueType: json

What the example does:

Demonstrates how to create reusable business logic with generic functions:

  • Creates sample product data with base prices and discount rates
  • calculate_price accepts parameters to compute final pricing with tax
  • The generic function is called from within a valueTransformer
  • Adds calculated pricing information to product records

Key Technical Features:

  • type: generic for reusable custom functions
  • Custom parameter definitions with types (double, string, etc.)
  • Return any data structure (JSON objects, arrays, primitives)
  • Call generic functions from other functions using standard Python syntax
  • Mix generic functions with standard KSML function types

Expected Results:

When running these examples, you will see:

  • Product data being generated with random base prices and discount rates
  • Log messages showing: "Processed product: prod_001 - Original: $856.58, Final: $789.98, Saved: $128.49"
  • Each product enriched with detailed pricing calculations including tax
  • Generic function providing consistent pricing logic across all products

How KSML Functions Relate to Kafka Streams

KSML functions are Python implementations that map directly to Kafka Streams Java interfaces. Understanding this relationship helps you leverage Kafka Streams documentation and concepts:

Direct Mappings

Functions for stateless operations

| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| forEach | ForeachAction<K,V> | Process records for side effects |
| keyTransformer | KeyValueMapper<K,V,KR> | Transform keys |
| keyValueTransformer | KeyValueMapper<K,V,KeyValue<KR,VR>> | Transform both key and value |
| predicate | Predicate<K,V> | Filter records based on conditions |
| valueTransformer | ValueTransformer<V,VR> / ValueMapper<V,VR> | Transform values |

Functions for stateful operations

| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| aggregator | Aggregator<K,V,VA> | Aggregate records incrementally |
| initializer | Initializer<VA> | Provide initial aggregation values |
| merger | Merger<K,V> | Merge aggregation results |
| reducer | Reducer<V> | Combine values of same type |

Special Purpose Functions

| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| foreignKeyExtractor | Function<V,FK> | Extract foreign key for joins |
| keyValueMapper | KeyValueMapper<K,V,VR> | Convert key-value to single output |
| valueJoiner | ValueJoiner<V1,V2,VR> | Join values from two streams |
| streamPartitioner | StreamPartitioner<K,V> | Custom partition selection |
| timestampExtractor | TimestampExtractor | Extract event time from records |
| topicNameExtractor | TopicNameExtractor<K,V> | Dynamic topic routing |
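
To illustrate how direct these mappings are, the predicate below receives the same (key, value) pair that Kafka Streams passes to Predicate<K,V>.test(), and its boolean result drives filtering just as in the Java API (the function name is made up for this sketch):

functions:
  non_empty_value:
    type: predicate                  # maps to Kafka Streams Predicate<K,V>
    expression: value is not None    # evaluated like test(key, value)
    resultType: boolean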

Function Execution Context

When your Python functions execute, they have access to:

  • Logger: For outputting information to application logs
    • log.<level>("message") - where <level> is one of trace, debug, info, warn, error
  • Metrics: For monitoring function performance and behavior
    • metrics.counter("name").increment() - Count occurrences
    • metrics.gauge("name").record(value) - Record values
    • with metrics.timer("name"): - Measure execution time
  • State Stores: For maintaining state between function invocations (when configured)
    • store.get(key) - Retrieve a value from the store
    • store.put(key, value) - Store a value
    • store.delete(key) - Remove a value
    • Stores must be declared in the function's stores property

This execution context provides the tools needed for debugging, monitoring, and implementing stateful processing.
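
A sketch combining these facilities in one function. It assumes a state store named last_values is defined elsewhere in the definition file and that declared stores are accessible by name from Python code, in line with the store.get/put/delete calls listed above:

functions:
  remember_last_value:
    type: valueTransformer
    stores:
      - last_values          # hypothetical store; must be defined in the definition file
    code: |
      # Count every invocation using the metrics API documented above
      metrics.counter("records_processed").increment()

      # Compare against the previously stored value for this key
      previous = last_values.get(key)
      last_values.put(key, value)

      if previous is not None:
        log.info("Key {} changed from {} to {}", key, previous, value)

      return value
    resultType: json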