Data Types and Notations Reference
KSML supports a wide range of data types and formats for both keys and values in your streams. This comprehensive reference covers all data types, notation formats, and type conversion capabilities available in KSML.
Data Types in KSML
Primitive Types
KSML supports the following primitive types:
Type | Description | Example | Java Equivalent |
---|---|---|---|
boolean | True or false values | true, false | Boolean |
byte | 8-bit integer | 42 | Byte |
short | 16-bit integer | 1000 | Short |
int | 32-bit integer | 1000000 | Integer |
long | 64-bit integer | 9223372036854775807 | Long |
float | Single-precision floating point | 3.14 | Float |
double | Double-precision floating point | 3.141592653589793 | Double |
string | Text string | "Hello, World!" | String |
bytes | Array of bytes | Binary data | byte[] |
null | Null value | null | null |
Complex Types
KSML also supports several complex types that can contain multiple values:
Enum
An enumeration defines a set of allowed values.
Syntax:
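valueType: enum(<value1>, <value2>, ...)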
Example:
streams:
order_status_stream:
topic: order-statuses
keyType: string
valueType: enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED) # Works without quotes
# valueType: "enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED)" # Also works with quotes
In Python code, an enum value is always represented as a string:
functions:
update_status:
type: valueTransformer
code: |
if value.get("shipped"):
return "SHIPPED"
elif value.get("processing"):
return "PROCESSING"
expression: "PENDING"
resultType: string
Producer - Enum example:
# Demonstrates enum data type usage in KSML
streams:
order_events:
topic: order_events
keyType: string
valueType: json
functions:
generate_order_event:
type: generator
resultType: (string, json)
code: |
order = nextOrder()
return (order["order_id"], order)
globalCode: |
# Define orders with enum status values
orders = [
{"order_id": "ORD001", "customer": "Alice", "amount": 150.00, "status": "PENDING"},
{"order_id": "ORD002", "customer": "Bob", "amount": 75.50, "status": "PROCESSING"},
{"order_id": "ORD003", "customer": "Charlie", "amount": 200.00, "status": "SHIPPED"},
{"order_id": "ORD004", "customer": "Diana", "amount": 50.00, "status": "DELIVERED"},
{"order_id": "ORD005", "customer": "Eve", "amount": 125.00, "status": "CANCELLED"},
]
index = 0
done = False
def nextOrder():
global index, done
if index >= len(orders):
done = True
index = 0
order = orders[index]
index += 1
return order
producers:
order_producer:
to: order_events
interval: 1000
until:
expression: done
generator: generate_order_event
Processor - Enum example:
# Demonstrates enum data type usage and validation in KSML
streams:
order_events:
topic: order_events
keyType: string
valueType: json
# Stream with enum type for order status
# Enum values are represented as strings with validation
order_status_stream:
topic: order_statuses
keyType: string
valueType: enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED)
functions:
extract_and_validate_status:
type: valueTransformer
resultType: string
code: |
# Extract status from order event
# In Python, enum values are always represented as strings
status = value.get("status", "PENDING")
# Validate against allowed enum values
valid_statuses = ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"]
if status in valid_statuses:
return status
else:
# Return default if invalid
return "PENDING"
pipelines:
# Process orders and extract enum status
process_order_status:
from: order_events
via:
# Transform JSON value to validated enum string
- type: transformValue
mapper: extract_and_validate_status
- type: peek
forEach:
code: |
log.info("Order {} has status: {}", key, value)
to: order_status_stream
List
A list contains multiple elements of the same type.
Syntax:
# Function notation (recommended - avoids YAML validation warnings)
valueType: list(<element_type>) # valueType: [<element_type>] also works
Example:
streams:
tags_stream:
topic: tags
keyType: string
valueType: list(string) # Function notation, valueType: [string] is also valid
categories_stream:
topic: categories
keyType: string
valueType: list(string) # Alternative notation (no quotes needed)
In Python code, a list is represented as a Python list:
functions:
extract_tags:
type: keyValueToValueListTransformer
expression: value.get("tags", [])
resultType: list(string) # Function notation, resultType: [string] is also valid
extract_categories:
type: keyValueToValueListTransformer
expression: value.get("categories", [])
resultType: list(string) # Alternative notation (no quotes needed)
See it in action:
- List example for predicate functions for data filtering
Example
functions:
enhance_grades:
type: valueTransformer
resultType: list(int) # Alternative to "[int]" - demonstrates list() syntax without quotes
code: |
grades = value.get("grades", [])
# Add 5 bonus points to each grade below 95 (higher grades are dropped)
enhanced_grades = [grade + 5 for grade in grades if grade < 95]
expression: enhanced_grades
This example demonstrates using the list(int) syntax in function result types to avoid YAML validation warnings:
Producer - list() syntax example:
# Simple producer demonstrating list() and tuple() syntax alternatives
functions:
generate_grades:
type: generator
resultType: tuple(string, json) # Alternative to "(string, json)" - no quotes needed
globalCode: |
import random
code: |
student_id = f"student_{random.randint(1000, 9999)}"
# Generate simple student grades data
student_data = {
"grades": [random.randint(70, 100) for _ in range(3)]
}
expression: (student_id, student_data)
streams:
student_grades:
topic: student_grades
keyType: string
valueType: json
producers:
grades_producer:
to: student_grades
interval: 3000
generator: generate_grades
Processor - list() syntax example:
# Simple processor demonstrating list() syntax alternative
streams:
student_grades:
topic: student_grades
keyType: string
valueType: json
enhanced_grades:
topic: enhanced_grades
keyType: string
valueType: json
functions:
enhance_grades:
type: valueTransformer
resultType: list(int) # Alternative to "[int]" - demonstrates list() syntax without quotes
code: |
grades = value.get("grades", [])
# Add 5 bonus points to each grade below 95 (higher grades are dropped)
enhanced_grades = [grade + 5 for grade in grades if grade < 95]
expression: enhanced_grades
pipelines:
process_grades:
from: student_grades
via:
- type: transformValue
mapper: enhance_grades
- type: convertValue
into: json
- type: peek
forEach:
code: |
log.info("ENHANCED GRADES - Student: {}, enhanced_grades: {}", key, value)
to: enhanced_grades
What this example does:
- Producer uses resultType: tuple(string, json) instead of "(string, json)" to avoid quotes
- Processor uses resultType: list(int) instead of "[int]" to avoid YAML validation warnings
- Functionality remains identical - the new syntax is purely for YAML compatibility
Map
A map contains key-value pairs where keys are always strings and values are of a specified type.
Syntax:
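valueType: map(<value_type>)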
Example:
streams:
user_preferences:
topic: user-preferences
keyType: string
valueType: map(string) # Map with string keys and string values (quotes optional)
scores:
topic: scores
keyType: string
valueType: map(int) # Map with string keys and integer values, "map(int)" also valid
In Python code, a map is represented as a Python dictionary:
functions:
create_preferences:
type: valueTransformer
code: |
result = {
"theme": value.get("selected_theme", "default"),
"language": value.get("user_language", "en"),
"notifications": value.get("notify_enabled", "true")
}
expression: result
resultType: map(string) # "map(string)" also valid
calculate_scores:
type: valueTransformer
code: |
result = {
"math": 85,
"science": 92,
"english": 78
}
expression: result
resultType: map(int) # "map(int)" also valid
Key characteristics:
- Keys are always strings (this is enforced by the type system)
- All values must be of the same type as specified in map(<value_type>)
- Useful for representing configuration objects, dictionaries, and key-value stores
Example
streams:
user_preferences:
topic: user_preferences
keyType: string
valueType: map(string) # Map with string keys and string values
user_scores:
topic: user_scores
keyType: string
valueType: map(int) # Map with string keys and integer values
This simple example demonstrates using map(string) and map(int) types in stream definitions and function result types:
Producer - map example:
# Simple producer demonstrating map(string) and map(int) data types
functions:
generate_preferences:
type: generator
resultType: (string, map(string)) # Returns a map with string values
globalCode: |
import random
import time
code: |
user_id = f"user_{random.randint(1000, 9999)}"
# Generate preferences map with string values
preferences = {
"theme": random.choice(["dark", "light"]),
"language": random.choice(["en", "es", "fr"]),
"layout": random.choice(["grid", "list"])
}
expression: (user_id, preferences)
generate_scores:
type: generator
resultType: (string, map(int)) # Returns a map with integer values
globalCode: |
import random
code: |
user_id = f"user_{random.randint(1000, 9999)}"
# Generate scores map with integer values
scores = {
"math": random.randint(60, 100),
"science": random.randint(60, 100),
"english": random.randint(60, 100)
}
expression: (user_id, scores)
streams:
user_preferences:
topic: user_preferences
keyType: string
valueType: map(string) # Map with string keys and string values
user_scores:
topic: user_scores
keyType: string
valueType: map(int) # Map with string keys and integer values
producers:
preferences_producer:
to: user_preferences
interval: 3000
generator: generate_preferences
scores_producer:
to: user_scores
interval: 4000
generator: generate_scores
Processor - map example:
# Simple processor demonstrating map(string) and map(int) usage
streams:
user_preferences:
topic: user_preferences
keyType: string
valueType: map(string) # Map with string keys and string values
user_scores:
topic: user_scores
keyType: string
valueType: map(int) # Map with string keys and integer values
processed_preferences:
topic: processed_preferences
keyType: string
valueType: map(string) # Output also as map(string)
processed_scores:
topic: processed_scores
keyType: string
valueType: map(int) # Output also as map(int)
functions:
enhance_preferences:
type: valueTransformer
resultType: map(string) # Function returns map(string)
code: |
# Add a status field to the preferences map
enhanced = dict(value) # Copy input map
enhanced["status"] = "active" # Add string value
expression: enhanced
calculate_stats:
type: valueTransformer
resultType: map(int) # Function returns map(int)
code: |
# Calculate some statistics from scores map
scores = dict(value) # Copy input map
total = sum(scores.values())
average = total // len(scores) # Integer division
stats = dict(scores) # Start with original scores
stats["total"] = total # Add integer values
stats["average"] = average
expression: stats
pipelines:
process_preferences:
from: user_preferences
via:
- type: transformValue
mapper: enhance_preferences
- type: peek
forEach:
code: |
log.info("PREFERENCES MAP - User: {}, prefs: {}", key, value)
to: processed_preferences
process_scores:
from: user_scores
via:
- type: transformValue
mapper: calculate_stats
- type: peek
forEach:
code: |
log.info("SCORES MAP - User: {}, total: {}, average: {}",
key, value.get("total"), value.get("average"))
to: processed_scores
What this example does:
- Stream definitions use valueType: map(string) and valueType: map(int) to define strongly-typed maps
- Function result types use resultType: (string, map(string)) to return maps with type safety
- Processing functions use resultType: map(string) and resultType: map(int) to transform and validate map contents
- Demonstrates how the map(<value_type>) syntax ensures all values in a map conform to the specified type
Struct
A struct is a key-value map where all keys are strings. This is the most common complex type and is used for JSON objects, Avro records, etc.
Syntax:
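valueType: struct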
Example:
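streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: struct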
In Python code, a struct is represented as a dictionary:
functions:
create_user:
type: valueTransformer
code: |
return {
"id": value.get("user_id"),
"name": value.get("first_name") + " " + value.get("last_name"),
"email": value.get("email"),
"age": value.get("age")
}
Producer - Struct example:
# Demonstrates struct data type usage in KSML
streams:
user_profiles:
topic: user_profiles
keyType: string
valueType: struct # Using struct value type
functions:
generate_user_profile:
type: generator
globalCode: |
import time
user_id = 1
def get_user_profile():
global user_id
# Create a struct (dictionary) for user profile
profile = {
"user_id": f"USER_{user_id:03d}",
"name": f"User Name {user_id}",
"age": 20 + (user_id % 50),
"email": f"user{user_id}@example.com",
"preferences": {
"newsletter": user_id % 2 == 0,
"notifications": user_id % 3 != 0
},
"created_at": int(time.time() * 1000)
}
user_id = (user_id % 5) + 1 # Cycle through 5 users
return profile
code: |
profile = get_user_profile()
return (profile["user_id"], profile)
resultType: (string, struct) # Returning struct type
producers:
profile_producer:
to: user_profiles
interval: 3000 # Generate profile every 3 seconds
generator: generate_user_profile
Processor - Struct example:
# Demonstrates struct data type manipulation in KSML
streams:
user_profiles:
topic: user_profiles
keyType: string
valueType: struct # Input as struct type
enriched_profiles:
topic: enriched_profiles
keyType: string
valueType: struct # Output as struct type
functions:
enrich_user_profile:
type: valueTransformer
code: |
# Working with struct data (dictionary)
# Access nested struct fields
preferences = value.get("preferences", {})
# Create enriched struct with additional fields
enriched = {
"user_id": value.get("user_id"),
"name": value.get("name"),
"age": value.get("age"),
"email": value.get("email"),
"age_group": "young" if value.get("age", 0) < 30 else "adult",
"subscription_status": "active" if preferences.get("newsletter", False) else "inactive",
"original_preferences": preferences,
"enriched_at": int(time.time() * 1000)
}
return enriched
resultType: struct # Function returns struct type
globalCode: |
import time
pipelines:
enrich_profiles:
from: user_profiles
via:
# Transform struct to enriched struct
- type: transformValue
mapper: enrich_user_profile
# Log the enriched struct
- type: peek
forEach:
code: |
log.info("Enriched profile: {} - {} ({} years, {})",
key,
value.get("name"),
value.get("age"),
value.get("subscription_status"))
to: enriched_profiles
Tuple
A tuple combines multiple elements of different types into a single value.
Syntax:
# Standard bracket notation
valueType: (<type1>, <type2>, ...)
# Alternative function notation (avoids YAML validation warnings)
valueType: tuple(<type1>, <type2>, ...)
Example:
streams:
sensor_stream:
topic: sensor-data
keyType: string
valueType: (string, avro:SensorData) # Standard notation
coordinate_stream:
topic: coordinates
keyType: string
valueType: tuple(double, double) # Alternative notation (no quotes needed)
In Python code, a tuple is represented as a Python tuple:
functions:
create_user_age_pair:
type: keyValueTransformer
expression: (value.get("name"), value.get("age"))
resultType: (string, int) # Standard notation
create_coordinate_pair:
type: keyValueTransformer
expression: (value.get("lat"), value.get("lng"))
resultType: tuple(double, double) # Alternative notation (no quotes needed)
Example
functions:
generate_grades:
type: generator
resultType: tuple(string, json) # Alternative to "(string, json)" - no quotes needed
globalCode: |
import random
code: |
student_id = f"student_{random.randint(1000, 9999)}"
# Generate simple student grades data
student_data = {
"grades": [random.randint(70, 100) for _ in range(3)]
}
expression: (student_id, student_data)
This example demonstrates using the tuple(string, json) syntax in function result types to avoid YAML validation warnings:
Producer - tuple() syntax example:
# Simple producer demonstrating list() and tuple() syntax alternatives
functions:
generate_grades:
type: generator
resultType: tuple(string, json) # Alternative to "(string, json)" - no quotes needed
globalCode: |
import random
code: |
student_id = f"student_{random.randint(1000, 9999)}"
# Generate simple student grades data
student_data = {
"grades": [random.randint(70, 100) for _ in range(3)]
}
expression: (student_id, student_data)
streams:
student_grades:
topic: student_grades
keyType: string
valueType: json
producers:
grades_producer:
to: student_grades
interval: 3000
generator: generate_grades
Processor - tuple() syntax example:
# Simple processor demonstrating list() syntax alternative
streams:
student_grades:
topic: student_grades
keyType: string
valueType: json
enhanced_grades:
topic: enhanced_grades
keyType: string
valueType: json
functions:
enhance_grades:
type: valueTransformer
resultType: list(int) # Alternative to "[int]" - demonstrates list() syntax without quotes
code: |
grades = value.get("grades", [])
# Add 5 bonus points to each grade below 95 (higher grades are dropped)
enhanced_grades = [grade + 5 for grade in grades if grade < 95]
expression: enhanced_grades
pipelines:
process_grades:
from: student_grades
via:
- type: transformValue
mapper: enhance_grades
- type: convertValue
into: json
- type: peek
forEach:
code: |
log.info("ENHANCED GRADES - Student: {}, enhanced_grades: {}", key, value)
to: enhanced_grades
What this example does:
- Producer function uses resultType: tuple(string, json) instead of "(string, json)" to avoid quotes
- Processor function uses resultType: list(int) to demonstrate both new syntaxes working together
- No functional difference - the new syntax provides YAML-friendly alternatives
Union
A union type can be one of several possible types.
Syntax:
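valueType: union(<type1>, <type2>, ...)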
Example:
Union types are used in two main places in KSML:
1. In stream definitions - to specify that a stream can contain multiple types:
streams:
optional_messages:
topic: optional-messages
keyType: string
valueType: union(null, json) # This stream accepts either null OR a JSON object
2. In function return types - to specify that a function can return multiple types:
functions:
generate_optional:
type: generator
code: |
# Can return either null or a message
if random.random() > 0.5:
return ("key1", {"data": "value"})
else:
return ("key1", None)
resultType: (string, union(null, json)) # Returns a tuple with union type
What union types mean:
- union(null, json) means the value can be either null OR a JSON object
- When processing union types, your code must check which type was received and handle each case
Complete example showing both usages:
Producer - Union example:
# Demonstrates union data type usage in KSML
streams:
optional_messages:
topic: optional_messages
keyType: string
valueType: json # Using JSON for Kafka serialization
functions:
generate_optional_message:
type: generator
globalCode: |
import time
import random
message_id = 1
def get_optional_message():
global message_id
# Generate union type: either a message or null
if random.random() > 0.3: # 70% chance of message
message = {
"id": message_id,
"content": f"Message {message_id}",
"timestamp": int(time.time() * 1000)
}
else: # 30% chance of null
message = None
key = f"MSG_{message_id:04d}"
message_id += 1
return (key, message)
code: |
return get_optional_message()
resultType: (string, union(null, json)) # Function returns union type
producers:
optional_producer:
to: optional_messages
interval: 2000 # Generate message every 2 seconds
generator: generate_optional_message
Processor - Union example:
# Demonstrates processing of union types in KSML
streams:
optional_messages:
topic: optional_messages
keyType: string
valueType: union(null, json) # Input as union type
processed_messages:
topic: processed_messages
keyType: string
valueType: json # Output as JSON
functions:
# This function accepts a union type
process_optional:
type: valueTransformer
code: |
# Handle union type (null or JSON message)
if value is None:
# Handle null case
return {
"status": "empty",
"message": "No content received",
"processed_at": int(time.time() * 1000)
}
else:
# Handle JSON message case
return {
"status": "processed",
"original_id": value.get("id"),
"content_length": len(value.get("content", "")),
"processed_at": int(time.time() * 1000)
}
# Function signature shows it processes union type
resultType: json
globalCode: |
import time
pipelines:
process_optional_messages:
from: optional_messages
via:
# Transform the union type value
- type: transformValue
mapper: process_optional
# Log the processing result
- type: peek
forEach:
code: |
status = value.get("status")
if status == "empty":
log.info("Received null value for key: {}", key)
else:
log.info("Processed message {} for key: {}",
value.get("original_id"), key)
to: processed_messages
Windowed
Windowing operations in Kafka Streams group messages together in time-based windows. KSML provides the windowed(<base_type>) syntax to work with these windowed keys.
Syntax:
# Without notation - requires manual transformation for Kafka output
keyType: windowed(<base_type>)
# With notation - automatically serializes to the specified format
keyType: <notation>:windowed(<base_type>) # e.g., json:windowed(string), avro:windowed(string)
Understanding Windowed Keys:
After windowing operations (like windowByTime), Kafka Streams internally creates windowed keys that contain:
- The original key value
- Window start timestamp (milliseconds)
- Window end timestamp (milliseconds)
- Human-readable start/end times
Two Approaches for Handling Windowed Keys:
1. Without Notation (Manual Transformation Required):
When using plain windowed(string), the windowed keys cannot be directly serialized to Kafka topics. You must manually transform them to a regular type:
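A condensed sketch of that transformation, taken from the full manual-transformation example later in this section:

- type: transformKeyValue
  mapper:
    resultType: (string, json)
    code: |
      # Flatten the windowed key into a plain string key
      new_key = f"{key['key']}_{key['start']}_{key['end']}"
      new_value = {"user": key['key'], "count": value}
      return (new_key, new_value)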
2. With Notation Prefix (Automatic Serialization):
Using a notation prefix like json:windowed(string) or avro:windowed(string) enables automatic serialization of the windowed key structure:
- type: convertKey
into: json:windowed(string) # Recommended: use notation prefix for automatic serialization
The notation automatically serializes the windowed key as a structured object with fields: start, end, startTime, endTime, and key.
Complete Examples:
Producer - Generates events for windowing:
# Demonstrates generating events for windowed processing
streams:
user_events:
topic: user_events
keyType: string
valueType: json # Using JSON for readability in Kowl UI
functions:
generate_user_event:
type: generator
globalCode: |
import time
import random
# Simulate events from 5 different users
users = ["user1", "user2", "user3", "user4", "user5"]
event_types = ["click", "view", "purchase"]
def get_user_event():
user = random.choice(users)
event_type = random.choice(event_types)
event = {
"user": user,
"type": event_type,
"timestamp": int(time.time() * 1000)
}
return (user, event) # Key by user for windowing
code: |
return get_user_event()
resultType: (string, json)
producers:
event_producer:
to: user_events
interval: 1000 # Generate event every second
generator: generate_user_event
Processor - Manual transformation approach:
This example shows how to manually transform windowed keys to regular strings when not using notation:
# Demonstrates windowed type usage in KSML
#
# This example shows how to use the windowed(<base_type>) syntax with convertKey operation.
# After windowing operations, KSML internally uses windowed keys. The convertKey operation
# with 'into: windowed(string)' explicitly converts to this type for internal processing.
#
# IMPORTANT: While windowed types can be used internally and in stream definitions,
# they cannot be serialized to Kafka topics. The final transformation to regular strings
# is necessary for writing to Kafka.
streams:
user_events:
topic: user_events
keyType: string
valueType: json # Input events
# Output stream with regular string keys (transformed from windowed keys)
windowed_counts:
topic: windowed_counts
keyType: string # Regular string key after transformation
valueType: json # JSON value containing count and window info
stores:
event_counts_store:
type: window
retention: 1m # Keep window data for 1 minute
keyType: string
valueType: long
pipelines:
count_events_by_window:
from: user_events
via:
# Group by key (user) for counting
- type: groupByKey
# Apply a 10-second tumbling window
- type: windowByTime
windowType: tumbling
duration: 10s
# Count events in each window
- type: count
store: event_counts_store
# Convert to stream for processing
- type: toStream
# DEMONSTRATES windowed(<base_type>) USAGE:
# Explicitly convert the key to windowed(string) type
# This shows how KSML handles windowed keys internally
- type: convertKey
into: windowed(string)
# Log the windowed counts with the windowed key type
- type: peek
forEach:
code: |
log.info("Windowed key type - User {} had {} events in window [{} - {}]",
key['key'], value, key['startTime'], key['endTime'])
# Transform windowed key to string for Kafka output
# This is necessary because Kafka topics cannot serialize windowed keys
- type: transformKeyValue
mapper:
resultType: (string, json)
code: |
# Extract window information from the windowed key
# Convert to a string key format: "user_startTime_endTime"
new_key = f"{key['key']}_{key['start']}_{key['end']}"
# Create a JSON value with all the information
new_value = {
"user": key['key'],
"count": value,
"window_start": key['start'],
"window_end": key['end'],
"window_start_time": key['startTime'],
"window_end_time": key['endTime']
}
log.info("Transformed to string key - User {} had {} events in window [{} - {}]",
key['key'], value, key['startTime'], key['endTime'])
return (new_key, new_value)
# Now we can write to the topic with regular string keys
to: windowed_counts
Processor - Automatic serialization with notation:
This example shows the simpler approach using notation for automatic serialization:
# Demonstrates windowed type usage in KSML
#
# This example shows how to use the windowed(<base_type>) syntax with convertKey operation.
# After windowing operations, KSML internally uses windowed keys.
#
# RECOMMENDED APPROACH:
# Use a notation prefix like 'json:windowed(string)' or 'avro:windowed(string)' instead of
# plain 'windowed(string)'. The notation automatically handles serialization of the windowed
# key structure (start, end, startTime, endTime, key) to the specified format, allowing you
# to write directly to Kafka topics without manual transformation.
#
# Example:
# - type: convertKey
# into: json:windowed(string) # Automatically serializes as JSON structure
#
# Without notation, windowed keys cannot be serialized to Kafka topics and require manual
# transformation to regular types.
streams:
user_events:
topic: user_events
keyType: string
valueType: json # Input events
# Output stream with JSON-serialized windowed keys
windowed_counts:
topic: windowed_counts
keyType: json # JSON notation automatically serializes the windowed key structure
valueType: long # Simple count value
stores:
event_counts_store:
type: window
retention: 1m # Keep window data for 1 minute
keyType: string
valueType: long
pipelines:
count_events_by_window:
from: user_events
via:
# Group by key (user) for counting
- type: groupByKey
# Apply a 10-second tumbling window
- type: windowByTime
windowType: tumbling
duration: 10s
# Count events in each window
- type: count
store: event_counts_store
# Convert to stream for processing
- type: toStream
# DEMONSTRATES windowed(<base_type>) USAGE WITH NOTATION:
# Using json:windowed(string) automatically serializes windowed keys as JSON structures
# containing: start, end, startTime, endTime, and key fields
- type: convertKey
into: json:windowed(string) # Recommended: use notation prefix for automatic serialization
# Log the windowed counts
- type: peek
forEach:
code: |
log.info("JSON Windowed key - User {} had {} events in window [{} - {}]",
key['key'], value, key['startTime'], key['endTime'])
# Now we can write to the topic with JSON-serialized windowed keys
to: windowed_counts
When to Use Each Approach:
- Use the notation prefix (json:windowed(string)) when you want to:
  - Write windowed keys directly to Kafka topics
  - Preserve the complete window structure in a standard format
  - Avoid manual transformation code
- Use the plain windowed type (windowed(string)) when you:
  - Only need windowed keys for internal processing
  - Want custom key formatting for output
  - Need to extract specific window information
Key Takeaway:
Windowed types enable time-based analytics like counting events per time window, calculating moving averages, or detecting patterns over time intervals. The notation prefix approach simplifies working with windowed data by handling serialization automatically.
The Any and "?" Types
KSML supports the wildcard types any and ? (which are equivalent) to represent unknown or variable data types. These map internally to DataType.UNKNOWN and can only be used for function parameters when the exact type is not known at definition time. They cannot be used for stream types or function result types due to serialization and type-system requirements.
Syntax:
The any and ? types can be used in:
- Function parameters only (type: any or type: "?")
# Function parameters (SUPPORTED)
functions:
my_function:
type: generic
parameters:
- name: input
type: any # Accepts any type
- name: other
type: "?" # Alternative syntax (quote to avoid YAML issues)
code: |
# Process the input parameter of unknown type
return "processed"
resultType: string # Must be a concrete type
# Stream types (NOT SUPPORTED)
# valueType: any # ❌ This will fail with "JSON serde not found"
# keyType: "?" # ❌ This will fail with serialization error
# Function result types (NOT SUPPORTED)
# resultType: any # ❌ This will fail with topology type checking error
Why the limitations exist:
- Stream types: Kafka requires concrete serialization formats. The any type cannot be serialized to Kafka topics because there's no serde for unknown data types.
- Result types: The topology type system requires concrete types for type checking and for ensuring data flows correctly between operations.
Key Use Cases:
- Generic utility functions that accept multiple data types as parameters
- Helper functions that need to handle variable input types
- Functions that process data generically before converting to concrete output types
Producer - Any type demonstration:
This example demonstrates using the ? type for function parameters, showing how to create generic utility functions.
# Simple demonstration of 'any' and '?' types in KSML
streams:
any_data:
topic: any_data
keyType: string
valueType: json
functions:
generate_data:
type: generator
code: |
import random
# Generate different JSON-compatible data structures
if random.choice([True, False]):
data = {"type": "text", "value": "hello world"}
else:
data = {"type": "number", "value": 42}
return ("key1", data)
resultType: (string, json)
# Function with '?' parameter type
describe_data:
type: generic
parameters:
- name: value
type: "?" # Accepts any type
code: |
return "processed"
resultType: string
producers:
test_producer:
generator: generate_data
interval: 2s
to: any_data
Processor - Any type processing:
This example shows how to process data using the any type for function parameters, demonstrating type-agnostic helper functions.
# Simple demonstration of 'any' type processing
streams:
any_input:
topic: any_data
keyType: string
valueType: json
any_output:
topic: processed_any_data
keyType: string
valueType: json
functions:
# Helper function with 'any' parameter type
process_data:
type: generic
parameters:
- name: data
type: any # Accepts any type
code: |
return {
"processed_data": data,
"status": "processed"
}
resultType: json
# Function that uses the helper
process_any:
type: valueTransformer
code: |
# Use helper function that accepts 'any' type
return process_data(value)
resultType: json
pipelines:
process_data:
from: any_input
via:
- type: transformValue
mapper: process_any
- type: peek
forEach:
code: |
log.info("Processed any type: {} = {}",
value.get("input_type"), value.get("input_value"))
to: any_output
Notation Formats
KSML uses notations to allow reading/writing different message formats to Kafka topics. Notations are specified as a prefix to the schema name.
Examples
See a working example for every data format in this tutorial:
Format Selection Guide
The choice of notation depends on your specific requirements:
If you need... | Consider using... |
---|---|
Schema evolution and backward compatibility | Avro or Protobuf |
Human-readable data for debugging | JSON |
Integration with legacy systems | XML or SOAP |
Simple tabular data | CSV |
Compact binary format | Avro or Protobuf |
Raw binary data handling | Binary |
Avro
Avro is a binary format that supports schema evolution.
Syntax:
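# For schema registry lookup:
valueType: avro
# For Avro with a schema:
valueType: avro:<schema_name>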
Example:
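streams:
  sensor_data:
    topic: sensor-readings
    keyType: string
    valueType: avro:SensorData  # Loads SensorData.avsc or fetches the schema from the registry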
JSON
JSON is a text-based, human-readable format for data transfer.
Syntax:
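# For schemaless JSON:
valueType: json
# For JSON with a schema:
valueType: json:<schema_name>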
Example:
streams:
user_profiles:
topic: user-profiles
keyType: string
valueType: json
orders:
topic: orders
keyType: string
valueType: json:Order
Python functions can return JSON by returning a dictionary:
functions:
merge_key_value_data:
type: valueTransformer
expression: "{'key': key, 'value': value}" # Quoted so YAML does not parse the braces as a mapping
resultType: json
JSON Schema
JSON Schema adds vendor-specific schema support to JSON serialization.
Syntax:
# For schema registry lookup:
valueType: jsonschema
# For JSON with a schema:
valueType: jsonschema:<schema_name>
Example:
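A minimal sketch, assuming a schema named SensorReading is available:

streams:
  sensor_data:
    topic: sensor-readings
    keyType: string
    valueType: jsonschema:SensorReading  # Hypothetical schema name for illustration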
CSV
CSV (Comma-Separated Values) is a simple tabular data format.
Syntax:
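# For schema registry lookup:
valueType: csv
# For CSV with a schema:
valueType: csv:<schema_name>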
Example:
streams:
sales_data:
topic: sales-data
keyType: string
valueType: csv
inventory_data:
topic: inventory-data
keyType: string
valueType: csv:InventoryRecord
XML
XML (Extensible Markup Language) is used for complex hierarchical data.
Syntax:
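# For schema registry lookup:
valueType: xml
# For XML with a schema:
valueType: xml:<schema_name>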
Example:
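A minimal sketch, assuming a local OrderRecord.xsd schema file:

streams:
  legacy_orders:
    topic: legacy-orders
    keyType: string
    valueType: xml:OrderRecord  # Hypothetical schema name; resolves to OrderRecord.xsd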
Protobuf
Protobuf is a popular encoding format developed by Google.
Syntax:
# For schema registry lookup:
valueType: protobuf
# For Protobuf with a schema:
valueType: protobuf:<schema_name>
Example:
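For example, using the UserProfile schema referenced later on this page:

streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: protobuf:UserProfile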
Binary
Binary data represents raw bytes for custom protocols.
Syntax:
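valueType: bytes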
Example:
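A minimal sketch (topic name is illustrative):

streams:
  raw_payloads:
    topic: raw-payloads
    keyType: string
    valueType: bytes  # Values are handled as raw byte arrays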
SOAP
SOAP (Simple Object Access Protocol) is an XML-based messaging protocol.
Syntax:
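valueType: soap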
Example:
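A minimal sketch (topic name is illustrative):

streams:
  soap_requests:
    topic: soap-requests
    keyType: string
    valueType: soap  # SOAP envelopes handled as structured messages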
Schema Management
When working with structured data, it's important to manage your schemas effectively.
Examples
See a working example for every type of schema in this tutorial:
Local Files vs. Schema Registry
Local Schema Files:
When a schema is specified, KSML loads the schema from a local file in the schemaDirectory. The notation determines the filename extension:
- Avro schemas: .avsc extension
- XML schemas: .xsd extension
- CSV schemas: .csv extension
- JSON schemas: .json extension
streams:
sensor_data:
topic: sensor-reading
keyType: string
valueType: avro:SensorReading # Looks for SensorReading.avsc
Schema Registry Lookup: When no schema is specified, KSML assumes the schema is loadable from Schema Registry:
streams:
sensor_data:
topic: sensor-reading
keyType: string
valueType: avro # Schema fetched from registry
Type Conversion
KSML handles type conversion differently depending on the context:
Context | Conversion Type | When to Use |
---|---|---|
Functions | Automatic | When resultType differs from returned value |
Streams | Explicit | When input/output stream formats differ |
Function Type Conversion (Automatic)
Functions automatically convert return values to match their declared resultType when possible:
Successful Conversions:
- Any type → string: Always works via automatic .toString() conversion
- String → numeric types (int, long, float, double): Works only if the string contains a valid numeric value (e.g., "123" → int)
- Numeric conversions: Work between compatible numeric types (int ↔ long, float ↔ double)
- Complex types: Dict → JSON, lists/structs/tuples with matching schemas
Failed Conversions:
- Invalid string → numeric: Throws exception and stops processing (e.g., "not_a_number" → int fails)
- Incompatible complex types: Mismatched schemas or structures
Example:
functions:
string_to_int:
type: valueTransformer
code: |
result = "123" # Valid numeric string
expression: result
resultType: int # ← Succeeds: converts "123" → 123
invalid_conversion:
type: valueTransformer
code: |
result = "not_a_number" # Invalid numeric string
expression: result
resultType: int # ← Fails: throws conversion exception
Working example - Automatic type conversion in functions
Producer:
# Producer for automatic type conversion example
# Generates test data for demonstrating function result type conversion
streams:
sensor_data:
topic: sensor_data
keyType: string
valueType: json
functions:
generate_sensor_data:
type: generator
globalCode: |
import random
counter = 0
cities = ["Amsterdam", "Rotterdam", "Utrecht", "The Hague"]
code: |
global counter, cities
counter += 1
# Create sensor data
data = {
"sensor_id": f"sensor_{counter}",
"temperature": round(random.uniform(15.0, 30.0), 1),
"humidity": round(random.uniform(40.0, 80.0), 1),
"city": random.choice(cities),
"timestamp": counter * 1000
}
return (data["sensor_id"], data)
resultType: (string, json)
producers:
sensor_generator:
generator: generate_sensor_data
interval: 2s
to: sensor_data
Processor:
# Demonstrates automatic type conversion in KSML functions
#
# This example shows how KSML automatically converts function return values
# to match the declared resultType, without requiring explicit conversion code.
streams:
sensor_input:
topic: sensor_data
keyType: string
valueType: json
offsetResetPolicy: earliest
processed_output:
topic: processed_data
keyType: string
valueType: json
functions:
# Example 1: Returns dict, automatically converted to string
dict_to_string_auto:
type: valueTransformer
code: |
# Function returns a Python dictionary
result = {
"sensor": key,
"temp_fahrenheit": value.get("temperature", 0) * 1.8 + 32,
"humidity_percent": value.get("humidity", 0),
"location": value.get("city", "unknown").upper()
}
# Note: We return a dict, not a string
expression: result
resultType: string # ← Automatic conversion: dict → JSON string
# Example 2: String parsing with valid numeric conversion
valid_string_to_int:
type: valueTransformer
code: |
# Extract temperature and convert to string representation
temp_str = str(int(value.get("temperature", 20)))
result = temp_str # Return valid numeric string like "20"
expression: result
resultType: int # ← Automatic conversion: "20" → 20
# Example 3: Integer to string conversion
int_to_string:
type: valueTransformer
code: |
# At this point, value is an integer from previous conversion
# We'll just multiply it by 2 to show we're working with an int
result = value * 2 # Return an integer
expression: result
resultType: string # ← Automatic conversion: 40 → "40"
pipelines:
demonstrate_auto_conversion:
from: sensor_input
via:
# Test automatic dict to string conversion
- type: transformValue
mapper: dict_to_string_auto
- type: peek
forEach:
code: |
# Value is now a string (automatic conversion happened)
log.info("After auto dict→string: type={}, content={}",
type(value).__name__, value)
# Parse the JSON string back to dict for next operations
- type: transformValue
mapper:
type: valueTransformer
code: |
import json
result = json.loads(value) # Parse JSON string back to dict
expression: result
resultType: json
# Test valid string to int conversion
- type: transformValue
mapper: valid_string_to_int
- type: peek
forEach:
code: |
# Value is now an integer (automatic conversion happened)
log.info("After auto string→int: type={}, value={}",
type(value).__name__, value)
# Test int to string conversion
- type: transformValue
mapper: int_to_string
- type: peek
forEach:
code: |
# Value is now a string (automatic conversion happened)
log.info("After auto int→string: type={}, value={}",
type(value).__name__, value)
# Wrap in JSON for output
- type: transformValue
mapper:
type: valueTransformer
code: |
result = {
"sensor_id": key,
"converted_value": value,
"processed": True
}
expression: result
resultType: json
to: processed_output
Stream Format Conversion (Explicit)
Streams require explicit convertValue operations when formats differ:
pipelines:
example_pipeline:
from: json_input # JSON format
via:
- type: convertValue
into: string # Must explicitly convert
to: string_output # String format
Without convertValue, KSML will fail with a type mismatch error.
Working example - Explicit stream conversion
Producer:
# Producer for explicit format conversion example
# Generates JSON messages that will be converted to different formats
streams:
json_messages:
topic: json_messages
keyType: string
valueType: json
functions:
generate_json_data:
type: generator
globalCode: |
import random
import time
counter = 0
cities = ["Amsterdam", "Rotterdam", "Utrecht", "The Hague", "Eindhoven"]
types = ["temperature", "humidity", "pressure"]
code: |
global counter, cities, types
counter += 1
# Pick the sensor type once so the matching unit can be derived from it
sensor_type = random.choice(types)
sensor_data = {
"sensor_id": f"sensor_{counter % 10}",
"city": random.choice(cities),
"type": sensor_type,
"value": round(random.uniform(0, 100), 2),
"unit": "%" if sensor_type == "humidity" else "°C" if sensor_type == "temperature" else "hPa",
"timestamp": int(time.time())
}
return (sensor_data["sensor_id"], sensor_data)
resultType: (string, json)
producers:
json_generator:
generator: generate_json_data
interval: 2s
to: json_messages
Processor:
# Demonstrates explicit format conversion in KSML
#
# This example shows that KSML requires explicit conversion operations when
# outputting to a stream with a different format type. Without explicit conversion,
# KSML will fail with a type mismatch error at the pipeline sink.
streams:
json_input:
topic: json_messages
keyType: string
valueType: json
offsetResetPolicy: earliest
# Output stream with different format - requires explicit conversion
string_output:
topic: string_messages
keyType: string
valueType: string
pipelines:
explicit_conversion_pipeline:
from: json_input
via:
# Log the incoming JSON data
- type: peek
forEach:
code: |
log.info("JSON input - sensor: {}, city: {}, type: {}, value: {}{}",
value.get("sensor_id"), value.get("city"),
value.get("type"), value.get("value"), value.get("unit"))
# REQUIRED: Explicit conversion from JSON to string
# Without this, KSML will fail with:
# "Target topic valueType is expected to be type json, but found string"
- type: convertValue
into: string
# Log the converted string data
- type: peek
forEach:
code: |
log.info("String output - sensor: {}, data: {}", key, value[:100] if len(value) > 100 else value)
# Output to string format stream
to: string_output
Chaining Multiple Conversions
Chain convertValue operations for complex transformations:
pipelines:
multi_conversion:
from: json_stream
via:
- type: convertValue
into: string # JSON → String
- type: convertValue
into: json # String → JSON
to: json_output
Working example - Chained conversions
Producer:
# Producer for multiple format conversions example
# Generates JSON data that will be converted through multiple formats
streams:
multi_format_data:
topic: multi_format_data
keyType: string
valueType: json
functions:
generate_data:
type: generator
globalCode: |
import random
import time
counter = 0
code: |
global counter
counter += 1
data = {
"id": counter,
"product": f"product_{counter}",
"price": round(random.uniform(10.0, 500.0), 2),
"quantity": random.randint(1, 10),
"available": counter % 2 == 0,
"timestamp": int(time.time())
}
return (f"item_{counter}", data)
resultType: (string, json)
producers:
data_generator:
generator: generate_data
interval: 2s
to: multi_format_data
Processor:
# Demonstrates chaining multiple format conversions in KSML
#
# This example shows how to chain multiple convertValue operations
# to transform data through several formats in a single pipeline.
streams:
json_input:
topic: multi_format_data
keyType: string
valueType: json
offsetResetPolicy: earliest
final_output:
topic: final_output
keyType: string
valueType: string
pipelines:
multi_format_conversion:
from: json_input
via:
# Starting with JSON format
- type: peek
forEach:
code: |
log.info("Step 1 - JSON input: id={}, product={}, price=${}",
value.get("id"), value.get("product"), value.get("price"))
# Convert JSON to String (serialization)
- type: convertValue
into: string
- type: peek
forEach:
code: |
log.info("Step 2 - Converted to String: {}", value[:80])
# Convert String back to JSON (parsing)
- type: convertValue
into: json
- type: peek
forEach:
code: |
log.info("Step 3 - Parsed back to JSON: available={}, quantity={}",
value.get("available"), value.get("quantity"))
# Transform the data while in JSON format
- type: transformValue
mapper:
type: valueTransformer
code: |
# Calculate total value and add metadata
result = {
"product_id": f"PRD-{value.get('id'):04d}",
"name": value.get("product", "").upper(),
"total_value": value.get("price", 0) * value.get("quantity", 0),
"in_stock": value.get("available", False),
"processed_at": int(time.time())
}
expression: result
resultType: json
globalCode: |
import time
- type: peek
forEach:
code: |
log.info("Step 4 - Transformed data: product_id={}, total_value=${}",
value.get("product_id"), value.get("total_value"))
# Final conversion to String for output
- type: convertValue
into: string
- type: peek
forEach:
code: |
log.info("Step 5 - Final String output: {}", value)
# Output as string format
to: final_output
Key Takeaway: Functions convert automatically, streams need explicit conversion.
Working with Multiple Formats in a Single Pipeline
Process different data formats within one KSML definition using separate pipelines.
This producer generates both JSON config data and Avro sensor data:
Producer definition:
functions:
generate_device_config:
type: generator
globalCode: |
import time
import random
configCounter = 0
code: |
global configCounter
device_id = "sensor" + str(configCounter)
configCounter = (configCounter + 1) % 10
# Generate device configuration as JSON
config = {
"device_id": device_id,
"threshold": random.randrange(50, 90),
"alert_level": random.choice(["LOW", "MEDIUM", "HIGH"]),
"calibration_factor": round(random.uniform(0.8, 1.2), 2),
"last_maintenance": str(round(time.time() * 1000))
}
expression: (device_id, config)
resultType: (string, json)
generate_sensor_reading:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor" + str(sensorCounter)
sensorCounter = (sensorCounter + 1) % 10
# Generate sensor reading data that will be output as Avro
reading = {
"name": key,
"timestamp": str(round(time.time() * 1000)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"value": str(random.randrange(0, 100)),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
}
expression: (key, reading)
resultType: (string, json) # Generate as JSON, output as Avro
producers:
# Produce JSON device configuration every 10 seconds
device_config_producer:
generator: generate_device_config
interval: 10s
to:
topic: device_config
keyType: string
valueType: json
# Produce Avro sensor readings every 3 seconds
sensor_reading_producer:
generator: generate_sensor_reading
interval: 3s
to:
topic: sensor_readings
keyType: string
valueType: avro:SensorData
This processor shows two pipelines handling different formats (Avro and JSON) and combining results:
Processor definition for working with multiple formats in a single pipeline:
streams:
avro_sensor_stream:
topic: sensor_readings
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
json_config_stream:
topic: device_config
keyType: string
valueType: json
offsetResetPolicy: latest
combined_output:
topic: combined_sensor_data
keyType: string
valueType: json
pipelines:
# Pipeline 1: Process Avro data and convert to JSON
avro_processing:
from: avro_sensor_stream
via:
# Log Avro input
- type: peek
forEach:
code: |
log.info("Avro sensor: name={}, type={}, value={}{}",
value.get("name"), value.get("type"),
value.get("value"), value.get("unit"))
# Add a source field to identify the format
- type: transformValue
mapper:
type: valueTransformer
code: |
result = dict(value) if value else {}
result["source_format"] = "Avro"
expression: result
resultType: json
to: combined_output
# Pipeline 2: Process JSON config data
json_processing:
from: json_config_stream
via:
# Log JSON input
- type: peek
forEach:
code: |
log.info("JSON config: device={}, threshold={}, alert={}",
key, value.get("threshold"), value.get("alert_level"))
# Transform to sensor-like format with source field
- type: transformValue
mapper:
type: valueTransformer
code: |
result = {
"name": key,
"type": "CONFIG",
"threshold": value.get("threshold"),
"alert_level": value.get("alert_level"),
"source_format": "JSON"
}
expression: result
resultType: json
to: combined_output
Type Definition Quoting Rules
In KSML, quotes around type definitions are always optional. KSML can parse all type expressions correctly whether they have quotes or not. The choice to use quotes is purely a matter of style and preference.
All Type Expressions Work Without Quotes:
# Basic types
keyType: string
valueType: json
resultType: int
# Function-style types
valueType: enum(PENDING, PROCESSING, SHIPPED)
valueType: map(string)
keyType: windowed(string)
resultType: list(int)
resultType: tuple(string, json)
# Complex expressions
valueType: union(null, string)
resultType: list(tuple(string, json)) # [(string, json)] also valid
resultType: (string, json)
# Notation prefixes (with colons)
valueType: avro:SensorData
keyType: protobuf:UserProfile
# With quotes (also valid)
resultType: "[(string, json)]" # Same as list(tuple(string, json))
valueType: "enum(PENDING, SHIPPED)"
YAML Syntax Highlighting Note
Some YAML syntax highlighters may incorrectly interpret bracket notation like [(string, json)], expecting proper array syntax. For better highlighting, use quotes ("[(string, json)]") or the cleaner list(tuple(string, json)) syntax.
Summary:
All type expressions work without quotes in KSML. Use quotes only if you prefer them for style; they are never functionally required. For bracket notation, consider using the list() function syntax for cleaner, more readable code.