Performance Optimization in KSML

This tutorial covers optimization techniques for KSML applications to improve throughput, reduce latency, and minimize resource usage.

Introduction

Performance optimization helps you process more data efficiently while reducing costs and maintaining low latency under high loads.

Prerequisites

Before starting this tutorial:

Topic creation commands:
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic mixed_quality_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic filtered_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic final_processed_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic binary_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic optimized_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic readable_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_metrics_summary && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic high_volume_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_events

Core Performance Concepts

Identifying Bottlenecks

Before optimizing, identify where performance bottlenecks exist:

  • Processing time: How long it takes to process each message
  • Throughput: Messages processed per second
  • State store size: How much data is stored in state stores
  • Memory usage: JVM heap and non-heap memory
  • GC activity: Frequency and duration of garbage collection
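
As a rough way to put numbers on the first two metrics before changing anything, the sketch below times a function over a batch of sample messages outside of KSML. The names `measure` and `sample_processor` are hypothetical stand-ins, not part of KSML; inside a running application you would rely on the runner's metrics instead.

```python
import time

def measure(process, messages):
    """Run process() over messages; return (avg ms per message, messages per second)."""
    start = time.perf_counter()
    for message in messages:
        process(message)
    elapsed = time.perf_counter() - start
    count = len(messages)
    avg_ms = (elapsed / count) * 1000 if count else 0.0
    throughput = count / elapsed if elapsed > 0 else float("inf")
    return avg_ms, throughput

# Hypothetical stand-in for the body of a KSML valueTransformer
def sample_processor(event):
    return {"type": event["type"], "score": event["value"] * 0.1}

events = [{"type": "purchase", "value": i} for i in range(10_000)]
avg_ms, throughput = measure(sample_processor, events)
print(f"avg {avg_ms:.4f} ms/message, {throughput:,.0f} messages/s")
```

Running the same harness before and after a change gives a quick relative comparison; absolute numbers will differ from what the function achieves inside the KSML runner.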

Key Optimization Areas

  1. Python Function Efficiency: Optimize code for minimal object creation and fast execution
  2. State Store Configuration: Configure stores for your specific workload
  3. Pipeline Design: Structure pipelines to filter early and process efficiently
  4. Data Serialization: Choose efficient formats and minimize serialization overhead

Efficient Processing Patterns

This example demonstrates optimized Python code and efficient data handling techniques.

What it does:

  • Produces high-volume events: Creates user events (view, click, purchase, search) with a category and value, mixed with low-value events (scroll, hover)
  • Filters early: Immediately discards uninteresting events (scroll, hover) to avoid processing overhead downstream
  • Pre-computes efficiently: Uses a global set of priority events and a global dictionary of category multipliers, and calculates scores with minimal object creation
  • Processes with JSON: Extracts fields from JSON using .get(), builds result objects for Kowl UI readability
  • Logs selectively: Only logs priority events (purchase, search) to reduce I/O overhead while keeping visibility into processing

High-volume producer (compact format):
# Producer for performance optimization demo - generates high volume events

functions:
  generate_high_volume_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      user_ids = [f"user_{i:04d}" for i in range(1, 101)]  # 100 users
      event_types = ["view", "click", "purchase", "search", "scroll", "hover"]
      product_categories = ["electronics", "clothing", "books", "home", "sports"]
    code: |
      global event_counter, user_ids, event_types, product_categories

      event_counter += 1
      user_id = random.choice(user_ids)
      event_type = random.choice(event_types)

      # Generate realistic event data for performance testing
      event_data = {
        "event_id": f"evt_{event_counter:08d}",
        "type": event_type,
        "user_id": user_id,
        "timestamp": int(time.time() * 1000),
        "product_id": f"prod_{random.randint(1, 1000):04d}",
        "category": random.choice(product_categories),
        "value": round(random.uniform(1.0, 100.0), 2) if event_type == "purchase" else 0,
        "session_id": f"sess_{hash(user_id) % 50:03d}",
        "metadata": {
          "source": "web",
          "device": random.choice(["desktop", "mobile", "tablet"]),
          "browser": random.choice(["chrome", "firefox", "safari"])
        }
      }

    expression: (user_id, event_data)
    resultType: (string, json)

producers:
  high_volume_producer:
    generator: generate_high_volume_events
    interval: 1s  # One event per second; shorten the interval to generate more load
    to:
      topic: high_volume_events
      keyType: string
      valueType: json

Efficient processor (optimized Python):
# Processor demonstrating efficient processing patterns

streams:
  high_volume_events:
    topic: high_volume_events
    keyType: string
    valueType: json

functions:
  efficient_event_processor:
    type: valueTransformer
    globalCode: |
      # Pre-compute expensive operations outside the processing loop
      PRIORITY_EVENTS = {"purchase", "search"}
      MULTIPLIERS = {"electronics": 1.5, "clothing": 1.2, "books": 1.0, "home": 1.3, "sports": 1.1}

      # Use efficient data structures and avoid object creation
      def calculate_score(event_type, category, value):
        """Efficient score calculation with minimal object creation"""
        base_score = 10 if event_type in PRIORITY_EVENTS else 5
        category_multiplier = MULTIPLIERS.get(category, 1.0)
        value_component = min(value * 0.1, 10)  # Cap value component
        return round(base_score * category_multiplier + value_component, 2)

    code: |
      # Early filtering - discard uninteresting events immediately
      event_type = value.get("type")
      if event_type in ["scroll", "hover"]:
        return None

      # Extract needed fields from JSON
      user_id = value.get("user_id")
      category = value.get("category")
      value_amount = value.get("value", 0)
      timestamp = value.get("timestamp")
      product_id = value.get("product_id")

      # Efficient processing with pre-computed values
      score = calculate_score(event_type, category, value_amount)

      # Build result as JSON for better readability in Kowl UI
      result = {
        "status": "PROCESSED",
        "event_type": event_type,
        "user_id": user_id,
        "product_id": product_id,
        "category": category,
        "score": score,
        "value": round(value_amount, 2),
        "timestamp": timestamp,
        "metadata": value.get("metadata")
      }

      # Log only important events to reduce I/O
      if event_type in PRIORITY_EVENTS:
        log.info("High-priority event processed: {} for user {} with score {:.2f}", event_type, user_id, score)

      return result

    expression: result if result else None
    resultType: json

pipelines:
  efficient_processing_pipeline:
    from: high_volume_events
    via:
      - type: mapValues
        mapper: efficient_event_processor
      - type: filter
        if:
          expression: value is not None
    to:
      topic: processed_events
      keyType: string
      valueType: json

Key optimization techniques:

  • Global code optimization: Pre-compute expensive operations outside processing loops
  • Early filtering: Discard unwanted events immediately to reduce downstream processing
  • Efficient logging: Log only important events to reduce I/O overhead
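
To check the scoring logic without starting a pipeline, the processor's pre-computed tables and early filter can be exercised as plain Python. This is a minimal sketch: `process` and `SKIPPED_EVENTS` are hypothetical names introduced here, and the skipped event types are held in a set rather than the list used in the processor above.

```python
# Built once at import time, mirroring what globalCode does in KSML
PRIORITY_EVENTS = frozenset({"purchase", "search"})
SKIPPED_EVENTS = frozenset({"scroll", "hover"})  # hypothetical name for the early-filter set
MULTIPLIERS = {"electronics": 1.5, "clothing": 1.2, "books": 1.0, "home": 1.3, "sports": 1.1}

def calculate_score(event_type, category, value):
    """Same formula as the processor: base score times multiplier plus a capped value part."""
    base_score = 10 if event_type in PRIORITY_EVENTS else 5
    category_multiplier = MULTIPLIERS.get(category, 1.0)
    value_component = min(value * 0.1, 10)  # Cap value component at 10
    return round(base_score * category_multiplier + value_component, 2)

def process(event):
    """Return a score, or None for events that the early filter discards."""
    event_type = event.get("type")
    if event_type in SKIPPED_EVENTS:
        return None
    return calculate_score(event_type, event.get("category"), event.get("value", 0))

print(process({"type": "purchase", "category": "electronics", "value": 50.0}))  # 20.0
print(process({"type": "hover"}))  # None
```

A purchase of 50.0 in electronics scores 10 × 1.5 + min(5.0, 10) = 20.0, while hover events never reach the scoring function.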

State Store Optimization

This example shows how to configure and use state stores efficiently for high-performance scenarios.

What it does:

  • Produces activity events: Creates user activities (login, page_view, click, purchase, logout) with scores, durations, timestamps in JSON format
  • Stores metrics compactly: Converts JSON to compact string format "total_count:login_count:page_view_count:click_count:purchase_count:logout_count:total_score"
  • Updates efficiently: Parses compact string to array, updates counters by index mapping, avoids object creation during updates
  • Uses optimized store: Configures a persistent, cached state store with an increased cache size (50MB) for better performance
  • Outputs JSON summaries: Returns readable JSON results with averages and totals while keeping internal storage compact

User metrics producer (activity data):
# Producer for state store optimization demo - generates user activity metrics

functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import random
      import time
      activity_counter = 0
      user_ids = [f"user_{i:03d}" for i in range(1, 21)]  # 20 users for state demo
      activities = ["login", "page_view", "click", "purchase", "logout"]
    code: |
      global activity_counter, user_ids, activities

      activity_counter += 1
      user_id = random.choice(user_ids)
      activity = random.choice(activities)

      # Create activity with metrics that need aggregation
      activity_data = {
        "activity_type": activity,
        "score": random.randint(1, 100),
        "timestamp": int(time.time() * 1000),
        "session_id": f"sess_{hash(user_id) % 10:02d}",
        "duration_ms": random.randint(100, 5000)
      }

    expression: (user_id, activity_data)
    resultType: (string, json)

producers:
  user_activity_producer:
    generator: generate_user_activity
    interval: 1s
    to:
      topic: user_activity
      keyType: string
      valueType: json

Optimized state processor (compact storage):
# Processor demonstrating optimized state store configuration and usage

streams:
  user_activity:
    topic: user_activity
    keyType: string
    valueType: json

stores:
  # Optimized state store configuration
  user_metrics_store:
    type: keyValue
    keyType: string
    valueType: string  # Using compact string format for state storage performance
    persistent: true
    caching: true
    # Increased cache size for better performance
    cacheSizeBytes: 52428800  # 50MB

functions:
  update_user_metrics:
    type: valueTransformer
    globalCode: |
      # Map each activity type to its position in the compact metrics list (built once)
      ACTIVITY_INDICES = {"login": 1, "page_view": 2, "click": 3, "purchase": 4, "logout": 5}

      # Use compact string format for state storage to reduce serialization overhead
      def parse_metrics(metrics_str):
        """Parse compact metrics format: count:login:page_view:click:purchase:logout:total_score"""
        if not metrics_str:
          return [0, 0, 0, 0, 0, 0, 0]  # Default metrics
        parts = metrics_str.split(":")
        return [int(x) for x in parts] if len(parts) == 7 else [0, 0, 0, 0, 0, 0, 0]

      def format_metrics(metrics_list):
        """Format metrics into compact string"""
        return ":".join(str(x) for x in metrics_list)

    code: |
      # Extract activity data from JSON
      activity_type = value.get("activity_type")
      if not activity_type:
        return None

      score = value.get("score", 0)
      timestamp = value.get("timestamp", 0)
      duration_ms = value.get("duration_ms", 0)

      # Get current metrics from optimized state store
      current_metrics_str = user_metrics_store.get(key)
      metrics = parse_metrics(current_metrics_str)

      # Update metrics in place - avoid creating new objects per message
      metrics[0] += 1  # total_count

      # Update activity-specific counters using the pre-computed index mapping
      index = ACTIVITY_INDICES.get(activity_type)
      if index is not None:
        metrics[index] += 1

      metrics[6] += score  # total_score

      # Store back in compact format
      user_metrics_store.put(key, format_metrics(metrics))

      # Calculate derived metrics efficiently
      avg_score = metrics[6] / metrics[0] if metrics[0] > 0 else 0

      # Return summary as JSON for better readability in Kowl UI
      result = {
        "user_id": key,
        "metrics": {
          "total_count": metrics[0],
          "login_count": metrics[1],
          "page_view_count": metrics[2],
          "click_count": metrics[3],
          "purchase_count": metrics[4],
          "logout_count": metrics[5],
          "total_score": metrics[6],
          "avg_score": round(avg_score, 2)
        },
        "last_activity": {
          "type": activity_type,
          "score": score,
          "duration_ms": duration_ms,
          "timestamp": timestamp
        }
      }

      # Log only significant changes to reduce I/O
      if metrics[0] % 10 == 0:  # Log every 10th event
        log.info("User {} metrics: {} total activities, avg score: {:.2f}", key, metrics[0], avg_score)

      return result

    expression: result if result else None
    resultType: json
    stores:
      - user_metrics_store

pipelines:
  optimized_state_pipeline:
    from: user_activity
    via:
      - type: mapValues
        mapper: update_user_metrics
      - type: filter
        if:
          expression: value is not None
    to:
      topic: user_metrics_summary
      keyType: string
      valueType: json

State store optimizations:

  • Hybrid approach: JSON for input and output messages (easier debugging), compact strings for state storage
  • Increased cache: Configure larger cache sizes (50MB) to reduce writes to the underlying store and its changelog
  • Direct field access: Extract JSON fields using value.get() instead of string parsing
  • Conditional logging: Log only significant changes to reduce I/O
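
The compact storage format can be tried in isolation. The sketch below reuses the two helper functions from the processor and compares the stored string with a JSON rendering of the same counters; the `FIELDS` tuple is an assumption added here only to label the positions for that comparison.

```python
import json

# Position labels for the compact format (illustrative only, not used by the processor)
FIELDS = ("total_count", "login_count", "page_view_count", "click_count",
          "purchase_count", "logout_count", "total_score")

def parse_metrics(metrics_str):
    """Parse 'count:login:page_view:click:purchase:logout:total_score' into a list of ints."""
    if not metrics_str:
        return [0] * 7
    parts = metrics_str.split(":")
    return [int(x) for x in parts] if len(parts) == 7 else [0] * 7

def format_metrics(metrics_list):
    """Format the metrics list back into the compact string."""
    return ":".join(str(x) for x in metrics_list)

# First event for a user: a login with score 42
metrics = parse_metrics(None)
metrics[0] += 1   # total_count
metrics[1] += 1   # login_count
metrics[6] += 42  # total_score

compact = format_metrics(metrics)
as_json = json.dumps(dict(zip(FIELDS, metrics)))
print(compact)                      # 1:1:0:0:0:0:42
print(len(compact), len(as_json))   # compact string is several times shorter
```

The trade-off is that the compact string is positional: adding or reordering a counter changes the meaning of existing stored values, so the seven-field check in `parse_metrics` matters.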

Pipeline Optimization

This example demonstrates optimized pipeline design with early filtering and staged processing.

What it does:

  • Produces mixed quality events: Creates events with valid/invalid data, various priorities (high, low, spam), different event types for filtering tests
  • Filters early: Uses predicate function to immediately discard invalid events, spam, and bot traffic before expensive processing
  • Processes in stages: Stage 1 = lightweight enrichment (add status, extract fields), Stage 2 = heavy processing (complex calculations)
  • Separates concerns: Lightweight operations (field extraction) happen first, expensive operations (calculations) happen on filtered data only
  • Outputs progressively: filtered_events → enriched_events → final_processed_events, each stage adds more data while maintaining JSON readability

Mixed events producer (quality testing):
# Producer for pipeline optimization demo - generates mixed quality events

functions:
  generate_mixed_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      event_types = ["valid_purchase", "valid_view", "spam", "invalid", "test", "bot_traffic"]
      priorities = ["high", "medium", "low", "spam"]
    code: |
      global event_counter, event_types, priorities

      event_counter += 1
      event_type = random.choice(event_types)
      priority = random.choice(priorities)

      # Mix of valid and invalid events to demonstrate filtering optimization
      is_valid = event_type.startswith("valid") and priority != "spam"

      # Create event data with quality indicators as JSON
      event_data = {
        "event_type": event_type,
        "priority": priority,
        "is_valid": is_valid,
        "score": random.randint(1, 1000),
        "timestamp": int(time.time() * 1000),
        "batch_id": event_counter // 100,  # Group events in batches
        "source": "pipeline_test"
      }

    expression: (f"event_{event_counter:06d}", event_data)
    resultType: (string, json)

producers:
  mixed_events_producer:
    generator: generate_mixed_events
    interval: 1s  # One event per second; shorten the interval to stress the filter
    to:
      topic: mixed_quality_events
      keyType: string
      valueType: json

Pipeline optimization processor (staged processing):
# Processor demonstrating optimized pipeline design with early filtering

streams:
  mixed_quality_events:
    topic: mixed_quality_events
    keyType: string
    valueType: json

  filtered_events:
    topic: filtered_events
    keyType: string
    valueType: json

  enriched_events:
    topic: enriched_events
    keyType: string
    valueType: json

functions:
  early_filter:
    type: predicate
    code: |
      # Early filtering - extract minimal data and filter out unwanted events immediately
      event_type = value.get("event_type", "")
      priority = value.get("priority", "")
      is_valid = value.get("is_valid", False)

      # Filter out spam, invalid, and bot traffic early to reduce downstream processing
      return is_valid and event_type.startswith("valid") and priority != "spam"

    expression: result
    resultType: boolean

  lightweight_enricher:
    type: valueTransformer
    code: |
      # Lightweight enrichment - add only essential data
      event_type = value.get("event_type", "")
      priority = value.get("priority", "")
      score = value.get("score", 0)
      timestamp = value.get("timestamp", 0)

      # Add minimal enrichment data as JSON for better readability
      enriched = {
        "status": "ENRICHED",
        "event_type": event_type,
        "priority": priority,
        "score": score,
        "timestamp": timestamp,
        "batch_id": value.get("batch_id", 0),
        "source": value.get("source", "unknown")
      }

      return enriched

    expression: result if result else None
    resultType: json

  heavy_processor:
    type: valueTransformer
    globalCode: |
      # Expensive operations that should only run on filtered data
      def calculate_complex_score(event_type, priority, base_score):
        """Simulate expensive calculation"""
        multipliers = {"high": 3.0, "medium": 2.0, "low": 1.0}
        type_bonus = 50 if "purchase" in event_type else 10

        # Simulate complex computation
        complex_score = base_score * multipliers.get(priority, 1.0) + type_bonus

        # Additional expensive operations
        for i in range(100):  # Simulate computational overhead
          complex_score += (i % 5) * 0.1

        return round(complex_score, 2)

    code: |
      # Heavy processing - only runs on pre-filtered valid events
      event_type = value.get("event_type", "")
      priority = value.get("priority", "")
      base_score = value.get("score", 0)
      timestamp = value.get("timestamp", 0)

      # Expensive calculation only on filtered data
      complex_score = calculate_complex_score(event_type, priority, base_score)

      # Generate comprehensive result as JSON for better readability in Kowl UI
      result = {
        "status": "PROCESSED",
        "event_type": event_type,
        "priority": priority,
        "scores": {
          "base_score": base_score,
          "complex_score": complex_score
        },
        "timestamp": timestamp,
        "batch_id": value.get("batch_id", 0),
        "source": value.get("source", "unknown"),
        "processing_stage": "final"
      }

      log.info("Completed heavy processing for {}: score {:.2f}", event_type, complex_score)

      return result

    expression: result if result else None
    resultType: json

pipelines:
  # Stage 1: Early filtering to reduce data volume
  filter_stage:
    from: mixed_quality_events
    via:
      - type: filter
        if: early_filter  # Drops about 75% of events (only 6 of 24 type/priority combinations pass)
    to:
      topic: filtered_events
      keyType: string
      valueType: json

  # Stage 2: Lightweight enrichment on filtered data
  enrich_stage:
    from: filtered_events
    via:
      - type: mapValues
        mapper: lightweight_enricher
      - type: filter
        if:
          expression: value is not None
    to:
      topic: enriched_events
      keyType: string
      valueType: json

  # Stage 3: Heavy processing only on high-quality filtered data
  process_stage:
    from: enriched_events
    via:
      - type: mapValues
        mapper: heavy_processor
      - type: filter
        if:
          expression: value is not None
    to:
      topic: final_processed_events
      keyType: string
      valueType: json

Pipeline design principles:

  • Filter early: Remove unwanted data before expensive processing using JSON field checks
  • Staged processing: Separate lightweight and heavy operations into different stages
  • Efficient predicates: Use value.get() for fast field-based filtering
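
How much work the early filter saves can be estimated by enumerating every event type and priority combination the producer can emit. The sketch below assumes the producer picks both uniformly at random, as in the generator above; `make_event` is a hypothetical helper that reproduces only the fields the predicate reads.

```python
from itertools import product

EVENT_TYPES = ["valid_purchase", "valid_view", "spam", "invalid", "test", "bot_traffic"]
PRIORITIES = ["high", "medium", "low", "spam"]

def make_event(event_type, priority):
    """Build the subset of producer fields that the predicate inspects."""
    return {
        "event_type": event_type,
        "priority": priority,
        "is_valid": event_type.startswith("valid") and priority != "spam",
    }

def early_filter(event):
    """Same condition as the early_filter predicate in the pipeline."""
    return (
        event.get("is_valid", False)
        and event.get("event_type", "").startswith("valid")
        and event.get("priority", "") != "spam"
    )

combos = [make_event(t, p) for t, p in product(EVENT_TYPES, PRIORITIES)]
passed = [e for e in combos if early_filter(e)]
pass_rate = len(passed) / len(combos)
print(f"{len(passed)} of {len(combos)} combinations pass ({pass_rate:.0%})")
```

With 2 of 6 event types and 3 of 4 priorities accepted, a quarter of the traffic reaches the enrichment and heavy-processing stages.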

Serialization Optimization

This example shows efficient data format usage and minimal serialization overhead.

What it does:

  • Produces compact data: Creates events with numeric event_type_ids (1=view, 2=click, 3=purchase, 4=search, 5=logout) instead of strings for efficiency
  • Uses lookup tables: Pre-computes event type mappings in globalCode for fast ID-to-name conversions without string operations
  • Filters by score: Early filtering discards events with a score below 10 to reduce processing volume
  • Processes by type: Applies different logic based on event_type_id (purchases get 10% value bonus + 20 score bonus)

Binary data producer (compact format):
# Producer for serialization optimization demo - generates compact events with numeric IDs (as JSON)

functions:
  generate_binary_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      user_ids = list(range(1, 101))  # Numeric user IDs, more compact than string IDs
    code: |
      global event_counter, user_ids

      event_counter += 1
      user_id = random.choice(user_ids)

      # Generate data for binary serialization comparison
      timestamp = int(time.time() * 1000)
      event_type_id = random.randint(1, 5)  # Encoded event types
      value = random.randint(1, 10000)
      score = random.randint(1, 100)

      # Create structured data in JSON format (better than compact strings for readability)
      # While JSON has more overhead than binary, it provides better debugging/monitoring
      event_data = {
        "user_id": user_id,
        "timestamp": timestamp,
        "event_type_id": event_type_id,
        "value": value,
        "score": score,
        "metadata": {
          "batch": event_counter // 50,
          "version": 1
        }
      }

    expression: (str(user_id), event_data)
    resultType: (string, json)

producers:
  binary_data_producer:
    generator: generate_binary_events
    interval: 1s  # One event per second; shorten the interval to generate more load
    to:
      topic: binary_events
      keyType: string
      valueType: json

Serialization optimization processor (efficient parsing):
# Processor demonstrating serialization optimization patterns

streams:
  binary_events:
    topic: binary_events
    keyType: string
    valueType: json

  optimized_events:
    topic: optimized_events
    keyType: string
    valueType: json

functions:
  efficient_binary_processor:
    type: valueTransformer
    globalCode: |
      # Lookup tables for efficient conversion (avoid string operations)
      EVENT_TYPES = {1: "view", 2: "click", 3: "purchase", 4: "search", 5: "logout"}

      def create_optimized_output(data, event_type):
        """Create optimized JSON output with structured data"""
        return {
          "processed_event": {
            "type": event_type,
            "user_id": data["user_id"],
            "value": data["value"],
            "score": data["score"],
            "timestamp": data["timestamp"]
          },
          "metadata": data.get("metadata", {}),
          "processing_info": {
            "optimized": True,
            "version": "v2"
          }
        }

    code: |
      # Extract data from JSON (more readable than binary parsing)
      user_id = value.get("user_id")
      timestamp = value.get("timestamp")
      event_type_id = value.get("event_type_id")
      current_value = value.get("value", 0)
      score = value.get("score", 0)

      # Early filtering based on score
      if score < 10:  # Filter low-quality events
        return None

      # Efficient processing with direct field access
      if event_type_id == 3:  # Purchase events
        # Apply purchase-specific logic
        processed_value = int(current_value * 1.1)  # 10% bonus
        final_score = min(score + 20, 100)  # Bonus capped at 100
      else:
        processed_value = current_value
        final_score = score

      # Get event type name for output
      event_type = EVENT_TYPES.get(event_type_id, "unknown")

      # Create optimized JSON output
      optimized_data = {
        "user_id": user_id,
        "timestamp": timestamp,
        "event_type_id": event_type_id,
        "value": processed_value,
        "score": final_score
      }

      result = create_optimized_output(optimized_data, event_type)

      return result

    expression: result if result else None
    resultType: json

  convert_to_readable:
    type: valueTransformer
    code: |
      # Convert optimized JSON to human-readable final format
      processed_event = value.get("processed_event", {})
      if not processed_event:
        return None

      event_type = processed_event.get("type", "unknown")
      user_id = processed_event.get("user_id")
      event_value = processed_event.get("value")
      score = processed_event.get("score")
      timestamp = processed_event.get("timestamp")

      # Create final readable JSON output for Kowl UI
      readable = {
        "summary": f"EVENT:{event_type} USER:{user_id} VALUE:{event_value} SCORE:{score}",
        "details": {
          "event_type": event_type,
          "user_id": user_id,
          "value": event_value,
          "score": score,
          "timestamp": timestamp
        },
        "metadata": value.get("metadata", {}),
        "processing_info": value.get("processing_info", {})
      }

      return readable

    expression: result if result else None
    resultType: json

pipelines:
  # High-performance processing pipeline
  binary_processing_pipeline:
    from: binary_events
    via:
      - type: mapValues
        mapper: efficient_binary_processor
      - type: filter
        if:
          expression: value is not None
    to:
      topic: optimized_events
      keyType: string
      valueType: json

  # Convert to readable format for final output
  readable_output_pipeline:
    from: optimized_events
    via:
      - type: mapValues
        mapper: convert_to_readable
      - type: filter
        if:
          expression: value is not None
    to:
      topic: readable_events
      keyType: string
      valueType: json

Serialization best practices:

  • Lookup tables: Pre-compute mappings to avoid repeated string operations
  • Progressive transformation: Transform data from raw → optimized → final readable formats
  • Field-based access: Use value.get() instead of string parsing for cleaner, faster code
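
The pipelines above keep JSON throughout for readability. To get a feel for what that readability costs in bytes, the sketch below packs the same five numeric fields with Python's `struct` module and compares sizes. The field layout (`<IqBIB`) is an assumption chosen for this illustration and is not a format that KSML or the tutorial defines.

```python
import json
import struct

# Assumed layout, little-endian without padding:
# user_id (uint32), timestamp (int64), event_type_id (uint8), value (uint32), score (uint8)
EVENT_STRUCT = struct.Struct("<IqBIB")
FIELD_NAMES = ("user_id", "timestamp", "event_type_id", "value", "score")

def pack_event(event):
    """Encode the numeric fields of an event as fixed-width binary."""
    return EVENT_STRUCT.pack(*(event[name] for name in FIELD_NAMES))

def unpack_event(payload):
    """Decode the fixed-width binary back into a dict."""
    return dict(zip(FIELD_NAMES, EVENT_STRUCT.unpack(payload)))

event = {"user_id": 42, "timestamp": 1700000000000, "event_type_id": 3,
         "value": 9500, "score": 87}

json_bytes = json.dumps(event).encode("utf-8")
binary = pack_event(event)
print(f"JSON: {len(json_bytes)} bytes, binary: {len(binary)} bytes")
```

The fixed-width form is 18 bytes regardless of the values, but it cannot be inspected in a topic browser and breaks if a field is added, which is the trade-off the tutorial resolves in favor of JSON.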

Configuration Optimization

Kafka Streams Configuration

Optimize Kafka Streams configuration for your workload based on KSML's internal configuration system:

runner:
  type: streams
  config:
    application.id: optimized-ksml-app
    bootstrap.servers: kafka:9092

    # Threading Configuration
    num.stream.threads: 8                    # Parallel processing threads (default: 1)

    # State Store Caching
    cache.max.bytes.buffering: 104857600     # 100MB total cache (default: 10MB)
    commit.interval.ms: 30000                # Commit frequency in ms (default: 30000)

    # Topology Optimization
    topology.optimization: all               # Enable all optimizations (KSML default: all; plain Kafka Streams default: none)

    # Producer Performance Settings
    producer.linger.ms: 100                  # Wait up to 100 ms to fill batches (Kafka Streams default: 100)
    producer.batch.size: 16384               # Batch size in bytes (default: 16384)
    producer.buffer.memory: 33554432         # 32MB producer buffer (default: 32MB)
    producer.acks: 1                         # Acknowledgment level (1 = leader only; default: all)

    # Consumer Performance Settings
    consumer.fetch.max.bytes: 52428800       # 50MB max fetch (default: 52428800)
    consumer.max.poll.records: 500           # Records per poll (consumer default: 500; Kafka Streams default: 1000)
    consumer.session.timeout.ms: 45000       # Session timeout (default: 45000)
    consumer.heartbeat.interval.ms: 3000     # Heartbeat frequency (default: 3000)

    # Processing Guarantees
    processing.guarantee: at_least_once       # Options: at_least_once, exactly_once_v2

    # Error Handling
    default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.production.exception.handler: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler

Key Configuration Details:

  • num.stream.threads: Controls parallelism within one instance. Set it based on CPU cores and partition count; threads beyond the number of input partitions stay idle
  • cache.max.bytes.buffering: Total record cache memory shared by all stream threads. It only affects stores that set caching: true
  • topology.optimization: KSML defaults to StreamsConfig.OPTIMIZE for performance
  • commit.interval.ms: Balances throughput against latency. Higher values give better throughput, but results from cached stores reach downstream topics later
  • processing.guarantee: exactly_once_v2 provides stronger guarantees but lower performance
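
When sizing num.stream.threads, it helps to remember that Kafka Streams assigns work in tasks, and a simple topology gets one task per input partition. The helper below is a back-of-the-envelope sketch under that assumption (a single sub-topology, tasks spread evenly over instances); `effective_stream_threads` is a hypothetical name, not a Kafka or KSML API.

```python
def effective_stream_threads(configured_threads, input_partitions, instances=1):
    """Estimate how many threads per instance can actually receive a task.

    Assumes one task per input partition and an even spread across instances.
    """
    # Ceiling division: the busiest instance gets at most this many tasks
    tasks_per_instance = -(-input_partitions // instances)
    return min(configured_threads, tasks_per_instance)

# The tutorial topics are created with a single partition
print(effective_stream_threads(8, 1))                # 1
print(effective_stream_threads(8, 12))               # 8
print(effective_stream_threads(8, 12, instances=3))  # 4
```

Against the single-partition topics created in the prerequisites, only one of the eight configured threads would have work, so raising the partition count comes before raising the thread count.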

KSML-Specific Optimizations:

KSML automatically configures several optimizations:

  • Custom exception handlers for production and deserialization errors
  • Automatic header cleanup interceptors for all consumers
  • Optimized client suppliers for resolving configurations
  • Built-in metrics reporters with topology enrichment

State Store Configuration

Configure state stores for optimal performance based on KSML's store configuration system:

stores:
  # High-performance key-value store
  optimized_keyvalue_store:
    type: keyValue
    keyType: string
    valueType: string                # Use string over JSON for performance-critical stores
    persistent: true                 # Enable durability (uses RocksDB)
    caching: true                    # Enable caching for frequent access
    logging: true                    # Enable changelog for fault tolerance
    timestamped: false               # Disable if timestamps not needed
    versioned: false                 # Disable if versioning not needed

  # Memory-optimized store for temporary data
  temp_cache_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: false                # In-memory only for speed
    caching: true                    # Still enable caching
    logging: false                   # No changelog needed for temp data

  # Window store with retention
  metrics_window_store:
    type: window
    keyType: string
    valueType: json                  # JSON acceptable for metrics
    windowSize: PT5M                 # 5-minute windows
    retention: PT1H                  # Keep 1 hour of data
    persistent: true
    caching: true
    logging: true

  # Session store for user sessions  
  user_session_store:
    type: session
    keyType: string
    valueType: string                # Compact string format for sessions
    retention: PT30M                 # 30-minute session timeout
    persistent: true
    caching: true
    logging: true

Store Type Performance Characteristics:

| Store Type | Use Case | Performance Notes |
|------------|----------|-------------------|
| keyValue | General caching, counters, lookups | Fastest access, least memory overhead |
| window | Time-based aggregations | Automatic cleanup, good for time-series |
| session | User sessions, activity tracking | Session-aware, handles gaps automatically |

Configuration Parameter Details:

  • persistent:

    • true: Uses RocksDB for disk persistence, survives restarts
    • false: In-memory only, faster but data lost on restart
  • caching:

    • true: KSML enables write caching to reduce downstream traffic
    • false: Direct writes, lower latency but higher network usage
  • logging:

    • true: Creates changelog topic for fault tolerance and exactly-once processing
    • false: No changelog, faster but no fault tolerance
  • timestamped:

    • true: Stores values with timestamps for time-aware processing
    • false: Plain values, more efficient when timestamps not needed
  • versioned:

    • true: Maintains multiple versions of values with configurable retention
    • false: Single version only, more memory efficient

KSML Store Optimizations:

KSML automatically optimizes store configuration:

  • Uses efficient Serde types based on configured data types
  • Applies proper supplier selection (persistent vs. in-memory)
  • Configures caching and logging based on specified options
  • Handles timestamped and versioned variants automatically

Python Function Best Practices

Optimize Code Structure

functions:
  optimized_function:
    type: valueTransformer
    globalCode: |
      # Pre-compute expensive operations
      MULTIPLIERS = {"high": 3.0, "medium": 2.0, "low": 1.0}

      def efficient_calculation(value, priority):
        return value * MULTIPLIERS.get(priority, 1.0)

    code: |
      # Use pre-computed values and avoid object creation
      priority = value.get("priority", "low")
      result = efficient_calculation(value.get("amount", 0), priority)
      return f"processed:{result:.2f}"

Key techniques:

  • Use globalCode: Pre-compute expensive operations outside the processing loop
  • Minimize object creation: Reuse objects and data structures when possible
  • Use built-in functions: They're typically faster than custom implementations
  • Avoid unnecessary computations: Compute values only when needed
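
As a rough illustration of the globalCode technique, the sketch below runs the same logic as plain Python outside KSML. The module-level code plays the role of globalCode, and `handle` is a hypothetical stand-in for the `code` block that KSML would invoke once per record; neither the name nor the calling convention is part of KSML itself.

```python
# Module-level code plays the role of globalCode: it runs once at load time.
MULTIPLIERS = {"high": 3.0, "medium": 2.0, "low": 1.0}

def efficient_calculation(amount, priority):
    # A dictionary lookup with a default avoids rebuilding data per message
    return amount * MULTIPLIERS.get(priority, 1.0)

def handle(value):
    # Hypothetical stand-in for the KSML `code` block (called once per record)
    priority = value.get("priority", "low")
    result = efficient_calculation(value.get("amount", 0), priority)
    return f"processed:{result:.2f}"

print(handle({"priority": "high", "amount": 10}))  # processed:30.00
print(handle({"amount": 2.5}))                      # processed:2.50
```

Because `MULTIPLIERS` is built once, each call to `handle` only performs lookups and arithmetic.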

Efficient Data Structures

Choose optimal data structures based on KSML's data type system and your use case:

KSML Data Types (Performance Ranking)

# Fastest - Use for high-throughput scenarios
keyType: string         # Most efficient for keys
valueType: string       # Fastest serialization

# Fast - Good balance of performance and functionality  
keyType: long          # Efficient numeric keys
valueType: avro:Schema # Efficient binary format with schema

# Moderate - Flexible but more overhead
keyType: json          # Flexible structure  
valueType: json        # Easy to work with, good for Kowl UI

# Slower - Use only when necessary
valueType: xml         # XML processing overhead
valueType: soap        # SOAP envelope overhead

Python Data Structure Guidelines

Optimal patterns for different scenarios:

  1. Fast counters/accumulators

    count = int(store.get(key) or "0")  # String storage, int operations
    count += 1
    store.put(key, str(count))
    

  2. Compact State Representation

    # Instead of JSON: {"count": 5, "sum": 100, "avg": 20}
    compact_state = f"{count}:{total_sum}:{average}"  # String format
    store.put(key, compact_state)
    

  3. Efficient Collections (limit size)

    # Requires "import json" (for example in globalCode)
    items = json.loads(store.get(key) or "[]")
    items.append(new_item)
    items = items[-100:]  # Keep only last 100 items
    store.put(key, json.dumps(items))
    

  4. Fast Lookup Tables (use globalCode)

    STATUS_CODES = {1: "active", 2: "inactive", 3: "pending"}  # Pre-computed
    status = STATUS_CODES.get(status_id, "unknown")  # O(1) lookup
    

  5. Efficient String Operations

    # Avoid (the gain is small and runtime-dependent, so benchmark before changing code):
    # result = f"processed:{event_type}:{user}:{score}"
    # Use this instead:
    result = "processed:" + event_type + ":" + user + ":" + str(score)
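
The counter and compact-state patterns (1 and 2) can be tried outside KSML with a small stand-in for the state store. This is only a sketch: `FakeStore`, `increment`, and `update_stats` are hypothetical names, the store is a plain dictionary holding string values, and amounts are assumed to be integers. It also stores only `count:sum`, deriving the average on read, which is one possible variation of the compact format.

```python
class FakeStore:
    """Hypothetical in-memory stand-in for a key-value store with string values."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)  # None when the key is absent

    def put(self, key, value):
        self._data[key] = value


def increment(store, key):
    # Pattern 1: string storage, int arithmetic
    count = int(store.get(key) or "0") + 1
    store.put(key, str(count))
    return count


def update_stats(store, key, amount):
    # Pattern 2: compact "count:sum" state; the average is derived on read
    count_s, sum_s = (store.get(key) or "0:0").split(":")
    count, total = int(count_s) + 1, int(sum_s) + amount
    store.put(key, f"{count}:{total}")
    return count, total, total / count


store = FakeStore()
increment(store, "clicks")
increment(store, "clicks")
update_stats(store, "orders", 40)
print(store.get("clicks"))                # 2
print(update_stats(store, "orders", 60))  # (2, 100, 50.0)
print(store.get("orders"))                # 2:100
```

Deriving the average instead of storing it keeps the state smaller and avoids the stored fields drifting out of sync.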
    

Memory-Efficient Patterns

Optimal memory usage techniques:

  1. Reuse Objects

    result = {"status": "ok", "count": 0}  # Create once in globalCode
    result["count"] = new_count            # Reuse, don't recreate
    

  2. Generator Patterns

    def process_batch(items):
        for item in items:  # Memory efficient iteration
            yield process_item(item)
    

  3. Lazy Evaluation

    expensive_result = None
    if condition_needs_it:  # Only compute when needed
        expensive_result = expensive_calculation()
    

  4. Compact Data Types

    # use this:
    timestamp = int(time.time())      # 4-8 bytes when serialized as int/long
    # instead of this:
    timestamp = str(time.time())      # ~20 bytes + overhead
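
To see why the generator pattern saves work as well as memory, the following sketch counts how many items are actually processed when only a few results are consumed. `process_item` is a hypothetical transformation, and the `seen` list exists only to make the laziness observable.

```python
import itertools

seen = []  # records which items were actually processed

def process_item(item):
    # Hypothetical per-item transformation
    seen.append(item)
    return item * 2

def process_batch(items):
    for item in items:
        yield process_item(item)  # produces one result at a time

# Only the first three results are requested from a large input
first_three = list(itertools.islice(process_batch(range(1_000_000)), 3))
print(first_three)  # [0, 2, 4]
print(len(seen))    # 3
```

A list-based version would process all one million items before returning anything; the generator stops as soon as the consumer does.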
    

KSML-Specific Optimizations:

  • Serde Selection: KSML automatically chooses optimal serializers based on data type
  • Union Types: For flexible schemas, KSML uses an efficient UnionSerde implementation
  • Type Flattening: Complex types are automatically flattened for performance
  • Caching: Serde instances are cached and reused across operations

Data Type Performance Comparison:

Data Type              | Serialization Speed | Size Efficiency | Schema Evolution | Use Case
-----------------------|---------------------|-----------------|------------------|--------------------------------------
string                 | Fastest             | Good            | Limited          | Simple values, IDs, compact formats
long/int               | Fastest             | Excellent       | None             | Counters, timestamps, numeric keys
avro                   | Fast                | Excellent       | Excellent        | Complex schemas, production systems
json                   | Moderate            | Good            | Good             | Development, debugging, flexible data
protobuf (coming soon) | Fast                | Excellent       | Good             | High-performance, cross-language
xml/soap               | Slow                | Poor            | Limited          | Legacy systems, specific protocols
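
For a rough sense of the size difference between JSON and a compact string representation, you can compare encoded lengths in plain Python. This sketch measures payload size only, not serialization speed, and it does not use KSML's own serializers; `parse_compact` is a hypothetical helper showing that the compact form can still be decoded.

```python
import json

state = {"count": 5, "sum": 100, "avg": 20}

# Two encodings of the same state
as_json = json.dumps(state)
as_compact = f"{state['count']}:{state['sum']}:{state['avg']}"

def parse_compact(text):
    # Field order is fixed by convention: count, sum, avg
    count, total, avg = (int(part) for part in text.split(":"))
    return {"count": count, "sum": total, "avg": avg}

print(as_json, len(as_json.encode("utf-8")))
print(as_compact, len(as_compact.encode("utf-8")))
print(parse_compact(as_compact) == state)
```

The compact form drops field names and punctuation, at the cost of a fixed field order that every reader and writer must agree on.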

Monitoring Performance

Key Metrics to Track

Monitor these metrics to identify performance issues:

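functions:
  performance_monitor:
    type: forEach
    globalCode: |
      import time  # needed for time.time() below

    code: |
      # Record processing time
      start_time = time.time()
      process_message(key, value)  # placeholder for your own processing logic
      processing_time = (time.time() - start_time) * 1000  # milliseconds

      # Log only slow operations to limit logging overhead
      if processing_time > 100:
        # Assumes SLF4J-style "{}" placeholders, so round in Python instead of using "{:.2f}"
        log.warn("Slow processing detected: {}ms for key {}", round(processing_time, 2), key)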

Important metrics:

  • Processing latency: Time to process individual messages
  • Throughput: Messages processed per second
  • Memory usage: JVM heap utilization
  • State store size: Growth rate and total size
  • Error rates: Failed processing attempts
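
Logging every slow message can itself add overhead, so one option is to aggregate latency in memory and report periodically. The sketch below is a minimal, hypothetical helper (`LatencyTracker` is not part of KSML) that could live in globalCode; it tracks message count, average latency, and maximum latency, and returns a summary every `report_every` messages.

```python
class LatencyTracker:
    """Hypothetical helper: accumulates per-message latency and reports aggregates."""

    def __init__(self, report_every=1000):
        self.report_every = report_every
        self.count = 0
        self.total_ms = 0.0
        self.max_ms = 0.0

    def record(self, elapsed_ms):
        # Update running totals; return a summary only on reporting boundaries
        self.count += 1
        self.total_ms += elapsed_ms
        self.max_ms = max(self.max_ms, elapsed_ms)
        if self.count % self.report_every == 0:
            return self.summary()
        return None

    def summary(self):
        return {
            "messages": self.count,
            "avg_ms": self.total_ms / self.count,
            "max_ms": self.max_ms,
        }


tracker = LatencyTracker(report_every=3)
for elapsed in (2.0, 4.0, 6.0):
    report = tracker.record(elapsed)
print(report)  # {'messages': 3, 'avg_ms': 4.0, 'max_ms': 6.0}
```

In a pipeline, the returned summary would be logged only when it is not `None`, which keeps log volume proportional to the reporting interval rather than to message volume.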

Best Practices Summary

Do's

  • Measure first: Establish baselines before optimizing
  • Filter early: Remove unwanted data as soon as possible
  • Use caching: Configure appropriate cache sizes for state stores
  • Optimize hot paths: Focus on frequently executed code
  • Pre-compute values: Move expensive operations to globalCode
  • Choose efficient formats: Use compact data representations

Don'ts

  • Don't over-optimize: Focus on actual bottlenecks, not theoretical ones
  • Don't ignore trade-offs: Balance throughput, latency, and resource usage
  • Don't optimize without measuring: Always measure the impact of changes
  • Don't create unnecessary objects: Reuse data structures when possible
  • Don't log excessively: Reduce I/O overhead from logging
  • Don't use complex serialization: Avoid JSON when simple formats suffice

Conclusion

Performance optimization in KSML involves a combination of efficient Python code, optimized configuration, and smart pipeline design. By applying these techniques systematically and measuring their impact, you can build KSML applications that efficiently process high volumes of data with consistent performance.

The key is to start with measurement, identify actual bottlenecks, and apply optimizations incrementally while monitoring their effectiveness.