
Branching in KSML: Conditional Message Routing

What is Branching?

Branching in KSML allows you to split a stream into multiple paths based on conditions. Each branch can apply different transformations and route messages to different output topics. This is essential for building sophisticated data pipelines that need to handle different types of data or route messages based on business rules.

Prerequisites

Before starting this tutorial:

Topic creation commands:
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic datacenter_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic warehouse_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic office_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic unknown_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic priority_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic regional_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic international_orders

Relationship to Kafka Streams

KSML branching is built on top of Kafka Streams' BranchedKStream functionality. When you define a branch section in KSML:

  1. KSML generates Kafka Streams code that calls KStream.split() to create a BranchedKStream
  2. Each branch condition becomes a predicate function passed to BranchedKStream.branch()
  3. Branch order matters - messages are evaluated against conditions in the order they're defined
  4. Default branches handle messages that don't match any specific condition

This abstraction allows you to define complex routing logic in YAML without writing Java code.
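
The evaluation order described above can be modelled in a few lines of plain Python. This is only an illustrative sketch of first-match routing, not KSML or Kafka Streams code; the `route` helper and the `Branch` alias are hypothetical names introduced for the example.

```python
from typing import Callable, List, Optional, Tuple

# A branch is (predicate, topic). A predicate of None marks the default branch.
Branch = Tuple[Optional[Callable[[dict], bool]], str]

def route(value: dict, branches: List[Branch]) -> Optional[str]:
    """Return the topic of the first branch that matches, or None if none does."""
    for predicate, topic in branches:
        if predicate is None or predicate(value):
            return topic
    return None  # no predicate matched and no default branch was defined

branches = [
    (lambda v: v.get("location") == "data_center", "datacenter_sensors"),
    (lambda v: v.get("location") == "warehouse", "warehouse_sensors"),
    (None, "unknown_sensors"),  # default branch
]

print(route({"location": "warehouse"}, branches))    # warehouse_sensors
print(route({"location": "factory"}, branches))      # unknown_sensors
print(route({"location": "factory"}, branches[:2]))  # None
```

The last call drops the default branch from the list. In Kafka Streams, a record that matches no predicate and has no default branch is dropped, which is what the `None` result stands for here.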

Basic Branching Syntax

The basic structure of branching in KSML:

pipelines:
  my_pipeline:
    from: input_stream
    via:
      # Optional transformations before branching
      - type: mapValues
        mapper: some_transformer
    branch:
      # Branch 1: Messages matching a condition
      - if: condition_predicate
        via:
          # Optional transformations for this branch
          - type: mapValues
            mapper: branch1_transformer
        to: output_topic_1

      # Branch 2: Another condition
      - if: another_predicate
        to: output_topic_2

      # Default branch: Everything else
      - to: default_output_topic

Example 1: Simple Content-Based Routing

Let's start with a basic example that routes sensor data based on location.

Producer Definition

Sensor Data Producer
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# Producer for branching tutorial - generates sensor data from different locations

functions:
  generate_location_sensors:
    type: generator
    globalCode: |
      import random
      counter = 0
      locations = ["data_center", "warehouse", "office", "factory", "unknown_site"]
    code: |
      global counter, locations
      counter += 1

      # Generate sensor data with different locations
      sensor_data = {
        "sensor_id": f"sensor_{counter:03d}",
        "location": random.choice(locations),
        "temperature": round(random.uniform(15.0, 35.0), 1),
        "humidity": random.randint(30, 80),
        "timestamp": counter * 1000
      }

      key = f"sensor_{counter:03d}"
    expression: (key, sensor_data)
    resultType: (string, json)

producers:
  location_sensor_producer:
    generator: generate_location_sensors
    interval: 2s
    to:
      topic: sensor_input
      keyType: string
      valueType: json

Processor Definition

Location-Based Routing Processor
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# Simple location-based routing example demonstrating basic branching

streams:
  sensor_input:
    topic: sensor_input
    keyType: string
    valueType: json

  datacenter_sensors:
    topic: datacenter_sensors
    keyType: string
    valueType: json

  warehouse_sensors:
    topic: warehouse_sensors
    keyType: string
    valueType: json

  office_sensors:
    topic: office_sensors
    keyType: string
    valueType: json

  unknown_sensors:
    topic: unknown_sensors
    keyType: string
    valueType: json

functions:
  is_datacenter:
    type: predicate
    expression: value.get("location") == "data_center"

  is_warehouse:
    type: predicate
    expression: value.get("location") == "warehouse"

  is_office:
    type: predicate
    expression: value.get("location") == "office"

pipelines:
  location_routing:
    from: sensor_input
    via:
      - type: peek
        forEach:
          code: |
            log.info("Processing sensor from: {}", value.get("location"))
    branch:
      - if: is_datacenter
        via:
          - type: peek
            forEach:
              code: |
                log.info("Routing data center sensor: {}", key)
        to: datacenter_sensors

      - if: is_warehouse
        via:
          - type: peek
            forEach:
              code: |
                log.info("Routing warehouse sensor: {}", key)
        to: warehouse_sensors

      - if: is_office
        via:
          - type: peek
            forEach:
              code: |
                log.info("Routing office sensor: {}", key)
        to: office_sensors

      # Default branch for unknown locations
      - via:
          - type: peek
            forEach:
              code: |
                log.warn("Unknown location sensor: {} from {}", key, value.get("location"))
        to: unknown_sensors

This example demonstrates:

  • Simple condition matching: Routes messages based on sensor location
  • Multiple branches: Separate paths for data center, warehouse, and office sensors
  • Default branch: Handles sensors from unknown locations
  • Logging: Each branch logs messages for monitoring

Expected Behavior:

  • Messages from "data_center" are routed to datacenter_sensors topic
  • Messages from "warehouse" are routed to warehouse_sensors topic
  • Messages from "office" are routed to office_sensors topic
  • All other locations are routed to unknown_sensors topic

Example 2: Multi-Condition Data Processing Pipeline

This example shows more complex branching with data transformation and business logic.

Producer Definition

Order Events Producer
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# Producer for order processing branching example

functions:
  generate_orders:
    type: generator
    globalCode: |
      import random
      counter = 0
      regions = ["US", "EU", "APAC", "LATAM"]
      customer_types = ["premium", "standard", "basic"]
      products = ["laptop", "phone", "tablet", "monitor", "keyboard"]
    code: |
      global counter, regions, customer_types, products
      counter += 1

      # Generate diverse order data for branching examples
      order = {
        "order_id": f"order_{counter:04d}",
        "customer_id": f"customer_{random.randint(1, 100):03d}",
        "customer_type": random.choice(customer_types),
        "product": random.choice(products),
        "quantity": random.randint(1, 5),
        "unit_price": round(random.uniform(50.0, 2000.0), 2),
        "region": random.choice(regions),
        "timestamp": counter * 1000
      }

      # Calculate total
      order["total_amount"] = round(order["quantity"] * order["unit_price"], 2)

      # Add some order-specific flags
      if order["total_amount"] > 1000:
        order["high_value"] = True

      if order["region"] not in ["US", "EU"]:
        order["international"] = True

      key = f"order_{counter:04d}"
    expression: (key, order)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_orders
    interval: 3s
    to:
      topic: order_input
      keyType: string
      valueType: json

Processor Definition

Order Processing Pipeline
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json

# Complex order processing pipeline demonstrating multi-condition branching

streams:
  order_input:
    topic: order_input
    keyType: string
    valueType: json

  priority_orders:
    topic: priority_orders
    keyType: string
    valueType: json

  regional_orders:
    topic: regional_orders
    keyType: string
    valueType: json

  international_orders:
    topic: international_orders
    keyType: string
    valueType: json

functions:
  is_priority:
    type: predicate
    code: |
      # Priority: Premium customers with high-value orders
      return (value.get("customer_type") == "premium" and 
              value.get("total_amount", 0) > 1000)

  is_regional:
    type: predicate
    code: |
      # Regional: US/EU orders that aren't priority
      return (value.get("region") in ["US", "EU"] and 
              not (value.get("customer_type") == "premium" and value.get("total_amount", 0) > 1000))

  is_international:
    type: predicate
    code: |
      # International: Non-US/EU regions
      return value.get("region") not in ["US", "EU"]

  add_priority_processing:
    type: valueTransformer
    code: |
      # Add priority processing metadata
      value["processing_tier"] = "priority"
      value["sla_hours"] = 4
      value["processed_at"] = "2025-01-01T00:00:00Z"
    expression: value
    resultType: json

  add_regional_processing:
    type: valueTransformer
    code: |
      # Add regional processing metadata
      value["processing_tier"] = "regional" 
      value["sla_hours"] = 24
      value["processed_at"] = "2025-01-01T00:00:00Z"
    expression: value
    resultType: json

  add_international_processing:
    type: valueTransformer
    code: |
      # Add international processing metadata
      value["processing_tier"] = "international"
      value["sla_hours"] = 72
      value["customs_required"] = True
      value["processed_at"] = "2025-01-01T00:00:00Z"
    expression: value
    resultType: json

pipelines:
  order_processing:
    from: order_input
    via:
      - type: peek
        forEach:
          code: |
            log.info("Processing order: {} (${} from {} customer in {})", 
                   value.get("order_id"), 
                   value.get("total_amount"),
                   value.get("customer_type"), 
                   value.get("region"))
    branch:
      # Branch 1: Priority orders (premium customers, high value)
      - if: is_priority
        via:
          - type: mapValues
            mapper: add_priority_processing
          - type: peek
            forEach:
              code: |
                log.info("PRIORITY: Order {} - ${} premium order", 
                       value.get("order_id"), value.get("total_amount"))
        to: priority_orders

      # Branch 2: Regional orders (US/EU, not priority)
      - if: is_regional
        via:
          - type: mapValues
            mapper: add_regional_processing
          - type: peek
            forEach:
              code: |
                log.info("REGIONAL: Order {} from {}", 
                       value.get("order_id"), value.get("region"))
        to: regional_orders

      # Branch 3: International orders  
      - if: is_international
        via:
          - type: mapValues
            mapper: add_international_processing
          - type: peek
            forEach:
              code: |
                log.info("INTERNATIONAL: Order {} from {} (customs required)", 
                       value.get("order_id"), value.get("region"))
        to: international_orders

This example demonstrates:

  • Complex conditions: Multiple criteria (order value, customer type, region)
  • Transformations per branch: Different processing logic for each order type
  • Business rules: Priority routing based on customer tier and order value
  • Data enrichment: Adding a processing tier, SLA hours, and a processed_at field to each order

Expected Behavior:

  • High-value orders (>$1000) from premium customers are routed to priority_orders topic
  • US/EU orders (not priority) are routed to regional_orders topic
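  • Remaining orders from outside the US/EU (APAC/LATAM) are routed to international_orders topic; a high-value premium order from these regions matches the priority branch first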
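
This pipeline has no default branch, so it is worth checking that the three predicates together cover every order the producer can generate. The sketch below copies the predicate logic into plain Python functions and enumerates the combinations; the `target_topic` helper and the sample amounts are assumptions made for the check, not part of the KSML definition.

```python
from itertools import product

def is_priority(value):
    return value.get("customer_type") == "premium" and value.get("total_amount", 0) > 1000

def is_regional(value):
    return value.get("region") in ["US", "EU"] and not is_priority(value)

def is_international(value):
    return value.get("region") not in ["US", "EU"]

def target_topic(value):
    # Same order as the branch list in the processor definition
    if is_priority(value):
        return "priority_orders"
    if is_regional(value):
        return "regional_orders"
    if is_international(value):
        return "international_orders"
    return None  # would match no branch at all

regions = ["US", "EU", "APAC", "LATAM"]
customer_types = ["premium", "standard", "basic"]
amounts = [500.0, 1000.0, 1500.0]  # below, at, and above the 1000 threshold

orders = [
    {"region": r, "customer_type": c, "total_amount": a}
    for r, c, a in product(regions, customer_types, amounts)
]
unrouted = [o for o in orders if target_topic(o) is None]
print(len(orders), len(unrouted))  # 36 0

premium_apac = {"region": "APAC", "customer_type": "premium", "total_amount": 1500.0}
print(target_topic(premium_apac))  # priority_orders
```

The second result shows a consequence of branch order: a high-value premium order from APAC is handled by the priority branch, so it does not receive the `customs_required` flag that the international branch adds.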

Advanced Branching Patterns

Nested Branching

You can create nested branching by having branches that contain their own branching logic:

pipelines:
  nested_example:
    from: input_stream
    branch:
      - if: 
          expression: value.get("category") == "electronics"
        via:
          - type: transformValue
            mapper: enrich_electronics
        branch:
          - if:
              expression: value.get("price") > 500
            to: expensive_electronics
          - to: affordable_electronics

      - if:
          expression: value.get("category") == "clothing" 
        to: clothing_items

Multiple Output Topics

A single branch can route to multiple topics:

branch:
  - if: urgent_condition
    to:
      - urgent_processing
      - audit_log
      - notifications

Branch with Complex Via Operations

Branches can contain multiple transformation steps:

branch:
  - if: needs_processing
    via:
      - type: filter
        if: additional_validation
      - type: transformValue
        mapper: enrich_data
      - type: mapKey
        mapper: generate_new_key
      - type: peek
        forEach:
          code: |
            log.info("Processed: {}", value)
    to: processed_output

Error Handling in Branching

Branching is commonly used for error handling patterns:

functions:
  is_valid:
    type: predicate
    code: |
      try:
        # Validation logic
        required_fields = ["id", "timestamp", "data"]
        return all(field in value for field in required_fields)
      except Exception:
        return False

  is_error:
    type: predicate
    expression: "'error' in value or value.get('status') == 'failed'"

pipelines:
  error_handling:
    from: input_stream
    via:
      - type: mapValues
        mapper: safe_processor  # This adds error info if processing fails
    branch:
      # Route errors to dead letter queue
      - if: is_error
        to: error_topic

      # Route valid data for further processing  
      - if: is_valid
        to: processed_data

      # Route everything else to manual review
      - to: manual_review
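
The pipeline above refers to `safe_processor` without defining it. As a rough sketch of what such a function might do, here is a plain-Python version; the `process` function and the exact error fields are hypothetical, chosen only so that the `is_error` predicate would match a failed message. In KSML the body would live inside a `valueTransformer` function.

```python
def process(value):
    """Hypothetical business logic; raises if the payload cannot be handled."""
    result = dict(value)
    result["data"] = str(value["data"]).strip().upper()
    return result

def safe_processor(value):
    """Never raise: return the processed value, or the input annotated with an error."""
    try:
        return process(value)
    except Exception as exc:
        # Keep the original payload so it can be inspected downstream
        failed = dict(value) if isinstance(value, dict) else {"raw": value}
        failed["error"] = f"{type(exc).__name__}: {exc}"
        failed["status"] = "failed"
        return failed

ok = safe_processor({"id": 1, "timestamp": 1000, "data": " reading "})
bad = safe_processor({"id": 2, "timestamp": 2000})  # "data" is missing

print("error" in ok)  # False
print(bad["status"])  # failed
```

Because the failure is recorded in the message instead of being raised, the `is_error` branch can send the message to `error_topic` while the stream keeps running.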

Best Practices

1. Order Matters

Branches are evaluated in the order they're defined. Place more specific conditions first:

branch:
  # Specific condition first
  - if:
      expression: value.get("priority") == "urgent" and value.get("amount") > 10000
    to: urgent_high_value

  # General condition second  
  - if:
      expression: value.get("priority") == "urgent"
    to: urgent_orders

  # Default branch last
  - to: standard_orders
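
To see why the specific condition has to come first, the following plain-Python sketch evaluates the same two conditions in both orders. The `first_match` helper is a hypothetical stand-in for the branch evaluation, not a KSML function.

```python
def first_match(value, branches, default="standard_orders"):
    """Return the topic of the first (predicate, topic) pair that matches."""
    for predicate, topic in branches:
        if predicate(value):
            return topic
    return default

def is_urgent(value):
    return value.get("priority") == "urgent"

def is_urgent_high_value(value):
    return is_urgent(value) and value.get("amount", 0) > 10000

specific_first = [(is_urgent_high_value, "urgent_high_value"), (is_urgent, "urgent_orders")]
general_first = [(is_urgent, "urgent_orders"), (is_urgent_high_value, "urgent_high_value")]

order = {"priority": "urgent", "amount": 25000}
print(first_match(order, specific_first))  # urgent_high_value
print(first_match(order, general_first))   # urgent_orders
```

With the general condition first, the `urgent_high_value` branch can never receive a message, because every message it would accept has already been claimed by `urgent_orders`.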

2. Use Meaningful Branch Names

When working with complex branching, use the name attribute for clarity:

branch:
  - name: high_priority_branch
    if: high_priority_predicate
    to: priority_queue

3. Keep Conditions Simple

Complex logic should be in predicate functions, not inline expressions:

functions:
  is_complex_condition:
    type: predicate
    code: |
      # Complex business logic here
      return (value.get("score") > 85 and 
              value.get("region") in ["US", "EU"] and
              len(value.get("tags", [])) > 2)

branch:
  - if: is_complex_condition  # Clean and readable
    to: qualified_items

4. Handle All Cases

Always ensure messages have somewhere to go:

branch:
  - if: condition_a
    to: topic_a
  - if: condition_b  
    to: topic_b
  # Always include a default case
  - to: default_topic

5. Add Monitoring

Use peek operations to monitor branch behavior:

branch:
  - if: important_condition
    via:
      - type: peek
        forEach:
          code: |
            log.info("Important message routed: {}", key)
    to: important_topic

Performance Considerations

Predicate Function Performance

  • Keep predicate functions lightweight
  • Avoid expensive operations in conditions
  • Cache results when possible

Branch Count

  • Conditions are checked in order until one matches, so a long branch list adds work for every message
  • Consider using lookup tables for many conditions
  • Group similar conditions when possible
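
As an illustration of the lookup-table idea, the sketch below replaces a chain of per-location comparisons with a single dictionary lookup, using the topic names from Example 1. `TOPIC_BY_LOCATION` and `topic_for` are hypothetical names, and how such a function would be wired into a KSML pipeline is outside the scope of this sketch.

```python
# One dictionary lookup instead of one predicate per location
TOPIC_BY_LOCATION = {
    "data_center": "datacenter_sensors",
    "warehouse": "warehouse_sensors",
    "office": "office_sensors",
}

def topic_for(value, default="unknown_sensors"):
    """Return the target topic for a sensor message, or the default topic."""
    location = value.get("location") if isinstance(value, dict) else None
    return TOPIC_BY_LOCATION.get(location, default)

print(topic_for({"location": "office"}))   # office_sensors
print(topic_for({"location": "factory"}))  # unknown_sensors
```

Adding a location then means adding one dictionary entry, and the cost per message stays the same however many locations are listed.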

Memory Usage

  • Each branch creates a separate stream processing path
  • Monitor memory usage with many concurrent branches

Conclusion

Branching in KSML provides powerful conditional routing capabilities that map directly to Kafka Streams' branching functionality. It enables you to:

  • Route messages based on content, business rules, or data quality
  • Apply different transformations to different types of data
  • Implement error handling patterns like dead letter queues
  • Build complex pipelines with multiple processing paths

Next Steps