Performance Optimization in KSML
This tutorial covers optimization techniques for KSML applications to improve throughput, reduce latency, and minimize resource usage.
Introduction
Performance optimization helps you process more data efficiently while reducing costs and maintaining low latency under high loads.
Prerequisites
Before starting this tutorial:
- Have your Docker Compose KSML environment set up and running
- Add the following topics to your `kafka-setup` service in docker-compose.yml to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic mixed_quality_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic filtered_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic final_processed_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic binary_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic optimized_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic readable_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_metrics_summary && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic high_volume_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_events && \
Core Performance Concepts
Identifying Bottlenecks
Before optimizing, identify where performance bottlenecks exist:
- Processing time: How long it takes to process each message
- Throughput: Messages processed per second
- State store size: How much data is stored in state stores
- Memory usage: JVM heap and non-heap memory
- GC activity: Frequency and duration of garbage collection
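To put rough numbers on the first two items above, you can track counts and timings from inside a function. The sketch below is plain Python of the kind that would sit in a KSML forEach function (the counters would live in globalCode, the measurement in code); the 1,000-message sampling interval and the record_message helper are illustrative, not part of KSML.
import time

# Counters that a KSML function would initialize once in its globalCode block
msg_count = 0
window_start = time.time()

def record_message(process_fn, key, value):
    """Wrap the real per-message work with simple throughput/latency tracking (sketch)."""
    global msg_count, window_start
    start = time.time()
    result = process_fn(key, value)                    # the actual processing step
    latency_ms = (time.time() - start) * 1000

    msg_count += 1
    if msg_count % 1000 == 0:                          # sample every 1,000 messages
        elapsed = time.time() - window_start
        rate = 1000 / elapsed if elapsed > 0 else 0
        # Inside KSML you would call the injected log object instead of print
        print(f"~{rate:.0f} msgs/sec, last message took {latency_ms:.2f} ms")
        window_start = time.time()
    return result

record_message(lambda k, v: v.get("score", 0) * 2, "user_001", {"score": 42})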
Key Optimization Areas
- Python Function Efficiency: Optimize code for minimal object creation and fast execution
- State Store Configuration: Configure stores for your specific workload
- Pipeline Design: Structure pipelines to filter early and process efficiently
- Data Serialization: Choose efficient formats and minimize serialization overhead
Efficient Processing Patterns
This example demonstrates optimized Python code and efficient data handling techniques.
What it does:
- Produces high-volume events: Creates user events (click, purchase, view) with categories and values, and also includes low-value events (scroll, hover)
- Filters early: Immediately discards uninteresting events (scroll, hover) to avoid processing overhead downstream
- Pre-computes efficiently: Uses global dictionaries for priority events and category multipliers, calculates scores with minimal object creation
- Processes with JSON: Extracts fields from JSON using `.get()` and builds result objects for Kowl UI readability
- Logs selectively: Only logs high-priority events to reduce I/O overhead while maintaining performance visibility
High-volume producer (compact format) - click to expand
# Producer for performance optimization demo - generates high volume events
functions:
generate_high_volume_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
user_ids = [f"user_{i:04d}" for i in range(1, 101)] # 100 users
event_types = ["view", "click", "purchase", "search", "scroll", "hover"]
product_categories = ["electronics", "clothing", "books", "home", "sports"]
code: |
global event_counter, user_ids, event_types, product_categories
event_counter += 1
user_id = random.choice(user_ids)
event_type = random.choice(event_types)
# Generate realistic event data for performance testing
event_data = {
"event_id": f"evt_{event_counter:08d}",
"type": event_type,
"user_id": user_id,
"timestamp": int(time.time() * 1000),
"product_id": f"prod_{random.randint(1, 1000):04d}",
"category": random.choice(product_categories),
"value": round(random.uniform(1.0, 100.0), 2) if event_type == "purchase" else 0,
"session_id": f"sess_{hash(user_id) % 50:03d}",
"metadata": {
"source": "web",
"device": random.choice(["desktop", "mobile", "tablet"]),
"browser": random.choice(["chrome", "firefox", "safari"])
}
}
expression: (user_id, event_data)
resultType: (string, json)
producers:
high_volume_producer:
generator: generate_high_volume_events
interval: 1s # High frequency for performance testing
to:
topic: high_volume_events
keyType: string
valueType: json
Efficient processor (optimized Python) - click to expand
# Processor demonstrating efficient processing patterns
streams:
high_volume_events:
topic: high_volume_events
keyType: string
valueType: json
functions:
efficient_event_processor:
type: valueTransformer
globalCode: |
# Pre-compute expensive operations outside the processing loop
PRIORITY_EVENTS = {"purchase", "search"}
MULTIPLIERS = {"electronics": 1.5, "clothing": 1.2, "books": 1.0, "home": 1.3, "sports": 1.1}
# Use efficient data structures and avoid object creation
def calculate_score(event_type, category, value):
"""Efficient score calculation with minimal object creation"""
base_score = 10 if event_type in PRIORITY_EVENTS else 5
category_multiplier = MULTIPLIERS.get(category, 1.0)
value_component = min(value * 0.1, 10) # Cap value component
return round(base_score * category_multiplier + value_component, 2)
code: |
# Early filtering - discard uninteresting events immediately
event_type = value.get("type")
if event_type in ["scroll", "hover"]:
return None
# Extract needed fields from JSON
user_id = value.get("user_id")
category = value.get("category")
value_amount = value.get("value", 0)
timestamp = value.get("timestamp")
product_id = value.get("product_id")
# Efficient processing with pre-computed values
score = calculate_score(event_type, category, value_amount)
# Build result as JSON for better readability in Kowl UI
result = {
"status": "PROCESSED",
"event_type": event_type,
"user_id": user_id,
"product_id": product_id,
"category": category,
"score": score,
"value": round(value_amount, 2),
"timestamp": timestamp,
"metadata": value.get("metadata")
}
# Log only important events to reduce I/O
if event_type in PRIORITY_EVENTS:
log.info("High-priority event processed: {} for user {} with score {:.2f}", event_type, user_id, score)
return result
expression: result if result else None
resultType: json
pipelines:
efficient_processing_pipeline:
from: high_volume_events
via:
- type: mapValues
mapper: efficient_event_processor
- type: filter
if:
expression: value is not None
to:
topic: processed_events
keyType: string
valueType: json
Key optimization techniques:
- Global code optimization: Pre-compute expensive operations outside processing loops
- Early filtering: Discard unwanted events immediately to reduce downstream processing
- Efficient logging: Log only important events to reduce I/O overhead
State Store Optimization
This example shows how to configure and use state stores efficiently for high-performance scenarios.
What it does:
- Produces activity events: Creates user activities (login, page_view, click, purchase, logout) with scores, durations, and timestamps in JSON format
- Stores metrics compactly: Converts JSON to compact string format "total_count:login_count:page_view_count:click_count:purchase_count:logout_count:total_score"
- Updates efficiently: Parses compact string to array, updates counters by index mapping, avoids object creation during updates
- Uses optimized store: Configures the state store with persistence, caching, and an increased cache size (50MB) for better performance
- Outputs JSON summaries: Returns readable JSON results with averages and totals while keeping internal storage compact
User metrics producer (activity data) - click to expand
# Producer for state store optimization demo - generates user activity metrics
functions:
generate_user_activity:
type: generator
globalCode: |
import random
import time
activity_counter = 0
user_ids = [f"user_{i:03d}" for i in range(1, 21)] # 20 users for state demo
activities = ["login", "page_view", "click", "purchase", "logout"]
code: |
global activity_counter, user_ids, activities
activity_counter += 1
user_id = random.choice(user_ids)
activity = random.choice(activities)
# Create activity with metrics that need aggregation
activity_data = {
"activity_type": activity,
"score": random.randint(1, 100),
"timestamp": int(time.time() * 1000),
"session_id": f"sess_{hash(user_id) % 10:02d}",
"duration_ms": random.randint(100, 5000)
}
expression: (user_id, activity_data)
resultType: (string, json)
producers:
user_activity_producer:
generator: generate_user_activity
interval: 1s
to:
topic: user_activity
keyType: string
valueType: json
Optimized state processor (compact storage) - click to expand
# Processor demonstrating optimized state store configuration and usage
streams:
user_activity:
topic: user_activity
keyType: string
valueType: json
stores:
# Optimized state store configuration
user_metrics_store:
type: keyValue
keyType: string
valueType: string # Using compact string format for state storage performance
persistent: true
caching: true
# Increased cache size for better performance
cacheSizeBytes: 52428800 # 50MB
functions:
update_user_metrics:
type: valueTransformer
globalCode: |
# Use compact string format for state storage to reduce serialization overhead
def parse_metrics(metrics_str):
"""Parse compact metrics format: count:login:page_view:click:purchase:logout:total_score"""
if not metrics_str:
return [0, 0, 0, 0, 0, 0, 0] # Default metrics
parts = metrics_str.split(":")
return [int(x) for x in parts] if len(parts) == 7 else [0, 0, 0, 0, 0, 0, 0]
def format_metrics(metrics_list):
"""Format metrics into compact string"""
return ":".join(str(x) for x in metrics_list)
code: |
# Extract activity data from JSON
activity_type = value.get("activity_type")
if not activity_type:
return None
score = value.get("score", 0)
timestamp = value.get("timestamp", 0)
duration_ms = value.get("duration_ms", 0)
# Get current metrics from optimized state store
current_metrics_str = user_metrics_store.get(key)
metrics = parse_metrics(current_metrics_str)
# Update metrics efficiently - avoid object creation
metrics[0] += 1 # total_count
# Update activity-specific counters using index mapping
activity_indices = {
"login": 1, "page_view": 2, "click": 3, "purchase": 4, "logout": 5
}
if activity_type in activity_indices:
metrics[activity_indices[activity_type]] += 1
metrics[6] += score # total_score
# Store back in compact format
user_metrics_store.put(key, format_metrics(metrics))
# Calculate derived metrics efficiently
avg_score = metrics[6] / metrics[0] if metrics[0] > 0 else 0
# Return summary as JSON for better readability in Kowl UI
result = {
"user_id": key,
"metrics": {
"total_count": metrics[0],
"login_count": metrics[1],
"page_view_count": metrics[2],
"click_count": metrics[3],
"purchase_count": metrics[4],
"logout_count": metrics[5],
"total_score": metrics[6],
"avg_score": round(avg_score, 2)
},
"last_activity": {
"type": activity_type,
"score": score,
"duration_ms": duration_ms,
"timestamp": timestamp
}
}
# Log only significant changes to reduce I/O
if metrics[0] % 10 == 0: # Log every 10th event
log.info("User {} metrics: {} total activities, avg score: {:.2f}", key, metrics[0], avg_score)
return result
expression: result if result else None
resultType: json
stores:
- user_metrics_store
pipelines:
optimized_state_pipeline:
from: user_activity
via:
- type: mapValues
mapper: update_user_metrics
- type: filter
if:
expression: value is not None
to:
topic: user_metrics_summary
keyType: string
valueType: json
State store optimizations:
- JSON input/output: Uses JSON for better debugging while maintaining compact string storage internally
- Hybrid approach: JSON for input/output messages, compact strings for state storage efficiency
- Increased cache: Configure larger cache sizes (50MB) for better performance
- Direct field access: Extract JSON fields using `value.get()` instead of string parsing
- Conditional logging: Log only significant changes to reduce I/O
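The gain from the hybrid approach is easy to verify: the same metrics serialize far smaller as a compact string than as a JSON document. The sketch below is standalone Python with illustrative sample values; the field layout mirrors the example above.
import json

# The same user metrics stored two ways (sample values)
metrics = [42, 5, 20, 10, 4, 3, 2150]  # total, login, page_view, click, purchase, logout, total_score

compact = ":".join(str(x) for x in metrics)  # what the state store holds
as_json = json.dumps({
    "total_count": metrics[0], "login_count": metrics[1], "page_view_count": metrics[2],
    "click_count": metrics[3], "purchase_count": metrics[4], "logout_count": metrics[5],
    "total_score": metrics[6],
})

print(len(compact), "characters as a compact string")
print(len(as_json), "characters as JSON")

# Round trip back to a list for cheap in-place updates
restored = [int(x) for x in compact.split(":")]
assert restored == metrics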
Pipeline Optimization
This example demonstrates optimized pipeline design with early filtering and staged processing.
What it does:
- Produces mixed quality events: Creates events with valid/invalid data, various priorities (high, medium, low, spam), and different event types for filtering tests
- Filters early: Uses predicate function to immediately discard invalid events, spam, and bot traffic before expensive processing
- Processes in stages: Stage 1 = lightweight enrichment (add status, extract fields), Stage 2 = heavy processing (complex calculations)
- Separates concerns: Lightweight operations (field extraction) happen first, expensive operations (calculations) happen on filtered data only
- Outputs progressively: filtered_events → enriched_events → final_processed_events, each stage adds more data while maintaining JSON readability
Mixed events producer (quality testing) - click to expand
# Producer for pipeline optimization demo - generates mixed quality events
functions:
generate_mixed_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
event_types = ["valid_purchase", "valid_view", "spam", "invalid", "test", "bot_traffic"]
priorities = ["high", "medium", "low", "spam"]
code: |
global event_counter, event_types, priorities
event_counter += 1
event_type = random.choice(event_types)
priority = random.choice(priorities)
# Mix of valid and invalid events to demonstrate filtering optimization
is_valid = event_type.startswith("valid") and priority != "spam"
# Create event data with quality indicators as JSON
event_data = {
"event_type": event_type,
"priority": priority,
"is_valid": is_valid,
"score": random.randint(1, 1000),
"timestamp": int(time.time() * 1000),
"batch_id": event_counter // 100, # Group events in batches
"source": "pipeline_test"
}
expression: (f"event_{event_counter:06d}", event_data)
resultType: (string, json)
producers:
mixed_events_producer:
generator: generate_mixed_events
interval: 1s # High frequency to test filtering efficiency
to:
topic: mixed_quality_events
keyType: string
valueType: json
Pipeline optimization processor (staged processing) - click to expand
# Processor demonstrating optimized pipeline design with early filtering
streams:
mixed_quality_events:
topic: mixed_quality_events
keyType: string
valueType: json
filtered_events:
topic: filtered_events
keyType: string
valueType: json
enriched_events:
topic: enriched_events
keyType: string
valueType: json
functions:
early_filter:
type: predicate
code: |
# Early filtering - extract minimal data and filter out unwanted events immediately
event_type = value.get("event_type", "")
priority = value.get("priority", "")
is_valid = value.get("is_valid", False)
# Filter out spam, invalid, and bot traffic early to reduce downstream processing
return is_valid and event_type.startswith("valid") and priority != "spam"
expression: result
resultType: boolean
lightweight_enricher:
type: valueTransformer
code: |
# Lightweight enrichment - add only essential data
event_type = value.get("event_type", "")
priority = value.get("priority", "")
score = value.get("score", 0)
timestamp = value.get("timestamp", 0)
# Add minimal enrichment data as JSON for better readability
enriched = {
"status": "ENRICHED",
"event_type": event_type,
"priority": priority,
"score": score,
"timestamp": timestamp,
"batch_id": value.get("batch_id", 0),
"source": value.get("source", "unknown")
}
return enriched
expression: result if result else None
resultType: json
heavy_processor:
type: valueTransformer
globalCode: |
# Expensive operations that should only run on filtered data
def calculate_complex_score(event_type, priority, base_score):
"""Simulate expensive calculation"""
multipliers = {"high": 3.0, "medium": 2.0, "low": 1.0}
type_bonus = 50 if "purchase" in event_type else 10
# Simulate complex computation
complex_score = base_score * multipliers.get(priority, 1.0) + type_bonus
# Additional expensive operations
for i in range(100): # Simulate computational overhead
complex_score += (i % 5) * 0.1
return round(complex_score, 2)
code: |
# Heavy processing - only runs on pre-filtered valid events
event_type = value.get("event_type", "")
priority = value.get("priority", "")
base_score = value.get("score", 0)
timestamp = value.get("timestamp", 0)
# Expensive calculation only on filtered data
complex_score = calculate_complex_score(event_type, priority, base_score)
# Generate comprehensive result as JSON for better readability in Kowl UI
result = {
"status": "PROCESSED",
"event_type": event_type,
"priority": priority,
"scores": {
"base_score": base_score,
"complex_score": complex_score
},
"timestamp": timestamp,
"batch_id": value.get("batch_id", 0),
"source": value.get("source", "unknown"),
"processing_stage": "final"
}
log.info("Completed heavy processing for {}: score {:.2f}", event_type, complex_score)
return result
expression: result if result else None
resultType: json
pipelines:
# Stage 1: Early filtering to reduce data volume
filter_stage:
from: mixed_quality_events
via:
- type: filter
if: early_filter # Filter out 70-80% of events early
to:
topic: filtered_events
keyType: string
valueType: json
# Stage 2: Lightweight enrichment on filtered data
enrich_stage:
from: filtered_events
via:
- type: mapValues
mapper: lightweight_enricher
- type: filter
if:
expression: value is not None
to:
topic: enriched_events
keyType: string
valueType: json
# Stage 3: Heavy processing only on high-quality filtered data
process_stage:
from: enriched_events
via:
- type: mapValues
mapper: heavy_processor
- type: filter
if:
expression: value is not None
to:
topic: final_processed_events
keyType: string
valueType: json
Pipeline design principles:
- Filter early: Remove unwanted data before expensive processing using JSON field checks
- Staged processing: Separate lightweight and heavy operations into different stages
- Efficient predicates: Use `value.get()` for fast field-based filtering (a small ordering sketch follows below)
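One refinement of the last point: order the checks inside a predicate so the cheapest, most selective ones run first and let Python's and short-circuit before any costlier work. A standalone sketch, reusing the field names from the example above (the sample events are made up):
def early_filter(value):
    """Predicate body: cheap boolean flag first, then string checks (sketch)."""
    if not value:                                   # guard against missing payloads
        return False
    if not value.get("is_valid", False):            # cheapest check: a pre-computed flag
        return False
    # Slightly more expensive string checks only run for events that survived the flag check
    return value.get("event_type", "").startswith("valid") and value.get("priority") != "spam"

print(early_filter({"is_valid": True, "event_type": "valid_purchase", "priority": "high"}))  # True
print(early_filter({"is_valid": False, "event_type": "spam", "priority": "spam"}))           # False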
Serialization Optimization
This example shows efficient data format usage and minimal serialization overhead.
What it does:
- Produces compact data: Creates events with numeric event_type_ids (1=view, 2=click, 3=purchase) instead of strings for efficiency
- Uses lookup tables: Pre-computes event type mappings in globalCode for fast ID-to-name conversions without string operations
- Filters by score: Early filtering discards events with a score below 10 to reduce processing volume
- Processes by type: Applies different logic based on event_type_id (purchases get 10% value bonus + 20 score bonus)
Binary data producer (compact format) - click to expand
# Producer for serialization optimization demo - generates binary data for performance
functions:
generate_binary_events:
type: generator
globalCode: |
import random
import time
import struct
event_counter = 0
user_ids = list(range(1, 101)) # Numeric user IDs for binary encoding
code: |
global event_counter, user_ids
event_counter += 1
user_id = random.choice(user_ids)
# Generate data for binary serialization comparison
timestamp = int(time.time() * 1000)
event_type_id = random.randint(1, 5) # Encoded event types
value = random.randint(1, 10000)
score = random.randint(1, 100)
# Create structured data in JSON format (better than compact strings for readability)
# While JSON has more overhead than binary, it provides better debugging/monitoring
event_data = {
"user_id": user_id,
"timestamp": timestamp,
"event_type_id": event_type_id,
"value": value,
"score": score,
"metadata": {
"batch": event_counter // 50,
"version": 1
}
}
expression: (str(user_id), event_data)
resultType: (string, json)
producers:
binary_data_producer:
generator: generate_binary_events
interval: 1s # High frequency for performance testing
to:
topic: binary_events
keyType: string
valueType: json
Serialization optimization processor (efficient parsing) - click to expand
# Processor demonstrating serialization optimization patterns
streams:
binary_events:
topic: binary_events
keyType: string
valueType: json
optimized_events:
topic: optimized_events
keyType: string
valueType: json
functions:
efficient_binary_processor:
type: valueTransformer
globalCode: |
# Lookup tables for efficient conversion (avoid string operations)
EVENT_TYPES = {1: "view", 2: "click", 3: "purchase", 4: "search", 5: "logout"}
def create_optimized_output(data, event_type):
"""Create optimized JSON output with structured data"""
return {
"processed_event": {
"type": event_type,
"user_id": data["user_id"],
"value": data["value"],
"score": data["score"],
"timestamp": data["timestamp"]
},
"metadata": data.get("metadata", {}),
"processing_info": {
"optimized": True,
"version": "v2"
}
}
code: |
# Extract data from JSON (more readable than binary parsing)
user_id = value.get("user_id")
timestamp = value.get("timestamp")
event_type_id = value.get("event_type_id")
current_value = value.get("value", 0)
score = value.get("score", 0)
# Early filtering based on score
if score < 10: # Filter low-quality events
return None
# Efficient processing with direct field access
if event_type_id == 3: # Purchase events
# Apply purchase-specific logic
processed_value = int(current_value * 1.1) # 10% bonus
final_score = min(score + 20, 100) # Bonus capped at 100
else:
processed_value = current_value
final_score = score
# Get event type name for output
event_type = EVENT_TYPES.get(event_type_id, "unknown")
# Create optimized JSON output
optimized_data = {
"user_id": user_id,
"timestamp": timestamp,
"event_type_id": event_type_id,
"value": processed_value,
"score": final_score
}
result = create_optimized_output(optimized_data, event_type)
return result
expression: result if result else None
resultType: json
convert_to_readable:
type: valueTransformer
code: |
# Convert optimized JSON to human-readable final format
processed_event = value.get("processed_event", {})
if not processed_event:
return None
event_type = processed_event.get("type", "unknown")
user_id = processed_event.get("user_id")
event_value = processed_event.get("value")
score = processed_event.get("score")
timestamp = processed_event.get("timestamp")
# Create final readable JSON output for Kowl UI
readable = {
"summary": f"EVENT:{event_type} USER:{user_id} VALUE:{event_value} SCORE:{score}",
"details": {
"event_type": event_type,
"user_id": user_id,
"value": event_value,
"score": score,
"timestamp": timestamp
},
"metadata": value.get("metadata", {}),
"processing_info": value.get("processing_info", {})
}
return readable
expression: result if result else None
resultType: json
pipelines:
# High-performance processing pipeline
binary_processing_pipeline:
from: binary_events
via:
- type: mapValues
mapper: efficient_binary_processor
- type: filter
if:
expression: value is not None
to:
topic: optimized_events
keyType: string
valueType: json
# Convert to readable format for final output
readable_output_pipeline:
from: optimized_events
via:
- type: mapValues
mapper: convert_to_readable
- type: filter
if:
expression: value is not None
to:
topic: readable_events
keyType: string
valueType: json
Serialization best practices:
- Lookup tables: Pre-compute mappings to avoid repeated string operations
- Progressive transformation: Transform data from raw → optimized → final readable formats
- Field-based access: Use `value.get()` instead of string parsing for cleaner, faster code
Configuration Optimization
Kafka Streams Configuration
Optimize Kafka Streams configuration for your workload based on KSML's internal configuration system:
runner:
type: streams
config:
application.id: optimized-ksml-app
bootstrap.servers: kafka:9092
# Threading Configuration
num.stream.threads: 8 # Parallel processing threads (default: 1)
# State Store Caching
cache.max.bytes.buffering: 104857600 # 100MB total cache (default: 10MB)
commit.interval.ms: 30000 # Commit frequency in ms (default: 30000)
# Topology Optimization
topology.optimization: all # Enable all optimizations (default: all)
# Producer Performance Settings
producer.linger.ms: 100 # Wait for batching (default: 0)
producer.batch.size: 16384 # Batch size in bytes (default: 16384)
producer.buffer.memory: 33554432 # 32MB producer buffer (default: 32MB)
producer.acks: 1 # Acknowledgment level (1 = leader only)
# Consumer Performance Settings
consumer.fetch.max.bytes: 52428800 # 50MB max fetch (default: 52428800)
consumer.max.poll.records: 500 # Records per poll (default: 500)
consumer.session.timeout.ms: 45000 # Session timeout (default: 45000)
consumer.heartbeat.interval.ms: 3000 # Heartbeat frequency (default: 3000)
# Processing Guarantees
processing.guarantee: at_least_once # Options: at_least_once, exactly_once_v2
# Error Handling
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.production.exception.handler: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Key Configuration Details:
- num.stream.threads: Controls parallelism. Set based on CPU cores and partition count
- cache.max.bytes.buffering: KSML automatically enables caching when `caching: true` is set on stores
- topology.optimization: KSML defaults to `StreamsConfig.OPTIMIZE` for performance
- commit.interval.ms: Balance between throughput and latency. Higher values = better throughput
- processing.guarantee: `exactly_once_v2` provides stronger guarantees but lower performance
KSML-Specific Optimizations:
KSML automatically configures several optimizations:
- Custom exception handlers for production and deserialization errors
- Automatic header cleanup interceptors for all consumers
- Optimized client suppliers for resolving configurations
- Built-in metrics reporters with topology enrichment
State Store Configuration
Configure state stores for optimal performance based on KSML's store configuration system:
stores:
# High-performance key-value store
optimized_keyvalue_store:
type: keyValue
keyType: string
valueType: string # Use string over JSON for performance-critical stores
persistent: true # Enable durability (uses RocksDB)
caching: true # Enable caching for frequent access
logging: true # Enable changelog for fault tolerance
timestamped: false # Disable if timestamps not needed
versioned: false # Disable if versioning not needed
# Memory-optimized store for temporary data
temp_cache_store:
type: keyValue
keyType: string
valueType: string
persistent: false # In-memory only for speed
caching: true # Still enable caching
logging: false # No changelog needed for temp data
# Window store with retention
metrics_window_store:
type: window
keyType: string
valueType: json # JSON acceptable for metrics
windowSize: PT5M # 5-minute windows
retention: PT1H # Keep 1 hour of data
persistent: true
caching: true
logging: true
# Session store for user sessions
user_session_store:
type: session
keyType: string
valueType: string # Compact string format for sessions
retention: PT30M # 30-minute session timeout
persistent: true
caching: true
logging: true
Store Type Performance Characteristics:
| Store Type | Use Case | Performance Notes |
|---|---|---|
| keyValue | General caching, counters, lookups | Fastest access, least memory overhead |
| window | Time-based aggregations | Automatic cleanup, good for time-series |
| session | User sessions, activity tracking | Session-aware, handles gaps automatically |
Configuration Parameter Details:
- persistent:
  - `true`: Uses RocksDB for disk persistence, survives restarts
  - `false`: In-memory only, faster but data lost on restart
- caching:
  - `true`: KSML enables write caching to reduce downstream traffic
  - `false`: Direct writes, lower latency but higher network usage
- logging:
  - `true`: Creates changelog topic for fault tolerance and exactly-once processing
  - `false`: No changelog, faster but no fault tolerance
- timestamped:
  - `true`: Stores values with timestamps for time-aware processing
  - `false`: Plain values, more efficient when timestamps not needed
- versioned:
  - `true`: Maintains multiple versions of values with configurable retention
  - `false`: Single version only, more memory efficient
KSML Store Optimizations:
KSML automatically optimizes store configuration:
- Uses efficient Serde types based on configured data types
- Applies proper supplier selection (persistent vs. in-memory)
- Configures caching and logging based on specified options
- Handles timestamped and versioned variants automatically
Python Function Best Practices
Optimize Code Structure
functions:
optimized_function:
type: valueTransformer
globalCode: |
# Pre-compute expensive operations
MULTIPLIERS = {"high": 3.0, "medium": 2.0, "low": 1.0}
def efficient_calculation(value, priority):
return value * MULTIPLIERS.get(priority, 1.0)
code: |
# Use pre-computed values and avoid object creation
priority = value.get("priority", "low")
result = efficient_calculation(value.get("amount", 0), priority)
return f"processed:{result:.2f}"
Key techniques:
- Use globalCode: Pre-compute expensive operations outside the processing loop
- Minimize object creation: Reuse objects and data structures when possible
- Use built-in functions: They're typically faster than custom implementations
- Avoid unnecessary computations: Compute values only when needed
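To make the first technique concrete, here is the same calculation written two ways in plain Python. In KSML terms, the slow version rebuilds its lookup dictionary inside the code block on every message, while the fast version builds it once, as globalCode would; the timeit numbers will vary by machine and are only meant to show the relative difference.
import timeit

def slow_score(value, priority):
    # Anti-pattern: the lookup dict is rebuilt for every message
    multipliers = {"high": 3.0, "medium": 2.0, "low": 1.0}
    return value * multipliers.get(priority, 1.0)

# Pattern: build the dict once (globalCode in KSML) and reuse it on every message
MULTIPLIERS = {"high": 3.0, "medium": 2.0, "low": 1.0}

def fast_score(value, priority):
    return value * MULTIPLIERS.get(priority, 1.0)

print("rebuild per call:", timeit.timeit(lambda: slow_score(42.0, "high"), number=100_000))
print("pre-computed:    ", timeit.timeit(lambda: fast_score(42.0, "high"), number=100_000))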
Efficient Data Structures
Choose optimal data structures based on KSML's data type system and your use case:
KSML Data Types (Performance Ranking)
# Fastest - Use for high-throughput scenarios
keyType: string # Most efficient for keys
valueType: string # Fastest serialization
# Fast - Good balance of performance and functionality
keyType: long # Efficient numeric keys
valueType: avro:Schema # Efficient binary format with schema
# Moderate - Flexible but more overhead
keyType: json # Flexible structure
valueType: json # Easy to work with, good for Kowl UI
# Slower - Use only when necessary
valueType: xml # XML processing overhead
valueType: soap # SOAP envelope overhead
Python Data Structure Guidelines
Optimal patterns for different scenarios (a combined sketch follows the list):
- Fast counters/accumulators
- Compact state representation
- Efficient collections (limit size)
- Fast lookup tables (use globalCode)
- Efficient string operations
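A single standalone sketch covering all five patterns is below. The variable names, sizes, and sample values are illustrative and not tied to any KSML API; in a real function the constants would live in globalCode and the updates in code.
from collections import deque

# Fast counters/accumulators: plain ints beat nested dicts on hot paths
total_count = 0
total_score = 0

# Compact state representation: a fixed-position list instead of a dict of named fields
metrics = [0, 0, 0, 0, 0, 0, 0]  # total, login, page_view, click, purchase, logout, score

# Efficient collections with a bounded size: deque(maxlen=...) drops old entries automatically
recent_scores = deque(maxlen=100)

# Fast lookup tables: build once (globalCode in KSML), read many times
EVENT_PRIORITY = {"purchase": 3, "search": 2, "view": 1}

# Efficient string operations: join a list once instead of concatenating in a loop
def format_metrics(values):
    return ":".join(str(v) for v in values)

total_count += 1
total_score += 87
recent_scores.append(87)
metrics[0] = total_count
metrics[6] = total_score
print(EVENT_PRIORITY.get("purchase", 0), format_metrics(metrics))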
Memory-Efficient Patterns
Optimal memory usage techniques (see the sketch below):
- Reuse objects
- Generator patterns
- Lazy evaluation
- Compact data types
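As with the previous list, one standalone sketch covers these four ideas; the working buffer, the 1,000-item generator, and the sample record are made-up examples rather than KSML features.
import sys

# Reuse objects: mutate one working buffer instead of allocating a new dict per message
working = {"status": None, "score": 0}

def process(event_type, score):
    working["status"] = f"PROCESSED:{event_type}"
    working["score"] = score
    return working

# Generator patterns: stream items instead of materializing a full list in memory
def squares(n):
    for i in range(n):
        yield i * i

# Lazy evaluation: sum() consumes the generator one item at a time
total = sum(squares(1_000))

# Compact data types: a tuple of primitives is smaller than the equivalent dict
as_dict = {"user_id": 17, "score": 87, "ts": 1700000000000}
as_tuple = (17, 87, 1700000000000)

print(process("view", 42), total, sys.getsizeof(as_dict), ">", sys.getsizeof(as_tuple))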
KSML-Specific Optimizations:
- Serde Selection: KSML automatically chooses optimal serializers based on data type
- Union Types: For flexible schemas, KSML uses efficient UnionSerde implementation
- Type Flattening: Complex types are automatically flattened for performance
- Caching: Serde instances are cached and reused across operations
Data Type Performance Comparison:
| Data Type | Serialization Speed | Size Efficiency | Schema Evolution | Use Case |
|---|---|---|---|---|
| string | Fastest | Good | Limited | Simple values, IDs, compact formats |
| long/int | Fastest | Excellent | None | Counters, timestamps, numeric keys |
| avro | Fast | Excellent | Excellent | Complex schemas, production systems |
| json | Moderate | Good | Good | Development, debugging, flexible data |
| protobuf (coming soon) | Fast | Excellent | Good | High-performance, cross-language |
| xml/soap | Slow | Poor | Limited | Legacy systems, specific protocols |
Monitoring Performance
Key Metrics to Track
Monitor these metrics to identify performance issues:
functions:
performance_monitor:
type: forEach
globalCode: |
import time
code: |
# Record processing time around your own processing step (process_message is a placeholder for your logic)
start_time = time.time()
process_message(key, value)
processing_time = (time.time() - start_time) * 1000
# Log performance metrics periodically
if processing_time > 100: # Log slow operations
log.warn("Slow processing detected: {:.2f}ms for key {}", processing_time, key)
Important metrics:
- Processing latency: Time to process individual messages
- Throughput: Messages processed per second
- Memory usage: JVM heap utilization
- State store size: Growth rate and total size
- Error rates: Failed processing attempts
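A companion sketch to the latency example above, this time counting successes and failures so an error rate can be reported periodically. The track helper, the 500-message interval, and the sample call are illustrative; inside KSML you would use the injected log object rather than print.
# Counters that would live in a KSML function's globalCode block
processed = 0
failed = 0

def track(process_fn, key, value):
    """Count successes and failures around the real processing step (sketch)."""
    global processed, failed
    try:
        result = process_fn(key, value)
        processed += 1
        return result
    except Exception:
        failed += 1
        raise  # still surface the error; we only record it here
    finally:
        total = processed + failed
        if total and total % 500 == 0:
            print(f"error rate {failed / total:.1%} over {total} messages")

track(lambda k, v: v.get("score"), "user_001", {"score": 42})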
Best Practices Summary
Do's
- Measure first: Establish baselines before optimizing
- Filter early: Remove unwanted data as soon as possible
- Use caching: Configure appropriate cache sizes for state stores
- Optimize hot paths: Focus on frequently executed code
- Pre-compute values: Move expensive operations to globalCode
- Choose efficient formats: Use compact data representations
Don'ts
- Don't over-optimize: Focus on actual bottlenecks, not theoretical ones
- Don't ignore trade-offs: Balance throughput, latency, and resource usage
- Don't optimize without measuring: Always measure the impact of changes
- Don't create unnecessary objects: Reuse data structures when possible
- Don't log excessively: Reduce I/O overhead from logging
- Don't use complex serialization: Avoid JSON when simple formats suffice
Conclusion
Performance optimization in KSML involves a combination of efficient Python code, optimized configuration, and smart pipeline design. By applying these techniques systematically and measuring their impact, you can build KSML applications that efficiently process high volumes of data with consistent performance.
The key is to start with measurement, identify actual bottlenecks, and apply optimizations incrementally while monitoring their effectiveness.