Function Reference
KSML functions let you implement custom stream-processing logic in Python. They make it easier for data scientists, analysts, and developers to process streaming data without needing Java or the Kafka Streams API.
Functions extend built-in operations, enabling custom business logic, transformations, and processing within the KSML runtime, combining Kafka Streams' power with Python's simplicity.
Function Definition Structure
Functions are defined in the functions section of your KSML definition file. Each function has the following properties:
Property | Type | Required | Description |
---|---|---|---|
type | String | Yes | The type of function (predicate, aggregator, valueJoiner, etc.) |
parameters | Array | No | Additional custom parameters to add to the function's built-in parameters (see note below) |
globalCode | String | No | Python code executed once upon startup |
code | String | No | Python code implementing the function |
expression | String | No | An expression that the function will return as its value |
resultType | Data type | Sometimes | The data type returned by the function. Required when it cannot be derived from the function type. |
stores | Array | No | List of state stores the function can access |
Note about parameters: Every function type has built-in parameters that are automatically provided by KSML (e.g., key and value for most function types). The parameters property is only needed when you want to add custom parameters beyond these built-in ones. These additional parameters can then be passed when calling the function from Python code.
Writing KSML Functions
Example KSML Function Definition
functions:
# Example of a complete function definition with all components
process_sensor_data:
type: valueTransformer
globalCode: |
# This code runs once when the application starts
import json
import time
# Initialize global variables
sensor_threshold = 25.0
alert_count = 0
code: |
# This code runs for each message
global alert_count
# Process the sensor value
if value is None:
return None
temperature = value.get("temperature", 0)
# Convert Celsius to Fahrenheit
temperature_f = (temperature * 9/5) + 32
# Check for alerts
is_alert = temperature > sensor_threshold
if is_alert:
alert_count += 1
log.warn("High temperature detected: {}°C", temperature)
# Return enriched data
result = {
"original_temp_c": temperature,
"temp_fahrenheit": temperature_f,
"is_alert": is_alert,
"total_alerts": alert_count,
"processed_at": int(time.time() * 1000)
}
return result
resultType: json
# Example of a simple expression-based function
is_high_priority:
type: predicate
expression: value.get("priority", 0) > 7
resultType: boolean
KSML functions are defined in the functions section of your KSML definition file. A typical function definition includes:
- Type: Specifies the function's purpose and behavior
- Parameters: Input parameters the function accepts (defined by the function type)
- GlobalCode: Python code executed only once upon application start
- Code: Python code implementing the function's logic
- Expression: Shorthand for simple return expressions
- ResultType: The expected return type of the function
Function Definition Formats
KSML supports two formats for defining functions:
Expression Format
For simple, one-line functions:
functions:
is_valid:
type: predicate
code: |
# Code is optional here
expression: value.get("status") == "ACTIVE"
Code Block Format
For more complex functions:
functions:
process_transaction:
type: keyValueMapper
code: |
import time
result = {}
# Copy basic fields
result["transaction_id"] = value.get("id")
result["amount"] = value.get("amount", 0)
# Calculate fee
amount = value.get("amount", 0)
if amount > 1000:
result["fee"] = amount * 0.02
else:
result["fee"] = amount * 0.03
# Add timestamp
result["processed_at"] = int(time.time() * 1000)
return result
resultType: struct
Function Parameters
Built-in vs Custom Parameters
Every function type in KSML has built-in parameters that are automatically provided by KSML. These are implicitly available in your function code without needing to declare them.

Most function types (like forEach, predicate, and valueTransformer) automatically receive:

- key - the key of the record being processed
- value - the value of the record being processed

Some specialized types have different built-in parameters:

- aggregator: receives key, value, and aggregatedValue (see the sketch below)
- merger: receives key, value1, and value2
- initializer: receives no parameters
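For instance, an aggregator can use its built-in parameters directly, without declaring them; a minimal sketch, reusing the count_events aggregator that appears later in this reference:

functions:
  count_events:
    type: aggregator
    code: |
      # key, value and aggregatedValue are available implicitly
      return (aggregatedValue or 0) + 1
    resultType: int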
Adding Custom Parameters
The parameters property allows you to add custom parameters beyond the built-in ones. This is useful when:

- Creating reusable functions that can behave differently based on configuration
- Calling functions from Python code with specific arguments
- Using the generic function type, which has no built-in parameters
Example WITHOUT custom parameters:
functions:
simple_logger:
type: forEach
# Only uses built-in key and value parameters
code: |
log.info("Processing: key={}, value={}", key, value)
Example WITH custom parameters:
functions:
configurable_logger:
type: forEach
parameters: # ADDS 'prefix' to the built-in key and value
- name: prefix
type: string
code: |
log.info("{}: key={}, value={}", prefix, key, value)
When calling this function from Python:
# The custom parameter is passed along with built-in ones
configurable_logger(key, value, prefix="DEBUG")
Parameter Definition Structure
When defining custom parameters:
parameters:
- name: parameter_name # Name of the parameter
type: parameter_type # Data type (string, int, double, etc.)
Important: The parameters property adds to the built-in parameters - it doesn't replace them. Built-in parameters like key and value are still available in your function code.
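For example, a reusable helper with several custom parameters might look like the sketch below. The format_alert and check_reading functions are illustrative, not part of KSML; they only show how custom and built-in parameters combine:

functions:
  format_alert:
    type: generic
    parameters:
      - name: sensor_id
        type: string
      - name: reading
        type: double
      - name: threshold
        type: double
    code: |
      # Custom parameters are available by name inside the function body
      level = "HIGH" if reading > threshold else "NORMAL"
      return f"{level}: sensor {sensor_id} reported {reading}"
    resultType: string

  check_reading:
    type: forEach
    code: |
      # Built-in key and value remain available here; the helper is called
      # with its custom parameters, as described above
      log.info(format_alert(sensor_id=key, reading=value.get("reading", 0.0), threshold=25.0))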
Function Types Overview
Below is a table with all 21 function types in KSML.
Function Type | Purpose | Used In |
---|---|---|
Functions for stateless operations | ||
forEach | Process each message for side effects | peek |
keyTransformer | Convert a key to another type or value | mapKey, selectKey, toStream, transformKey |
keyValueToKeyValueListTransformer | Convert key and value to a list of key/values | flatMap, transformKeyValueToKeyValueList |
keyValueToValueListTransformer | Convert key and value to a list of values | flatMapValues, transformKeyValueToValueList |
keyValueTransformer | Convert key and value to another key and value | map, transformKeyValue |
predicate | Return true/false based on message content | filter, branch |
valueTransformer | Convert value to another type or value | mapValue, mapValues, transformValue |
Functions for stateful operations | ||
aggregator | Incrementally build aggregated results | aggregate |
initializer | Provide initial values for aggregations | aggregate |
merger | Merge two aggregation results into one | aggregate |
reducer | Combine two values into one | reduce |
Special Purpose Functions | ||
foreignKeyExtractor | Extract a key from a join table's record | join, leftJoin |
generator | Function used in producers to generate a message | producer |
generic | Generic custom function | |
keyValueMapper | Convert key and value into a single output value | groupBy, join, leftJoin |
keyValuePrinter | Output key and value to a file or stdout | print |
metadataTransformer | Convert Kafka headers and timestamps | transformMetadata |
valueJoiner | Combine data from multiple streams | join, leftJoin, outerJoin |
Stream Related Functions | ||
streamPartitioner | Determine which partition to send records to | to |
timestampExtractor | Extract timestamps from messages | stream, table, globalTable |
topicNameExtractor | Derive a target topic name from key and value | toTopicNameExtractor |
Functions for stateless operations
forEach
Processes each message for side effects like logging, without changing the message.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
None (the function is called for its side effects)
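Example

A minimal forEach definition, following the logging functions used elsewhere in this reference:

functions:
  log_message:
    type: forEach
    code: |
      # Log each record; the message itself is not modified
      log.info("Processing message - key: {}, value: {}", key, value)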
Full example for forEach:
keyTransformer
Transforms a key/value into a new key, which then gets combined with the original value as a new message on the output stream.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
New key for the output message
Example
Function Definition:
functions:
extract_region_key:
type: keyTransformer
code: |
if value is None: return "unknown"
return value.get("region", "unknown")
resultType: string
This function extracts the region from transaction data to use as the new message key, enabling region-based partitioning.
Complete Working Example:
Producer - keyTransformer example
functions:
generate_region_data:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
tid = f"txn_{counter:05d}"
return tid, {
"transaction_id": tid,
"region": random.choice(["us-east", "us-west", "europe", "asia"]),
"amount": round(random.uniform(10.0, 1000.0), 2),
"customer_id": f"c{random.randint(1, 100):03d}",
"timestamp": counter * 1000
}
resultType: (string, json)
producers:
region_data_producer:
generator: generate_region_data
interval: 2s
to:
topic: transaction_events
keyType: string
valueType: json
Processor - keyTransformer example
streams:
transaction_input:
topic: transaction_events
keyType: string
valueType: json
region_output:
topic: transactions_by_region
keyType: string
valueType: json
functions:
extract_region_key:
type: keyTransformer
code: |
if value is None: return "unknown"
return value.get("region", "unknown")
resultType: string
log_repartitioned:
type: forEach
code: |
log.info("Repartitioned - Key: {}, Region: {}, Amount: ${}",
key, value.get("region"), value.get("amount"))
pipelines:
region_repartitioning:
from: transaction_input
via:
- type: mapKey
mapper: extract_region_key
- type: peek
forEach: log_repartitioned
to: region_output
Additional Example:
Full example for keyTransformer: Stream Table Join Tutorial
keyValueToKeyValueListTransformer
Takes one message and converts it into a list of output messages, which then get sent to the output stream. Unlike keyValueToValueListTransformer, this function can create new keys for each output message, enabling data reshaping and repartitioning.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
A list of key-value pairs [(key1, value1), (key2, value2), ...]
Example
split_batch_orders:
type: keyValueToKeyValueListTransformer
code: |
# Split batch orders into individual orders with unique keys
# This transforms one batch record into multiple individual order records
if value is None or "orders" not in value:
return []
batch_id = key
orders = value.get("orders", [])
individual_records = []
for i, order in enumerate(orders):
# Create unique key for each individual order
order_key = f"{batch_id}_order_{i+1}"
# Create individual order record
order_value = {
"order_id": order_key,
"batch_id": batch_id,
"product": order.get("product"),
"quantity": order.get("quantity", 1),
"customer_email": order.get("customer_email"),
"processing_timestamp": value.get("timestamp")
}
individual_records.append((order_key, order_value))
log.info("Split batch {} into {} individual orders", batch_id, len(individual_records))
return individual_records
resultType: list(tuple(string, json))
This example demonstrates splitting batch orders into individual orders with unique keys, useful for processing bulk data into individual records.
Producer - keyValueToKeyValueListTransformer example
functions:
generate_batch_orders:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
bid = f"b{counter:03d}"
prods = ["laptop", "phone", "tablet"]
custs = ["alice@co", "bob@co", "charlie@co"]
orders = [{"product": random.choice(prods), "qty": random.randint(1,5),
"customer": random.choice(custs)} for _ in range(random.randint(2,4))]
return bid, {"batch_id": bid, "orders": orders, "total": len(orders)}
resultType: (string, json)
producers:
batch_order_producer:
generator: generate_batch_orders
interval: 3s
to:
topic: batch_orders
keyType: string
valueType: json
Processor - keyValueToKeyValueListTransformer example
streams:
batch_orders_input:
topic: batch_orders
keyType: string
valueType: json
individual_orders_output:
topic: individual_orders
keyType: string
valueType: json
functions:
split_batch_orders:
type: keyValueToKeyValueListTransformer
code: |
# Split batch orders into individual orders with unique keys
# This transforms one batch record into multiple individual order records
if value is None or "orders" not in value:
return []
batch_id = key
orders = value.get("orders", [])
individual_records = []
for i, order in enumerate(orders):
# Create unique key for each individual order
order_key = f"{batch_id}_order_{i+1}"
# Create individual order record
order_value = {
"order_id": order_key,
"batch_id": batch_id,
"product": order.get("product"),
"quantity": order.get("quantity", 1),
"customer_email": order.get("customer_email"),
"processing_timestamp": value.get("timestamp")
}
individual_records.append((order_key, order_value))
log.info("Split batch {} into {} individual orders", batch_id, len(individual_records))
return individual_records
resultType: list(tuple(string, json))
pipelines:
split_batch_processing:
from: batch_orders_input
via:
- type: transformKeyValueToKeyValueList
mapper: split_batch_orders
to: individual_orders_output
keyValueToValueListTransformer
Takes one message and converts it into a list of output values, which then get combined with the original key and sent to the output stream.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
A list of values [value1, value2, ...] that will be combined with the original key
Example
explode_order_items:
type: keyValueToValueListTransformer
code: |
# Split order into individual item records
# Key remains the same (order_id), but each item becomes a separate value
if value is None or "items" not in value:
return []
items = value.get("items", [])
item_records = []
for item in items:
# Create individual item record with order context
item_record = {
"order_id": value.get("order_id"),
"customer_id": value.get("customer_id"),
"product": item.get("product"),
"quantity": item.get("quantity"),
"unit_price": item.get("price"),
"total_price": item.get("price", 0) * item.get("quantity", 0),
"order_total": value.get("order_total")
}
item_records.append(item_record)
log.info("Exploded order {} into {} item records",
value.get("order_id"), len(item_records))
return item_records
resultType: list(json)
Producer - keyValueToValueListTransformer example
functions:
generate_order_data:
type: generator
globalCode: |
import random
counter = 0
products = ["laptop", "phone", "tablet", "headphones", "speaker"]
code: |
global counter, products
# Generate order ID as key
order_id = f"order_{counter:03d}"
counter += 1
# Generate order with multiple items
num_items = random.randint(2, 5)
items = []
for i in range(num_items):
item = {
"product": random.choice(products),
"quantity": random.randint(1, 3),
"price": round(random.uniform(50.0, 500.0), 2)
}
items.append(item)
# Create order value with items array
value = {
"order_id": order_id,
"customer_id": f"cust_{random.randint(1, 50):03d}",
"items": items,
"order_total": sum(item["price"] * item["quantity"] for item in items)
}
expression: (order_id, value)
resultType: (string, json)
producers:
order_producer:
generator: generate_order_data
interval: 3s
to:
topic: customer_orders
keyType: string
valueType: json
Processor - keyValueToValueListTransformer example
streams:
orders_input:
topic: customer_orders
keyType: string
valueType: json
items_output:
topic: individual_items
keyType: string
valueType: json
functions:
explode_order_items:
type: keyValueToValueListTransformer
code: |
# Split order into individual item records
# Key remains the same (order_id), but each item becomes a separate value
if value is None or "items" not in value:
return []
items = value.get("items", [])
item_records = []
for item in items:
# Create individual item record with order context
item_record = {
"order_id": value.get("order_id"),
"customer_id": value.get("customer_id"),
"product": item.get("product"),
"quantity": item.get("quantity"),
"unit_price": item.get("price"),
"total_price": item.get("price", 0) * item.get("quantity", 0),
"order_total": value.get("order_total")
}
item_records.append(item_record)
log.info("Exploded order {} into {} item records",
value.get("order_id"), len(item_records))
return item_records
resultType: list(json)
pipelines:
explode_orders:
from: orders_input
via:
- type: transformKeyValueToValueList
mapper: explode_order_items
to: items_output
keyValueTransformer
Takes one message and converts it into another message, which may have different key/value types.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
A tuple of (new_key, new_value)
Example
create_external_request:
type: keyValueTransformer
code: |
import time
# Extract fields from JSON order event
if not value:
return None
order_id = value.get("order_id")
customer_id = value.get("customer_id")
status = value.get("status")
amount = value.get("amount")
timestamp = value.get("timestamp")
sequence_number = value.get("sequence_number")
order_details = value.get("order_details", {})
customer_info = value.get("customer_info", {})
fulfillment = value.get("fulfillment", {})
business_context = value.get("business_context", {})
metadata = value.get("metadata", {})
# Create comprehensive request for external payment processing system
request_id = f"REQ_{order_id}_{timestamp}"
external_request = {
"request_id": request_id,
"request_type": "PAYMENT_PROCESSING",
"original_order": {
"order_id": order_id,
"customer_id": customer_id,
"amount": amount,
"timestamp": timestamp,
"sequence_number": sequence_number
},
"payment_details": {
"amount": amount,
"currency": order_details.get("currency", "USD"),
"payment_method": order_details.get("payment_method"),
"customer_tier": customer_info.get("customer_tier"),
"loyalty_points": customer_info.get("loyalty_points")
},
"processing_context": {
"priority": business_context.get("priority", "normal"),
"high_value": business_context.get("high_value", False),
"customer_previous_orders": customer_info.get("previous_orders", 0),
"order_source": order_details.get("order_source")
},
"async_metadata": {
"correlation_id": metadata.get("correlation_id", f"corr_{request_id}"),
"created_at": int(time.time() * 1000),
"timeout_ms": 30000, # 30 second timeout
"retry_count": 0,
"expected_response_topic": "external_responses"
},
"external_system_info": {
"target_system": "payment_processor",
"api_version": "v2.1",
"request_format": "async_json",
"callback_required": True
}
}
log.info("Created external payment request for order {}: amount=${:.2f}, priority={}",
order_id, amount, business_context.get("priority", "normal"))
return (request_id, external_request)
resultType: (string, json)
Full example for keyValueTransformer:
predicate
Returns true or false based on message content. Used for filtering and branching operations.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
Boolean (true or false)
Example
is_critical_sensor:
type: predicate
code: |
# Check location
if value.get('sensors', {}).get('location') not in ['server_room', 'data_center']:
return False
# Check temperature threshold based on location
if value.get('sensors', {}).get('location') == 'server_room' and value.get('sensors', {}).get('temperature') > 20:
return True
if value.get('sensors', {}).get('location') == 'data_center' and value.get('sensors', {}).get('temperature') > 30:
return True
return False
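A predicate like this is typically referenced from a filter (or branch) operation. A minimal sketch, with illustrative stream and topic names:

pipelines:
  critical_readings_only:
    from: sensor_input              # assumed stream of sensor readings
    via:
      - type: filter
        if: is_critical_sensor
    to: critical_sensor_readings    # assumed output topic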
Full example for predicate:
- Tutorial: Filtering and Transforming for predicate functions for data filtering
valueTransformer
Transforms a key/value into a new value, which is combined with the original key and sent to the output stream.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
New value for the output message
Example
convert_temperature:
type: valueTransformer
code: |
result = {
"device_id": value.get('device_id'),
"temperature_c": round((value.get('temperature') - 32) * 5/9, 2) if value.get('temperature') else None,
"humidity": value.get('humidity'),
"timestamp": value.get('timestamp')
}
expression: result
resultType: json
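A valueTransformer like this is referenced from operations such as transformValue (used this way in the streamPartitioner example later in this reference). A minimal sketch, with illustrative stream and topic names:

pipelines:
  convert_readings:
    from: sensor_input            # assumed stream of Fahrenheit readings
    via:
      - type: transformValue
        mapper: convert_temperature
    to: sensor_readings_celsius   # assumed output topic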
Full example for valueTransformer:
- Tutorial: Filtering and Transforming for understanding valueTransformer for data enrichment
Functions for stateful operations
aggregator
Incrementally builds aggregated results from multiple messages.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
aggregatedValue | Any | The current aggregated value (can be None) |
Return Value
New aggregated value
Example
update_stats:
type: aggregator
code: |
# Update payment statistics
if value and aggregatedValue:
amount = value.get("amount", 0.0)
aggregatedValue["count"] = aggregatedValue.get("count", 0) + 1
aggregatedValue["total_amount"] = aggregatedValue.get("total_amount", 0.0) + amount
current_min = aggregatedValue.get("min_amount")
current_max = aggregatedValue.get("max_amount")
if current_min is None or amount < current_min:
aggregatedValue["min_amount"] = amount
if current_max is None or amount > current_max:
aggregatedValue["max_amount"] = amount
aggregatedValue["average_amount"] = round(aggregatedValue["total_amount"] / aggregatedValue["count"], 2)
expression: aggregatedValue
resultType: json
Full example for aggregator:
- Tutorial: Aggregations for comprehensive aggregator function examples
initializer
Provides initial values for aggregations.
Parameters
None
Return Value
Initial value for aggregation
Example
init_stats:
type: initializer
code: |
# Initialize statistics
stats = {
"count": 0,
"total_amount": 0.0,
"min_amount": None,
"max_amount": None
}
expression: stats
resultType: json
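An initializer is normally paired with an aggregator in an aggregate operation (the same wiring used in the merger example below). A minimal sketch combining init_stats with the update_stats aggregator above, with illustrative stream and topic names:

pipelines:
  payment_statistics:
    from: payments_input        # assumed stream keyed by customer_id
    via:
      - type: groupByKey
      - type: aggregate
        initializer: init_stats
        aggregator: update_stats
      - type: toStream
    to: payment_stats           # assumed output topic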
Full example for initializer:
merger
Merges two aggregation results into one. Used in aggregation operations to combine partial results.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value1 | Any | The value of the first aggregation |
value2 | Any | The value of the second aggregation |
Return Value
The merged aggregation result
Example
merge_counts:
type: merger
code: |
# This function is called when two session windows need to be merged
# For example, when a late-arriving event connects two previously separate sessions
count1 = value1 or 0
count2 = value2 or 0
merged_total = count1 + count2
log.info("Merging sessions: {} + {} = {}", count1, count2, merged_total)
return merged_total
resultType: int
Producer - merger example
functions:
generate_user_activity:
type: generator
globalCode: |
import random, time
counter = 0
last_act = {}
base_t = int(time.time() * 1000)
code: |
global counter, last_act, base_t
uid = random.choice(["alice", "bob", "charlie"])
t = base_t + (counter * 2000)
if uid in last_act and random.random() < 0.15:
t = last_act[uid] + 660000 # 11min gap
last_act[uid] = t
counter += 1
return uid, {
"user_id": uid,
"page": random.choice(["home", "products", "profile"]),
"timestamp": t,
"event_type": "page_view"
}
resultType: (string, json)
producers:
activity_producer:
generator: generate_user_activity
interval: 2s
to:
topic: user_activity
keyType: string
valueType: json
Processor - merger example
streams:
user_activity_input:
topic: user_activity
keyType: string
valueType: json
functions:
# Initialize session counter
init_counter:
type: initializer
expression: 0
resultType: int
# Count events in session
count_events:
type: aggregator
code: |
# Increment the counter for each event
return (aggregatedValue or 0) + 1
resultType: int
# Merge session counters when sessions are combined
merge_counts:
type: merger
code: |
# This function is called when two session windows need to be merged
# For example, when a late-arriving event connects two previously separate sessions
count1 = value1 or 0
count2 = value2 or 0
merged_total = count1 + count2
log.info("Merging sessions: {} + {} = {}", count1, count2, merged_total)
return merged_total
resultType: int
pipelines:
session_counting:
from: user_activity_input
via:
- type: groupByKey
- type: windowBySession
inactivityGap: 10m # Session ends after 10 minutes of inactivity
grace: 2m # Allow 2 minutes for late events
- type: aggregate
initializer: init_counter
aggregator: count_events
merger: merge_counts # Required for session windows - merges overlapping sessions
store:
type: session
retention: 1h
- type: toStream
- type: peek
forEach:
code: |
log.info("User {} session: {} events", key, value)
# Convert windowed key to string for output serialization
- type: mapKey
mapper:
code: |
# Convert windowed key to readable string format
# The windowed key is a dictionary with 'key', 'start', 'end' fields
return f"user_{key['key']}_window_{key['start']}_{key['end']}"
resultType: string
to:
topic: session_counts
keyType: string # Now using simple string keys after transformation
valueType: int
The merger function is specifically designed for session window aggregations where late-arriving events can merge previously separate sessions. This example demonstrates user activity tracking with session-based counting.
What the example does:
Simulates user activity tracking with session windows and merging:
- Groups events into 10-min inactivity sessions
- Counts events per user
- Merges sessions when late events connect them
- Producer simulates gaps to trigger merging
Key Features:
- Automatic session windowing & late data handling
- Type-safe merger (integers)
- Windowed key transformation for output
- Merge logic adds event counts
Expected Results:
When running this example, you'll see log messages like:
"Merging sessions: 3 + 2 = 5"
- Shows the merger function combining session counts"User alice session: 4 events"
- Displays final session results after merging- Session windows spanning different time periods for each user
reducer
Combines two values into one.
Parameters
Parameter | Type | Description |
---|---|---|
value1 | Any | The first value to combine |
value2 | Any | The second value to combine |
Return Value
Combined value
Example
sum_amounts:
type: reducer
code: |
# Sum two transaction amounts (in cents)
total = value1 + value2
expression: total
resultType: long
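A reducer is referenced from a reduce operation after grouping. A minimal sketch using sum_amounts, with illustrative stream and topic names (the stream values are assumed to be long amounts in cents):

pipelines:
  total_per_customer:
    from: transaction_amounts   # assumed stream of (customer_id, amount) records
    via:
      - type: groupByKey
      - type: reduce
        reducer: sum_amounts
      - type: toStream
    to: amount_totals           # assumed output topic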
Full example for reducer:
Special Purpose Functions
foreignKeyExtractor
Extracts a key from a join table's record. Used during join operations to determine which records to join.
Parameters
Parameter | Type | Description |
---|---|---|
value | Any | The value of the record to get a key from |
Return Value
The key to look up in the table being joined with
Example
extract_customer_id:
type: foreignKeyExtractor
code: |
# Extract the foreign key (customer_id) from the order value
# This key will be used to look up the customer in the customers table
if value is None:
return None
customer_id = value.get("customer_id")
log.debug("Extracting customer_id: {} from order", customer_id)
return customer_id
resultType: string
Producer - foreignKeyExtractor example
functions:
generate_orders:
type: generator
globalCode: |
import random, time
order_counter = 1
code: |
global order_counter
oid = f"ord_{order_counter:03d}"
order_counter += 1
return oid, {
"order_id": oid,
"customer_id": random.choice(["c001", "c002", "c003"]),
"product": random.choice(["laptop", "phone"]),
"amount": random.randint(100, 1000),
"timestamp": int(time.time() * 1000)
}
resultType: (string, json)
generate_customers:
type: generator
globalCode: |
import time, random
custs = [
{"id": "c001", "name": "Alice", "tier": "gold"},
{"id": "c002", "name": "Bob", "tier": "silver"},
{"id": "c003", "name": "Charlie", "tier": "bronze"}
]
code: |
global custs
c = random.choice(custs)
return c["id"], {
"customer_id": c["id"],
"name": c["name"],
"tier": c["tier"],
"created_at": int(time.time() * 1000)
}
resultType: (string, json)
producers:
order_producer:
generator: generate_orders
interval: 3s
to:
topic: orders
keyType: string
valueType: json
customer_producer:
generator: generate_customers
interval: 5s # Create customers periodically
to:
topic: customers
keyType: string
valueType: json
Processor - foreignKeyExtractor example
streams:
orders_input:
topic: orders
keyType: string
valueType: json
tables:
customers_table:
topic: customers
keyType: string
valueType: json
functions:
# Extract customer_id from order value to join with customers table
extract_customer_id:
type: foreignKeyExtractor
code: |
# Extract the foreign key (customer_id) from the order value
# This key will be used to look up the customer in the customers table
if value is None:
return None
customer_id = value.get("customer_id")
log.debug("Extracting customer_id: {} from order", customer_id)
return customer_id
resultType: string
# Join order with customer data
join_order_customer:
type: valueJoiner
code: |
# value1 is the order, value2 is the customer
order = value1
customer = value2
if order is None:
return None
# Create enriched order with customer information
enriched_order = {
"order_id": order.get("order_id"),
"product": order.get("product"),
"amount": order.get("amount"),
"timestamp": order.get("timestamp"),
"customer": {
"customer_id": order.get("customer_id"),
"name": customer.get("name", "Unknown") if customer else "Unknown",
"email": customer.get("email", "Unknown") if customer else "Unknown",
"tier": customer.get("tier", "Unknown") if customer else "Unknown"
}
}
log.info("Joined order {} with customer {}",
order.get("order_id"),
customer.get("name") if customer else "Unknown")
return enriched_order
resultType: json
pipelines:
enrich_orders:
from: orders_input
via:
# Join orders table with customers table using foreignKeyExtractor
- type: join
table: customers_table
foreignKeyExtractor: extract_customer_id # Extracts customer_id from order
valueJoiner: join_order_customer
- type: peek
forEach:
code: |
log.info("Enriched order: {} for customer: {}",
value.get("order_id"),
value.get("customer", {}).get("name"))
to:
topic: enriched_orders
keyType: string # Still keyed by order_id
valueType: json
The foreignKeyExtractor enables table joins where the join key is embedded within the record value rather than being the record key. This example demonstrates order enrichment by joining with customer data using a foreign key relationship.
What the example does:
This example simulates an e-commerce system enriching orders with customer data:
- Orders are keyed by order_id and reference a customer_id
- Customer details are looked up by customer_id
- The foreign key is extracted from each order
- Orders are joined with customers to produce enriched records
Key Features:
- Foreign key join pattern
- KSML table join with key extraction
- Data enrichment from multiple sources
- Preserves original order keys
Expected Results:
When running this example, you'll see log messages like:
"Joined order order_001 with customer Alice Johnson"
- Shows successful order-customer joins"Enriched order: order_003 for customer: Bob Smith"
- Displays enriched results with customer names- Orders enriched with customer tier, email, and name information
Use Cases:
This pattern is commonly used for:
- Order enrichment with customer details
- Transaction enrichment with account information
- Event enrichment with user profiles
- Any scenario where records contain foreign key references
generator
Function used in producers to generate messages. It takes no input parameters and produces key-value pairs.
Parameters
None
Return Value
A tuple of (key, value) representing the generated message
Example
generate_tutorial_data:
type: generator
globalCode: |
import random
sensor_id = 0
locations = ["server_room", "warehouse", "data_center"]
code: |
global sensor_id, locations
key = "sensor" + str(sensor_id)
sensor_id = (sensor_id + 1) % 5
location = random.choice(locations)
sensors = {"temperature": random.randrange(150), "humidity": random.randrange(90), "location": location}
value = {"sensors": sensors}
expression: (key, value)
resultType: (string, json)
Full example for generator:
keyValueMapper
Maps a key and value to a single output value, such as a grouping or join key.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
A single value derived from the key and value (for example, a grouping or join key)
Example
extract_product_id:
type: keyValueMapper
code: |
# Map from order (key, value) to product_id for join
product_id = value.get("product_id") if value else None
expression: product_id
resultType: string
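A keyValueMapper like this is referenced from operations such as groupBy, join, or leftJoin. A minimal sketch that regroups orders by the extracted product id; stream, topic, and operation wiring are illustrative:

pipelines:
  orders_by_product:
    from: orders_input              # assumed stream of order records keyed by order_id
    via:
      - type: groupBy
        mapper: extract_product_id
      - type: count
      - type: toStream
    to: order_counts_by_product     # assumed output topic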
Full example for keyValueMapper:
keyValuePrinter
Converts a message to a string for output to a file or stdout.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
Return Value
String to be written to file or stdout
Example
The keyValuePrinter formats records for human-readable output to stdout or files. This example shows converting sales data into formatted reports for monitoring and debugging.
Producer - keyValuePrinter example
functions:
generate_sales_data:
type: generator
globalCode: |
import random
counter = 1
products = ["laptop", "mouse", "keyboard", "monitor", "webcam"]
customers = ["alice", "bob", "charlie", "diana"]
code: |
global counter, products, customers
sale_id = f"sale_{counter:03d}"
counter += 1
sale_data = {
"sale_id": sale_id,
"product": random.choice(products),
"customer": random.choice(customers),
"amount": round(random.uniform(19.99, 299.99), 2),
"quantity": random.randint(1, 3),
"region": random.choice(["north", "south", "east", "west"])
}
expression: (sale_id, sale_data)
resultType: (string, json)
producers:
sales_producer:
generator: generate_sales_data
interval: 3s
to:
topic: sales_data
keyType: string
valueType: json
Processor - keyValuePrinter example
streams:
sales_input:
topic: sales_data
keyType: string
valueType: json
functions:
format_sales_report:
type: keyValuePrinter
code: |
if value is None:
return f"ERROR: Sale {key} has no data"
# Extract sale information
product = value.get("product", "Unknown")
customer = value.get("customer", "Unknown")
amount = value.get("amount", 0)
quantity = value.get("quantity", 0)
region = value.get("region", "Unknown")
# Create formatted sales report
return f"SALE REPORT | ID: {key} | Customer: {customer} | Product: {product} | Qty: {quantity} | Amount: ${amount:.2f} | Region: {region}"
resultType: string
pipelines:
print_sales:
from: sales_input
print:
mapper: format_sales_report
What the example does:
Demonstrates formatted business reporting with KSML:
- Sales Data Processing: Converts raw sales records into reports
- Custom Formatting: Transforms JSON into readable output
- Print Operation: Outputs formatted data via print
- Real-time Monitoring: Enables instant visibility of transactions
Key Features:
- Python string formatting
- Null/error handling
- keyValuePrinter function
- Field extraction from JSON
Expected Results:
When running this example, you'll see formatted output like:
SALE REPORT | ID: sale_001 | Customer: alice | Product: laptop | Qty: 2 | Amount: $1299.99 | Region: north
SALE REPORT | ID: sale_002 | Customer: bob | Product: mouse | Qty: 1 | Amount: $29.99 | Region: south
metadataTransformer
Transforms a message's metadata (headers and timestamp).
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
metadata | Object | Contains the headers and timestamp of the message |
Return Value
Modified metadata for the output message
Example
enrich_event_metadata:
type: metadataTransformer
code: |
import time
# Get processing timestamp
process_time = int(time.time() * 1000)
# Determine event severity based on status code
status_code = value.get("status_code", 200) if value else 200
severity = "critical" if status_code >= 500 else "warning" if status_code >= 400 else "info"
# Add processing headers
new_headers = [
{"key": "processed_timestamp", "value": str(process_time)},
{"key": "event_severity", "value": severity},
{"key": "processor_id", "value": "ksml-metadata-enricher"}
]
# Preserve existing headers and add new ones
existing_headers = metadata.get("headers", [])
metadata["headers"] = existing_headers + new_headers
log.info("Enriched event {} with {} additional headers", key, len(new_headers))
return metadata
Producer - metadataTransformer example
functions:
generate_api_events:
type: generator
globalCode: |
import random
counter = 0
endpoints = ["/api/users", "/api/orders", "/api/products", "/api/health"]
methods = ["GET", "POST", "PUT", "DELETE"]
code: |
global counter, endpoints, methods
# Generate event ID as key
event_id = f"evt_{counter:04d}"
counter += 1
# Generate API event data
event_data = {
"event_id": event_id,
"endpoint": random.choice(endpoints),
"method": random.choice(methods),
"user_id": f"user_{random.randint(1, 50):03d}",
"status_code": random.choices([200, 201, 400, 404, 500], weights=[60, 15, 15, 5, 5])[0],
"response_time_ms": random.randint(50, 1500),
"timestamp": "auto"
}
expression: (event_id, event_data)
resultType: (string, json)
producers:
api_event_producer:
generator: generate_api_events
interval: 2s
to:
topic: api_events
keyType: string
valueType: json
Processor - metadataTransformer example
streams:
api_events_input:
topic: api_events
keyType: string
valueType: json
functions:
enrich_event_metadata:
type: metadataTransformer
code: |
import time
# Get processing timestamp
process_time = int(time.time() * 1000)
# Determine event severity based on status code
status_code = value.get("status_code", 200) if value else 200
severity = "critical" if status_code >= 500 else "warning" if status_code >= 400 else "info"
# Add processing headers
new_headers = [
{"key": "processed_timestamp", "value": str(process_time)},
{"key": "event_severity", "value": severity},
{"key": "processor_id", "value": "ksml-metadata-enricher"}
]
# Preserve existing headers and add new ones
existing_headers = metadata.get("headers", [])
metadata["headers"] = existing_headers + new_headers
log.info("Enriched event {} with {} additional headers", key, len(new_headers))
return metadata
pipelines:
enrich_api_events:
from: api_events_input
via:
- type: transformMetadata
mapper: enrich_event_metadata
- type: peek
forEach:
code: |
log.info("API Event: {} {} - Status: {} - Response Time: {}ms",
value.get("method"), value.get("endpoint"),
value.get("status_code"), value.get("response_time_ms"))
print:
mapper:
code: |
method = value.get('method', 'UNKNOWN')
endpoint = value.get('endpoint', '/unknown')
status = value.get('status_code', 0)
return f"ENRICHED EVENT | {key} | {method} {endpoint} | Status: {status} | Headers processed"
resultType: string
This example:
- Shows metadata enrichment in stream processing
- Generates realistic API events (endpoints + status codes)
- Enriches headers with timestamps, severity, processor ID
- Classifies events (critical/warning/info) by status code
- Logs enrichment for monitoring/debugging
- Uses Python time for timestamps
- Has conditional + extensible headers
- Uses transformMetadata with the mapper parameter
- Output is formatted via print
Expected Results:
When running this example, you'll see enriched events with additional headers:
ENRICHED EVENT | evt_0001 | POST /api/users | Status: 200 | Headers processed
ENRICHED EVENT | evt_0002 | DELETE /api/health | Status: 400 | Headers processed
- Log messages showing: "Enriched event evt_0001 with 3 additional headers"
streamPartitioner
Determines which partition a record should be sent to when writing to a Kafka topic. This allows custom partitioning logic based on record content, ensuring related records go to the same partition for ordered processing.
Parameters
Parameter | Type | Description |
---|---|---|
topic | String | The name of the topic the record is being sent to |
key | Any | The key of the record being partitioned |
value | Any | The value of the record being partitioned |
numPartitions | Integer | The total number of partitions available in the target topic |
Return Value
An integer representing the partition number (0 to numPartitions-1) where the record should be sent
Example
Note: To test this streamPartitioner example, make sure your topics have enough partitions. The example routes to partitions 0-8, so it requires at least 9 partitions; update your docker-compose.yml (or topic setup) accordingly, for instance as sketched below.
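A minimal sketch of one way to do this, assuming a Bitnami-style Kafka broker in docker-compose.yml; the service name and environment variable are illustrative, so adapt them to however your compose file configures the broker or creates topics:

services:
  kafka:
    environment:
      # Assumption: maps to num.partitions, the default partition count for auto-created topics
      KAFKA_CFG_NUM_PARTITIONS: 9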
functions:
# StreamPartitioner function determines which partition to send each record to
# Returns an integer representing the target partition number
priority_region_partitioner:
type: streamPartitioner
code: |
# Custom partitioning logic based on priority and region
# This ensures orders with same priority+region go to same partition
# for ordered processing and improved locality
if value:
priority = value.get("priority", "standard")
region = value.get("region", "UNKNOWN")
# Map priority to a base partition range
priority_map = {
"express": 0, # Express gets partitions 0-2
"standard": 3, # Standard gets partitions 3-5
"economy": 6 # Economy gets partitions 6-8
}
# Map region to offset within priority range
region_map = {
"NORTH": 0,
"SOUTH": 1,
"EAST": 2,
"WEST": 0,
"CENTRAL": 1
}
base_partition = priority_map.get(priority, 3)
region_offset = region_map.get(region, 0)
# Calculate target partition (assuming 9 partitions total)
partition = (base_partition + region_offset) % 9
log.debug("Routing order {} to partition {}: priority={}, region={}",
value.get("order_id"), partition, priority, region)
return partition
# Default to partition 0 if no value
return 0
Producer - streamPartitioner example
functions:
generate_order_event:
type: generator
globalCode: |
import time
import random
order_id = 0
regions = ["NORTH", "SOUTH", "EAST", "WEST", "CENTRAL"]
priorities = ["express", "standard", "economy"]
code: |
global order_id
order_id += 1
# Generate order details with priority and region
order = {
"order_id": f"ORD-{order_id:04d}",
"amount": round(random.uniform(10.0, 1000.0), 2),
"priority": random.choice(priorities),
"region": random.choice(regions),
"customer_id": f"CUST-{random.randint(1000, 9999)}",
"timestamp": int(time.time() * 1000)
}
# Log the order generation
log.info("Generated order: {} with priority={}, region={}",
order["order_id"], order["priority"], order["region"])
# Use order_id as key, order details as value
key = order["order_id"]
value = order
expression: (key, value)
resultType: (string, json)
producers:
order_events_producer:
generator: generate_order_event
interval: 2s
to:
topic: order_events
keyType: string
valueType: json
Processor - streamPartitioner example
streams:
order_events:
topic: order_events
keyType: string
valueType: json
offsetResetPolicy: earliest
partitioned_orders:
topic: partitioned_orders
keyType: string
valueType: json
functions:
# StreamPartitioner function determines which partition to send each record to
# Returns an integer representing the target partition number
priority_region_partitioner:
type: streamPartitioner
code: |
# Custom partitioning logic based on priority and region
# This ensures orders with same priority+region go to same partition
# for ordered processing and improved locality
if value:
priority = value.get("priority", "standard")
region = value.get("region", "UNKNOWN")
# Map priority to a base partition range
priority_map = {
"express": 0, # Express gets partitions 0-2
"standard": 3, # Standard gets partitions 3-5
"economy": 6 # Economy gets partitions 6-8
}
# Map region to offset within priority range
region_map = {
"NORTH": 0,
"SOUTH": 1,
"EAST": 2,
"WEST": 0,
"CENTRAL": 1
}
base_partition = priority_map.get(priority, 3)
region_offset = region_map.get(region, 0)
# Calculate target partition (assuming 9 partitions total)
partition = (base_partition + region_offset) % 9
log.debug("Routing order {} to partition {}: priority={}, region={}",
value.get("order_id"), partition, priority, region)
return partition
# Default to partition 0 if no value
return 0
resultType: integer
pipelines:
partition_orders:
from: order_events
via:
# Transform to add routing metadata
- type: transformValue
mapper:
code: |
# Add partition routing info to the order
if value:
# Calculate which partition this will go to
priority = value.get("priority", "standard")
region = value.get("region", "UNKNOWN")
value["routing_info"] = f"Priority: {priority}, Region: {region}"
result = value
expression: result
resultType: json
# Peek to log the routing decision
- type: peek
forEach:
code: |
if value:
log.info("Processing order {}: {} -> will be partitioned by priority/region",
key, value.get("routing_info", "unknown"))
# Use the streamPartitioner function when writing to output topic
to:
topic: partitioned_orders
keyType: string
valueType: json
partitioner: priority_region_partitioner
The streamPartitioner function provides custom control over how records are distributed across topic partitions. This example demonstrates intelligent order routing based on business priorities and geographic regions.
What the example does:
Implements a sophisticated partitioning strategy for order processing:
- Routes orders to specific partitions based on priority (express/standard/economy)
- Further segments by geographic region within each priority tier
- Ensures orders with same priority+region go to same partition
- Producer generates realistic order events with various priorities/regions
Key Features:
- Custom partition calculation based on multiple fields
- Guaranteed ordering for related records (same priority+region)
- Improved data locality and processing efficiency
- Explicit partition count handling (9 partitions total)
- Fallback to partition 0 for edge cases
Expected Results:
When running this example, you'll see log messages like:
"Generated order: ORD-0001 with priority=economy, region=CENTRAL"
- Producer creating orders"Processing order ORD-0001: Priority: economy, Region: CENTRAL -> will be partitioned by priority/region"
- Routing decision- Express orders (priority=express) go to partitions 0-2
- Standard orders (priority=standard) go to partitions 3-5
- Economy orders (priority=economy) go to partitions 6-8
- Within each priority range, regions determine the exact partition
valueJoiner
Combines values from two streams or tables during join operations, creating enriched records that contain data from both sources. The function must handle cases where one or both values might be null, depending on the join type (inner, left, outer). This function has access to the join key for context-aware value combination.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The join key used to match records |
value1 | Any | The value from the first stream/table |
value2 | Any | The value from the second stream/table |
Return Value
Combined value
Example
join_order_with_product:
type: valueJoiner
code: |
# Combine order and product information
result = {}
# Add order details
if value1 is not None:
result.update(value1)
# Add product details
if value2 is not None:
result["product_details"] = value2
# Calculate total price
quantity = value1.get("quantity", 0) if value1 else 0
price = value2.get("price", 0) if value2 else 0
result["total_price"] = quantity * price
new_value = result
expression: new_value
resultType: json
Full example for valueJoiner:
- Tutorial: Joins for learning about stream enrichment
Stream Related Functions
timestampExtractor
Extracts timestamps from messages for time-based operations.
Parameters
Parameter | Type | Description |
---|---|---|
record | Object | The ConsumerRecord containing key, value, timestamp, and metadata |
previousTimestamp | Long | The previous timestamp (can be used as fallback) |
Return Value
Timestamp in milliseconds (long)
Example
timestampExtractor:
code: |
# Extract custom timestamp from message value
try:
# Try different ways to get the value data
value_data = None
# Method 1: If record is a ConsumerRecord with .value() method
if hasattr(record, 'value') and callable(record.value):
value_data = record.value()
# Method 2: If record is the value directly (dict)
elif isinstance(record, dict):
value_data = record
# Extract custom timestamp from event data
if value_data and "event_timestamp" in value_data:
event_time = value_data.get("event_timestamp")
if event_time and event_time > 0:
log.info("Using event timestamp: {} for {}", event_time, value_data.get("event_id"))
return event_time
except Exception as e:
log.warn("Error extracting custom timestamp: {}", str(e))
# Fallback to record timestamp or current time
import time
current_time = int(time.time() * 1000)
log.info("Using current time as fallback: {}", current_time)
return current_time
Producer - timestampExtractor example
functions:
generate_events_with_timestamps:
type: generator
globalCode: |
import random, time
counter = 0
base_t = int(time.time() * 1000)
code: |
global counter, base_t
counter += 1
eid = f"e{counter:04d}"
offset = random.randint(-300, 60) # -5min to +1min
return eid, {
"event_id": eid,
"event_timestamp": base_t + (counter * 1000) + (offset * 1000),
"event_type": random.choice(["action", "system", "data"]),
"user_id": f"u{random.randint(1, 100):03d}",
"delay": abs(offset) if offset < 0 else 0
}
resultType: (string, json)
producers:
event_producer:
generator: generate_events_with_timestamps
interval: 1s
to:
topic: timestamped_events
keyType: string
valueType: json
Processor - timestampExtractor example
streams:
events_input:
topic: timestamped_events
keyType: string
valueType: json
# Configure custom timestamp extraction
timestampExtractor:
code: |
# Extract custom timestamp from message value
try:
# Try different ways to get the value data
value_data = None
# Method 1: If record is a ConsumerRecord with .value() method
if hasattr(record, 'value') and callable(record.value):
value_data = record.value()
# Method 2: If record is the value directly (dict)
elif isinstance(record, dict):
value_data = record
# Extract custom timestamp from event data
if value_data and "event_timestamp" in value_data:
event_time = value_data.get("event_timestamp")
if event_time and event_time > 0:
log.info("Using event timestamp: {} for {}", event_time, value_data.get("event_id"))
return event_time
except Exception as e:
log.warn("Error extracting custom timestamp: {}", str(e))
# Fallback to record timestamp or current time
import time
current_time = int(time.time() * 1000)
log.info("Using current time as fallback: {}", current_time)
return current_time
ordered_events:
topic: time_ordered_events
keyType: string
valueType: json
functions:
log_timestamp_info:
type: forEach
code: |
event_time = value.get("event_timestamp") if value else 0
delay = value.get("processing_delay", 0) if value else 0
log.info("Event processed in time order: {} (event_time={}, delay={}s)",
key, event_time, delay)
pipelines:
process_with_event_time:
from: events_input
via:
- type: peek
forEach: log_timestamp_info
to: ordered_events
What the example does:
Demonstrates custom timestamp extraction for event-time processing:
- Creates events with custom timestamps that simulate out-of-order and delayed processing scenarios
- Extracts event-time timestamps from message content rather than using record timestamps
- Processes events based on their event time rather than arrival time
- Provides robust fallback mechanisms for missing or invalid timestamps
- Uses a custom timestampExtractor function in the stream definition
- Uses Python time manipulation and timestamp handling
- Supports both ConsumerRecord and direct value access patterns
Expected Results:
When running this example, you'll see events processed in time order:
Event processed in time order: event_0001 (event_time=1755974335641, delay=155s)
Event processed in time order: event_0002 (event_time=1755974539885, delay=41s)
- Log messages showing: "Using event timestamp: 1755974601885 for event_0015"
topicNameExtractor
Dynamically determines the target topic for message routing based on record content. This enables intelligent message distribution, multi-tenancy support, and content-based routing patterns without requiring separate processing pipelines. The function has access to record context for advanced routing decisions.
Parameters
Parameter | Type | Description |
---|---|---|
key | Any | The key of the record being processed |
value | Any | The value of the record being processed |
recordContext | Object | Record metadata and processing context |
Return Value
String representing the topic name to send the message to
Example
route_by_sensor_type:
type: topicNameExtractor
code: |
if value is None:
return "unknown_sensor_data"
sensor_type = value.get("sensor_type", "unknown")
alert_level = value.get("alert_level", "normal")
# Route critical alerts to dedicated topic regardless of sensor type
if alert_level == "critical":
log.warn("Critical alert from sensor {}: {} reading = {}",
value.get("sensor_id"), sensor_type, value.get("reading"))
return "critical_sensor_alerts"
# Route by sensor type for normal and warning levels
if sensor_type == "temperature":
return "temperature_sensors"
elif sensor_type == "humidity":
return "humidity_sensors"
elif sensor_type == "pressure":
return "pressure_sensors"
else:
return "unknown_sensor_data"
Producer - topicNameExtractor example
functions:
generate_sensor_data:
type: generator
globalCode: |
import random
counter = 0
sensor_types = ["temperature", "humidity", "pressure"]
locations = ["factory_a", "factory_b", "warehouse"]
code: |
global counter, sensor_types, locations
# Generate sensor ID as key
sensor_id = f"sensor_{counter % 10:02d}"
counter += 1
# Generate sensor reading
sensor_type = random.choice(sensor_types)
value = {
"sensor_id": sensor_id,
"sensor_type": sensor_type,
"location": random.choice(locations),
"reading": round(random.uniform(10.0, 100.0), 2),
"timestamp": counter * 1000,
"alert_level": random.choices(["normal", "warning", "critical"], [80, 15, 5])[0]
}
expression: (sensor_id, value)
resultType: (string, json)
producers:
sensor_producer:
generator: generate_sensor_data
interval: 2s
to:
topic: mixed_sensor_data
keyType: string
valueType: json
Processor - topicNameExtractor example
streams:
mixed_sensors:
topic: mixed_sensor_data
keyType: string
valueType: json
temperature_data:
topic: temperature_sensors
keyType: string
valueType: json
humidity_data:
topic: humidity_sensors
keyType: string
valueType: json
pressure_data:
topic: pressure_sensors
keyType: string
valueType: json
critical_alerts:
topic: critical_sensor_alerts
keyType: string
valueType: json
functions:
route_by_sensor_type:
type: topicNameExtractor
code: |
if value is None:
return "unknown_sensor_data"
sensor_type = value.get("sensor_type", "unknown")
alert_level = value.get("alert_level", "normal")
# Route critical alerts to dedicated topic regardless of sensor type
if alert_level == "critical":
log.warn("Critical alert from sensor {}: {} reading = {}",
value.get("sensor_id"), sensor_type, value.get("reading"))
return "critical_sensor_alerts"
# Route by sensor type for normal and warning levels
if sensor_type == "temperature":
return "temperature_sensors"
elif sensor_type == "humidity":
return "humidity_sensors"
elif sensor_type == "pressure":
return "pressure_sensors"
else:
return "unknown_sensor_data"
log_routing:
type: forEach
code: |
log.info("Sensor data: {} type={} level={} reading={}",
key, value.get("sensor_type"),
value.get("alert_level"), value.get("reading"))
pipelines:
route_sensor_data:
from: mixed_sensors
via:
- type: peek
forEach: log_routing
toTopicNameExtractor:
topicNameExtractor: route_by_sensor_type
What the example does:
Demonstrates dynamic topic routing based on message content:
- Creates mixed sensor data (temperature, humidity, pressure) with varying alert levels
- Routes messages to different topics based on sensor type and priority
- Prioritizes critical alerts to a dedicated topic regardless of sensor type
- Distributes normal/warning messages to type-specific topics
Key Technical Features:
- `topicNameExtractor` function for dynamic topic selection
- Priority-based routing with alert level evaluation
- Fallback topic handling for unknown sensor types
- `toTopicNameExtractor` operation instead of static `to` operation
- Integration with logging for routing visibility
Expected Results:
When running this example, you'll see messages routed to different topics:
- Critical alerts: "Critical alert from sensor sensor_05: pressure reading = 78.26" → `critical_sensor_alerts` topic
- Normal temperature readings → `temperature_sensors` topic
- Normal humidity readings → `humidity_sensors` topic
- Normal pressure readings → `pressure_sensors` topic
- Unknown sensor types → `unknown_sensor_data` topic
Other Functions
generic
Generic custom function that can be used for any purpose. It can accept custom parameters and return any type of value.
Parameters
User-defined parameters
Return Value
Any value, depending on the function's purpose
Example
calculate_price:
  type: generic
  parameters:
    - name: base_price
      type: double
    - name: discount_rate
      type: double
    - name: tax_rate
      type: double
  code: |
    # Calculate discounted price
    discount_amount = base_price * (discount_rate / 100)
    discounted_price = base_price - discount_amount

    # Calculate tax on discounted price
    tax_amount = discounted_price * (tax_rate / 100)
    final_price = discounted_price + tax_amount

    return {
      "original_price": base_price,
      "discount_amount": discount_amount,
      "discounted_price": discounted_price,
      "tax_amount": tax_amount,
      "final_price": final_price,
      "total_savings": discount_amount
    }
  resultType: json
Producer - generic example
functions:
  generate_product_data:
    type: generator
    globalCode: |
      import random
      counter = 0
      products = ["laptop", "phone", "tablet", "headphones"]
      categories = ["electronics", "accessories", "computing"]
    code: |
      global counter, products, categories

      product_id = f"prod_{counter:03d}"
      counter += 1

      product_data = {
        "product_id": product_id,
        "name": random.choice(products),
        "category": random.choice(categories),
        "base_price": round(random.uniform(50.0, 1000.0), 2),
        "discount_rate": random.choice([0, 5, 10, 15, 20]),
        "quantity": random.randint(1, 100)
      }
    expression: (product_id, product_data)
    resultType: (string, json)

producers:
  product_producer:
    generator: generate_product_data
    interval: 2s
    to:
      topic: raw_products
      keyType: string
      valueType: json
Processor - generic example
streams:
  raw_products:
    topic: raw_products
    keyType: string
    valueType: json

functions:
  calculate_price:
    type: generic
    parameters:
      - name: base_price
        type: double
      - name: discount_rate
        type: double
      - name: tax_rate
        type: double
    code: |
      # Calculate discounted price
      discount_amount = base_price * (discount_rate / 100)
      discounted_price = base_price - discount_amount

      # Calculate tax on discounted price
      tax_amount = discounted_price * (tax_rate / 100)
      final_price = discounted_price + tax_amount

      return {
        "original_price": base_price,
        "discount_amount": discount_amount,
        "discounted_price": discounted_price,
        "tax_amount": tax_amount,
        "final_price": final_price,
        "total_savings": discount_amount
      }
    resultType: json

  enrich_product:
    type: valueTransformer
    code: |
      if value and "base_price" in value and "discount_rate" in value:
        # Call our generic function with custom tax rate
        price_info = calculate_price(
          base_price=value["base_price"],
          discount_rate=value["discount_rate"],
          tax_rate=8.5
        )
        # Add calculated pricing to the product data
        value["pricing"] = price_info
        value["processed_at"] = "2024-01-01T00:00:00Z"
      return value
    resultType: json

pipelines:
  process_products:
    from: raw_products
    via:
      - type: transformValue
        mapper: enrich_product
      - type: peek
        forEach:
          code: |
            original = value["pricing"]["original_price"]
            final = value["pricing"]["final_price"]
            savings = value["pricing"]["total_savings"]
            log.info("Processed product: {} - Original: ${:.2f}, Final: ${:.2f}, Saved: ${:.2f}".format(
              key, original, final, savings))
    to:
      topic: enriched_products
      keyType: string
      valueType: json
What the example does:
Demonstrates how to create reusable business logic with generic functions:
- Creates sample product data with base prices and discount rates
- `calculate_price` accepts parameters to compute final pricing with tax
- The generic function is called from within a valueTransformer
- Adds calculated pricing information to product records
Key Technical Features:
- `type: generic` for reusable custom functions
- Custom parameter definitions with types (double, string, etc.)
- Return any data structure (JSON objects, arrays, primitives)
- Call generic functions from other functions using standard Python syntax
- Mix generic functions with standard KSML function types
Expected Results:
When running these examples, you will see:
- Product data being generated with random base prices and discount rates
- Log messages showing: "Processed product: prod_001 - Original: $856.58, Final: $789.98, Saved: $128.49"
- Each product enriched with detailed pricing calculations including tax
- Generic function providing consistent pricing logic across all products
How KSML Functions Relate to Kafka Streams
KSML functions are Python implementations that map directly to Kafka Streams Java interfaces. Understanding this relationship helps you leverage Kafka Streams documentation and concepts:
Direct Mappings
Functions for stateless operations
| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| forEach | `ForeachAction<K,V>` | Process records for side effects |
| keyTransformer | `KeyValueMapper<K,V,KR>` | Transform keys |
| keyValueTransformer | `KeyValueMapper<K,V,KeyValue<KR,VR>>` | Transform both key and value |
| predicate | `Predicate<K,V>` | Filter records based on conditions |
| valueTransformer | `ValueTransformer<V,VR>` / `ValueMapper<V,VR>` | Transform values |
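For example, a KSML predicate fills the same role as a Kafka Streams `Predicate<K,V>` lambda such as `(key, value) -> "critical".equals(value.get("alert_level"))`. The sketch below is illustrative only; the function name is made up, and it simply reuses the `alert_level` field from the sensor examples above:

functions:
  # Illustrative sketch: plays the role of Predicate<K,V> in Kafka Streams
  is_critical_alert:
    type: predicate
    expression: value is not None and value.get("alert_level") == "critical"
    resultType: boolean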
Functions for stateful operations
| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| aggregator | `Aggregator<K,V,VA>` | Aggregate records incrementally |
| initializer | `Initializer<VA>` | Provide initial aggregation values |
| merger | `Merger<K,V>` | Merge aggregation results |
| reducer | `Reducer<V>` | Combine values of same type |
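As a sketch of the stateful mappings, an initializer/aggregator pair corresponds to Kafka Streams' `Initializer<VA>` and `Aggregator<K,V,VA>`. The function names below are made up, and the sketch assumes the aggregator's built-in `aggregatedValue` parameter holds the current aggregate:

functions:
  # Initializer<VA>: supplies the starting aggregate (a plain count here)
  start_count:
    type: initializer
    expression: 0
    resultType: long

  # Aggregator<K,V,VA>: folds each record into the running aggregate
  # (assumes the built-in aggregatedValue parameter)
  count_records:
    type: aggregator
    expression: aggregatedValue + 1
    resultType: long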
Special Purpose Functions
| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| foreignKeyExtractor | `Function<V,FK>` | Extract foreign key for joins |
| keyValueMapper | `KeyValueMapper<K,V,VR>` | Convert key-value to single output |
| valueJoiner | `ValueJoiner<V1,V2,VR>` | Join values from two streams |
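Similarly, a KSML valueJoiner stands in for Kafka Streams' `ValueJoiner<V1,V2,VR>`. The sketch below is a rough illustration and assumes the two joined records are exposed to the function as built-in `value1` and `value2` parameters:

functions:
  # Rough sketch of a ValueJoiner<V1,V2,VR>: combine two joined records into one JSON value
  join_order_with_customer:
    type: valueJoiner
    code: |
      # value1 and value2 are assumed built-in parameters holding the joined records
      return {"order": value1, "customer": value2}
    resultType: json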
Stream Related Functions
| KSML Function Type | Kafka Streams Interface | Purpose |
|--------------------|-------------------------|---------|
| streamPartitioner | `StreamPartitioner<K,V>` | Custom partition selection |
| timestampExtractor | `TimestampExtractor` | Extract event time from records |
| topicNameExtractor | `TopicNameExtractor<K,V>` | Dynamic topic routing |
Function Execution Context
When your Python functions execute, they have access to:
- Logger: For outputting information to application logs
    - `log.<log-level>("Debug message")` - level can be debug, info, warn, error, trace
- Metrics: For monitoring function performance and behavior
    - `metrics.counter("name").increment()` - Count occurrences
    - `metrics.gauge("name").record(value)` - Record values
    - `with metrics.timer("name"):` - Measure execution time
- State Stores: For maintaining state between function invocations (when configured)
    - `store.get(key)` - Retrieve a value from the store
    - `store.put(key, value)` - Store a value
    - `store.delete(key)` - Remove a value
    - Must be declared in the function's `stores` property
This execution context provides the tools needed for debugging, monitoring, and implementing stateful processing.
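As a minimal sketch of these facilities working together, the value transformer below logs each record, increments a counter metric, and compares the current reading against the previous one kept in a state store. It assumes a hypothetical store named `last_readings` is defined in the definition's stores section and is exposed to the function under that name:

functions:
  track_sensor_changes:
    type: valueTransformer
    stores:
      - last_readings   # hypothetical store; must be defined in the stores section
    code: |
      if value is None:
        return None

      # Logger: note which sensor is being processed
      log.debug("Processing reading for sensor {}", key)

      # Metrics: count how many readings this function has handled
      metrics.counter("readings_processed").increment()

      # State store (assumed accessible via its declared name): compare with the previous reading
      previous = last_readings.get(key)
      last_readings.put(key, value)
      value["changed"] = previous is None or previous.get("reading") != value.get("reading")

      return value
    resultType: json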