KSML Definition Reference
This reference guide covers the structure and organization of KSML definition files. A KSML definition is a YAML file that describes your complete stream processing application.
KSML File Structure
KSML supports two main patterns for applications. Choose the pattern that matches your use case:
Stream Processing Applications
Process data from input topics to output topics:
# Application metadata (optional)
name: "order-processor" # Optional
version: "1.0.0" # Optional
description: "Process orders" # Optional
# Data sources and sinks (optional - can be inlined)
streams: # KStream definitions (optional)
tables: # KTable definitions (optional)
globalTables: # GlobalKTable definitions (optional)
# State storage (optional - only if needed)
stores: # State store definitions (optional)
# Processing logic
functions: # Python function definitions (optional)
pipelines: # Data flow pipelines (REQUIRED)
Required sections: pipelines
Optional sections: All others (streams/tables can be inlined in pipelines)
Data Generation Applications
Generate and produce data to Kafka topics:
# Application metadata (optional)
name: "sensor-data-generator" # Optional
version: "1.0.0" # Optional
description: "Generate sensor data" # Optional
# Processing logic
functions: # Python function definitions (REQUIRED - must include generator functions)
producers: # Data producer definitions (REQUIRED)
Required sections: functions (with generator functions), producers
Optional sections: name, version, description
Note: Data generation applications don't use streams/tables/stores sections since they only produce data.
Application Metadata
Optional metadata to describe your KSML application:
Property | Type | Required | Description |
---|---|---|---|
name | String | No | The name of the KSML definition |
version | String | No | The version of the KSML definition |
description | String | No | A description of the KSML definition |
name: "order-processing-app"
version: "1.2.3"
description: "Processes orders and enriches them with customer data"
Data Sources and Targets
KSML supports three types of data streams based on Kafka Streams concepts. Each stream type has different characteristics and use cases for processing streaming data.
Streams (KStream)
Use for: Event-based processing where each record is an independent event.
streams:
  user_clicks:
    topic: user-clicks
    keyType: string
    valueType: json
    offsetResetPolicy: earliest # Optional
    timestampExtractor: click_timestamp_extractor # Optional
    partitioner: click_partitioner # Optional
Key characteristics:
- Records are immutable and processed individually
- Each record represents an independent event or fact
- Records arrive in order and are processed one at a time
- Ideal for: user actions, sensor readings, transactions, logs
Property | Type | Required | Description |
---|---|---|---|
topic | String | Yes | The Kafka topic to read from or write to |
keyType | String | Yes | The type of the record key |
valueType | String | Yes | The type of the record value |
offsetResetPolicy | String | No | The offset reset policy. Valid values: earliest, latest, none, by_duration:<duration> |
timestampExtractor | String | No | Function name to extract timestamps from records. Default: Kafka Streams default (message timestamp, fallback to current time) |
partitioner | String | No | Function name that determines message partitioning for this stream/table. Default: Kafka default (hash-based on key) |
Stream Example with offsetResetPolicy
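As a minimal sketch (the topic name is illustrative), a stream that skips history and starts at the latest offsets when no committed offsets exist:

streams:
  payment_events:
    topic: payment-events
    keyType: string
    valueType: json
    offsetResetPolicy: latest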
Stream Example with timestampExtractor
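A sketch of a stream that delegates timestamp extraction to a named function. The function body is an assumption about the extractor contract (a record parameter exposing the deserialized value, with previousTimestamp as a fallback); verify the exact parameter names in the Function Reference.

functions:
  click_timestamp_extractor:
    type: timestampExtractor
    code: |
      # Assumption: `record` exposes the deserialized value and `previousTimestamp`
      # holds the prior record's timestamp; prefer an event_time field when present
      if record is not None and record.value is not None and "event_time" in record.value:
        return record.value["event_time"]
      return previousTimestamp

streams:
  user_clicks:
    topic: user-clicks
    keyType: string
    valueType: json
    timestampExtractor: click_timestamp_extractor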
Tables (KTable)
Use for: State-based processing where records represent updates to entities.
tables:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: avro:UserProfile
    store: user_profiles_store # Optional state store name
Key characteristics:
- Records with the same key represent updates to the same entity
- Only the latest record for each key is retained (compacted)
- Represents a changelog stream with the current state
- Ideal for: user profiles, inventory levels, configuration settings
Property | Type | Required | Description |
---|---|---|---|
topic | String | Yes | The Kafka topic to read from or write to |
keyType | String | Yes | The type of the record key |
valueType | String | Yes | The type of the record value |
offsetResetPolicy | String | No | The offset reset policy. Valid values: earliest, latest, none, by_duration:<duration> |
timestampExtractor | String | No | Function name to extract timestamps from records. Default: Kafka Streams default (message timestamp, fallback to current time) |
partitioner | String | No | Function that determines message partitioning |
store | String | No | The name of the key/value state store to use. Default: Auto-created store using topic name |
Table Example without store
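A minimal sketch of a table without an explicit store, so KSML falls back to the auto-created store named after the topic (the topic name is illustrative):

tables:
  customer_settings:
    topic: customer-settings
    keyType: string
    valueType: json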
Table Example with store
This example demonstrates using a custom inline state store for a table. The table uses custom persistence and caching settings, and the processor function accesses the table to enrich streaming data.
tables:
  user_profiles:
    topic: user_profiles
    keyType: string
    valueType: json
    store:
      type: keyValue
      keyType: string
      valueType: json
      persistent: true
      caching: true
      logging: false
Producer - User Profile Data
streams:
  user_profiles_output:
    topic: user_profiles
    keyType: string
    valueType: json
functions:
  generate_user_profile:
    type: generator
    resultType: (string, json)
    code: |
      import random
      import time
      # Generate realistic user profile data
      user_id = "user_" + str(random.randint(1, 1000))
      departments = ["engineering", "marketing", "sales", "support", "hr"]
      locations = ["new_york", "london", "tokyo", "sydney"]
      profile = {
        "user_id": user_id,
        "name": f"User {user_id.split('_')[1]}",
        "email": f"{user_id}@company.com",
        "department": random.choice(departments),
        "location": random.choice(locations),
        "created_at": int(time.time() * 1000),
        "is_active": True
      }
      return (user_id, profile)
producers:
  user_profile_producer:
    to:
      topic: user_profiles
      keyType: string
      valueType: json
    generator: generate_user_profile
    interval: 3000
Processor - Enrich Activity with Profiles
streams:
  user_activity:
    topic: user_activity
    keyType: string
    valueType: json
  enriched_activity_output:
    topic: enriched_user_activity
    keyType: string
    valueType: json
tables:
  user_profiles:
    topic: user_profiles
    keyType: string
    valueType: json
    store:
      type: keyValue
      keyType: string
      valueType: json
      persistent: true
      caching: true
      logging: false
functions:
  enrich_with_user_profile:
    type: valueTransformer
    stores:
      - user_profiles
    code: |
      import time
      # Look up user profile from the custom store
      user_id = value.get("user_id")
      if not user_id:
        log.warn("No user_id found in activity event")
        return value
      # Access the table which uses the custom store
      profile = user_profiles.get(user_id)
      if profile:
        # Enrich activity with user profile data
        enriched_activity = {
          **value,
          "user_name": profile.get("name", "Unknown"),
          "user_department": profile.get("department", "Unknown"),
          "user_location": profile.get("location", "Unknown"),
          "enriched_at": int(time.time() * 1000),
          "store_used": "custom_inline_store"
        }
        log.info("Enriched activity for user: {}, department: {}, location: {}",
                 user_id, profile.get("department"), profile.get("location"))
        return enriched_activity
      else:
        log.warn("User profile not found for user_id: {}", user_id)
        return {
          **value,
          "enriched_at": int(time.time() * 1000),
          "enrichment_status": "profile_not_found"
        }
producers:
  user_activity_producer:
    to:
      topic: user_activity
      keyType: string
      valueType: json
    generator:
      type: generator
      resultType: (string, json)
      code: |
        import random
        import time
        # Generate activity events referencing existing users
        user_id = "user_" + str(random.randint(1, 1000))
        activities = ["login", "logout", "page_view", "document_upload", "meeting_join"]
        activity = {
          "user_id": user_id,
          "activity_type": random.choice(activities),
          "timestamp": int(time.time() * 1000),
          "session_id": "session_" + str(random.randint(1000, 9999))
        }
        return (user_id, activity)
    interval: 2000
pipelines:
  enrich_user_activity:
    from: user_activity
    via:
      - type: peek
        forEach:
          code: |
            log.info("Processing activity: {} for user: {}",
                     value.get("activity_type"), value.get("user_id"))
      - type: transformValue
        mapper: enrich_with_user_profile
    to: enriched_activity_output
The table user_profiles uses an inline store definition with custom settings:
- persistent: true - Data survives application restarts
- caching: true - Enables local caching for better performance
- logging: false - Disables changelog topic creation
The enrichment function accesses the table via the stores: [user_profiles] declaration and performs lookups using user_profiles.get(user_id).
Global Tables (GlobalKTable)
Use for: Reference data that needs to be available on all application instances.
globalTables:
  product_catalog:
    topic: product-catalog
    keyType: string
    valueType: avro:Product
    store: product_catalog_store # Optional state store name
Key characteristics:
- Fully replicated on each application instance (not partitioned)
- Allows joins without requiring co-partitioning
- Provides global access to reference data
- Ideal for: product catalogs, country codes, small to medium reference datasets
Property | Type | Required | Description |
---|---|---|---|
topic | String | Yes | The Kafka topic to read from |
keyType | String | Yes | The type of the record key |
valueType | String | Yes | The type of the record value |
offsetResetPolicy | String | No | The offset reset policy. Valid values: earliest, latest, none, by_duration:<duration> |
timestampExtractor | String | No | Function name to extract timestamps from records. Default: Kafka Streams default (message timestamp, fallback to current time) |
partitioner | String | No | Function that determines message partitioning |
store | String | No | The name of the key/value state store to use. Default: Auto-created store using topic name |
Global Table Example
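A minimal sketch with an explicit store name (the topic and store names are illustrative). Because the global table is fully replicated, streams joining against it do not need to be co-partitioned with its topic:

globalTables:
  country_codes:
    topic: country-codes
    keyType: string
    valueType: json
    store: country_codes_store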
Choosing the Right Stream Type
If you need to... | Consider using... |
---|---|
Process individual events as they occur | KStream |
Maintain the latest state of entities | KTable |
Join with data that's needed across all partitions | GlobalKTable |
Process time-ordered events | KStream |
Track changes to state over time | KTable |
Access reference data without worrying about partitioning | GlobalKTable |
Pipelines
Define how data flows through your application:
pipelines:
  process_orders:
    from: orders
    via:
      - type: filter
        if: is_valid_order
      - type: mapValues
        mapper: enrich_order
    to: processed_orders
For complete pipeline documentation, see Pipeline Reference.
Functions
Define reusable Python logic for processing:
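As a minimal sketch, the two functions referenced by the pipeline example above might look like this (the predicate uses the short expression form, the transformer a code block; field names are illustrative):

functions:
  is_valid_order:
    type: predicate
    expression: value is not None and value.get("amount", 0) > 0
  enrich_order:
    type: valueTransformer
    code: |
      # Add a processing flag while keeping the original fields intact
      return {**value, "processed": True}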
For complete function documentation, see Function Reference.
Operations
Operations are the building blocks that transform, filter, and process your data within pipelines:
via:
  - type: filter # Keep matching records
    if: is_valid_order
  - type: mapValues # Transform record values
    mapper: enrich_order
  - type: join # Combine with other streams
    with: customers
For complete operation documentation, see Operation Reference.
State Stores
Define persistent state stores for stateful operations:
stores:
  session_store:
    type: keyValue
    keyType: string
    valueType: json
    persistent: true
    caching: true
For details, see State Store Reference.
Producers
Define data generators for testing and simulation:
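A minimal sketch using only the producer properties shown earlier on this page (to, generator, interval); the topic and function names are illustrative:

functions:
  generate_heartbeat:
    type: generator
    resultType: (string, json)
    code: |
      import time
      # Emit a heartbeat message with a fixed key on every interval
      return ("heartbeat", {"timestamp": int(time.time() * 1000)})

producers:
  heartbeat_producer:
    to:
      topic: heartbeats
      keyType: string
      valueType: json
    generator: generate_heartbeat
    interval: 5000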