Branching in KSML: Conditional Message Routing
What is Branching?
Branching in KSML allows you to split a stream into multiple paths based on conditions. Each branch can apply different transformations and route messages to different output topics. This is essential for building sophisticated data pipelines that need to handle different types of data or route messages based on business rules.
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands:
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic datacenter_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic warehouse_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic office_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic unknown_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic priority_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic regional_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic international_orders
Relationship to Kafka Streams
KSML branching is built on top of Kafka Streams' BranchedKStream functionality. When you define a branch section in KSML:
- KSML generates Kafka Streams code that calls KStream.split() to create a BranchedKStream
- Each branch condition becomes a predicate function passed to BranchedKStream.branch()
- Branch order matters - messages are evaluated against conditions in the order they're defined
- Default branches handle messages that don't match any specific condition
This abstraction allows you to define complex routing logic in YAML without writing Java code.
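To make the mapping concrete, the minimal fragment below (stream, predicate, and topic names are placeholders) is annotated with the Kafka Streams construct each element roughly corresponds to:
pipelines:
  routing_example:
    from: input_stream            # the KStream that split() is called on
    branch:
      - if: matches_condition     # predicate passed to BranchedKStream.branch()
        to: matched_topic         # sink for messages that satisfy the predicate
      - to: everything_else       # default branch for unmatched messages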
Basic Branching Syntax
The basic structure of branching in KSML:
pipelines:
my_pipeline:
from: input_stream
via:
# Optional transformations before branching
- type: mapValues
mapper: some_transformer
branch:
# Branch 1: Messages matching a condition
- if: condition_predicate
via:
# Optional transformations for this branch
- type: mapValues
mapper: branch1_transformer
to: output_topic_1
# Branch 2: Another condition
- if: another_predicate
to: output_topic_2
# Default branch: Everything else
- to: default_output_topic
Example 1: Simple Content-Based Routing
Let's start with a basic example that routes sensor data based on location.
Producer Definition
Sensor Data Producer:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# Producer for branching tutorial - generates sensor data from different locations
functions:
generate_location_sensors:
type: generator
globalCode: |
import random
counter = 0
locations = ["data_center", "warehouse", "office", "factory", "unknown_site"]
code: |
global counter, locations
counter += 1
# Generate sensor data with different locations
sensor_data = {
"sensor_id": f"sensor_{counter:03d}",
"location": random.choice(locations),
"temperature": round(random.uniform(15.0, 35.0), 1),
"humidity": random.randint(30, 80),
"timestamp": counter * 1000
}
key = f"sensor_{counter:03d}"
expression: (key, sensor_data)
resultType: (string, json)
producers:
location_sensor_producer:
generator: generate_location_sensors
interval: 2s
to:
topic: sensor_input
keyType: string
valueType: json
Processor Definition
Location-Based Routing Processor:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# Simple location-based routing example demonstrating basic branching
streams:
sensor_input:
topic: sensor_input
keyType: string
valueType: json
datacenter_sensors:
topic: datacenter_sensors
keyType: string
valueType: json
warehouse_sensors:
topic: warehouse_sensors
keyType: string
valueType: json
office_sensors:
topic: office_sensors
keyType: string
valueType: json
unknown_sensors:
topic: unknown_sensors
keyType: string
valueType: json
functions:
is_datacenter:
type: predicate
expression: value.get("location") == "data_center"
is_warehouse:
type: predicate
expression: value.get("location") == "warehouse"
is_office:
type: predicate
expression: value.get("location") == "office"
pipelines:
location_routing:
from: sensor_input
via:
- type: peek
forEach:
code: |
log.info("Processing sensor from: {}", value.get("location"))
branch:
- if: is_datacenter
via:
- type: peek
forEach:
code: |
log.info("Routing data center sensor: {}", key)
to: datacenter_sensors
- if: is_warehouse
via:
- type: peek
forEach:
code: |
log.info("Routing warehouse sensor: {}", key)
to: warehouse_sensors
- if: is_office
via:
- type: peek
forEach:
code: |
log.info("Routing office sensor: {}", key)
to: office_sensors
# Default branch for unknown locations
- via:
- type: peek
forEach:
code: |
log.warn("Unknown location sensor: {} from {}", key, value.get("location"))
to: unknown_sensors
This example demonstrates:
- Simple condition matching: Routes messages based on sensor location
- Multiple branches: Separate paths for data center, warehouse, and office sensors
- Default branch: Handles sensors from unknown locations
- Logging: Each branch logs messages for monitoring
Expected Behavior:
- Messages from "data_center" are routed to the datacenter_sensors topic
- Messages from "warehouse" are routed to the warehouse_sensors topic
- Messages from "office" are routed to the office_sensors topic
- All other locations are routed to the unknown_sensors topic
Example 2: Multi-Condition Data Processing Pipeline
This example shows more complex branching with data transformation and business logic.
Producer Definition
Order Events Producer:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# Producer for order processing branching example
functions:
generate_orders:
type: generator
globalCode: |
import random
counter = 0
regions = ["US", "EU", "APAC", "LATAM"]
customer_types = ["premium", "standard", "basic"]
products = ["laptop", "phone", "tablet", "monitor", "keyboard"]
code: |
global counter, regions, customer_types, products
counter += 1
# Generate diverse order data for branching examples
order = {
"order_id": f"order_{counter:04d}",
"customer_id": f"customer_{random.randint(1, 100):03d}",
"customer_type": random.choice(customer_types),
"product": random.choice(products),
"quantity": random.randint(1, 5),
"unit_price": round(random.uniform(50.0, 2000.0), 2),
"region": random.choice(regions),
"timestamp": counter * 1000
}
# Calculate total
order["total_amount"] = round(order["quantity"] * order["unit_price"], 2)
# Add some order-specific flags
if order["total_amount"] > 1000:
order["high_value"] = True
if order["region"] not in ["US", "EU"]:
order["international"] = True
key = f"order_{counter:04d}"
expression: (key, order)
resultType: (string, json)
producers:
order_producer:
generator: generate_orders
interval: 3s
to:
topic: order_input
keyType: string
valueType: json
Processor Definition
Order Processing Pipeline:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
# Complex order processing pipeline demonstrating multi-condition branching
streams:
order_input:
topic: order_input
keyType: string
valueType: json
priority_orders:
topic: priority_orders
keyType: string
valueType: json
regional_orders:
topic: regional_orders
keyType: string
valueType: json
international_orders:
topic: international_orders
keyType: string
valueType: json
functions:
is_priority:
type: predicate
code: |
# Priority: Premium customers with high-value orders
return (value.get("customer_type") == "premium" and
value.get("total_amount", 0) > 1000)
is_regional:
type: predicate
code: |
# Regional: US/EU orders that aren't priority
return (value.get("region") in ["US", "EU"] and
not (value.get("customer_type") == "premium" and value.get("total_amount", 0) > 1000))
is_international:
type: predicate
code: |
# International: Non-US/EU regions
return value.get("region") not in ["US", "EU"]
add_priority_processing:
type: valueTransformer
code: |
# Add priority processing metadata
value["processing_tier"] = "priority"
value["sla_hours"] = 4
value["processed_at"] = "2025-01-01T00:00:00Z"
expression: value
resultType: json
add_regional_processing:
type: valueTransformer
code: |
# Add regional processing metadata
value["processing_tier"] = "regional"
value["sla_hours"] = 24
value["processed_at"] = "2025-01-01T00:00:00Z"
expression: value
resultType: json
add_international_processing:
type: valueTransformer
code: |
# Add international processing metadata
value["processing_tier"] = "international"
value["sla_hours"] = 72
value["customs_required"] = True
value["processed_at"] = "2025-01-01T00:00:00Z"
expression: value
resultType: json
pipelines:
order_processing:
from: order_input
via:
- type: peek
forEach:
code: |
log.info("Processing order: {} (${} from {} customer in {})",
value.get("order_id"),
value.get("total_amount"),
value.get("customer_type"),
value.get("region"))
branch:
# Branch 1: Priority orders (premium customers, high value)
- if: is_priority
via:
- type: mapValues
mapper: add_priority_processing
- type: peek
forEach:
code: |
log.info("PRIORITY: Order {} - ${} premium order",
value.get("order_id"), value.get("total_amount"))
to: priority_orders
# Branch 2: Regional orders (US/EU, not priority)
- if: is_regional
via:
- type: mapValues
mapper: add_regional_processing
- type: peek
forEach:
code: |
log.info("REGIONAL: Order {} from {}",
value.get("order_id"), value.get("region"))
to: regional_orders
# Branch 3: International orders
- if: is_international
via:
- type: mapValues
mapper: add_international_processing
- type: peek
forEach:
code: |
log.info("INTERNATIONAL: Order {} from {} (customs required)",
value.get("order_id"), value.get("region"))
to: international_orders
This example demonstrates:
- Complex conditions: Multiple criteria (order value, customer type, region)
- Transformations per branch: Different processing logic for each order type
- Business rules: Priority routing based on customer tier and order value
- Data enrichment: Adding processing timestamps and status fields
Expected Behavior:
- High-value orders (>$1000) from premium customers are routed to the priority_orders topic
- US/EU orders (not priority) are routed to the regional_orders topic
- International orders (APAC/LATAM) are routed to the international_orders topic
Advanced Branching Patterns
Nested Branching
You can nest branching by giving a branch its own branch: section inside its processing path:
pipelines:
nested_example:
from: input_stream
branch:
- if:
expression: value.get("category") == "electronics"
via:
- type: transformValue
mapper: enrich_electronics
branch:
- if:
expression: value.get("price") > 500
to: expensive_electronics
- to: affordable_electronics
- if:
expression: value.get("category") == "clothing"
to: clothing_items
Multiple Output Topics
A single branch can route to multiple topics:
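One way to do this is to compute the target topic per message inside the branch. The sketch below assumes KSML's topicNameExtractor function type and the toTopicNameExtractor sink operation are available in your version (verify both against the KSML language spec); the audit topic names are placeholders:
functions:
  route_by_order_value:
    type: topicNameExtractor
    # Placeholder topics - pick the output topic based on the order total
    expression: "'high_value_orders' if value.get('total_amount', 0) > 1000 else 'standard_orders'"
pipelines:
  dynamic_order_routing:
    from: order_input
    branch:
      - if: is_priority
        toTopicNameExtractor: route_by_order_value   # topic chosen per message
      - to: regional_orders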
Branch with Complex Via Operations
Branches can contain multiple transformation steps:
branch:
- if: needs_processing
via:
- type: filter
if: additional_validation
- type: transformValue
mapper: enrich_data
- type: mapKey
mapper: generate_new_key
- type: peek
forEach:
code: |
log.info("Processed: {}", value)
to: processed_output
Error Handling in Branching
Branching is commonly used for error handling patterns:
functions:
is_valid:
type: predicate
code: |
try:
# Validation logic
required_fields = ["id", "timestamp", "data"]
return all(field in value for field in required_fields)
except:
return False
is_error:
type: predicate
expression: "'error' in value or value.get('status') == 'failed'"
pipelines:
error_handling:
from: input_stream
via:
- type: mapValues
mapper: safe_processor # This adds error info if processing fails
branch:
# Route errors to dead letter queue
- if: is_error
to: error_topic
# Route valid data for further processing
- if: is_valid
to: processed_data
# Route everything else to manual review
- to: manual_review
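A minimal sketch of what the safe_processor mapper referenced above might look like (illustrative only; the amount field is a placeholder, but the error and status fields match the is_error predicate):
functions:
  safe_processor:
    type: valueTransformer
    code: |
      # Wrap the real processing in try/except and attach error info on failure
      try:
        value["parsed_amount"] = float(value.get("amount", 0))
        value["status"] = "ok"
      except Exception as e:
        value["error"] = str(e)
        value["status"] = "failed"
    expression: value
    resultType: json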
Best Practices
1. Order Matters
Branches are evaluated in the order they're defined. Place more specific conditions first:
branch:
# Specific condition first
- if:
expression: value.get("priority") == "urgent" and value.get("amount") > 10000
to: urgent_high_value
# General condition second
- if:
expression: value.get("priority") == "urgent"
to: urgent_orders
# Default branch last
- to: standard_orders
2. Use Meaningful Branch Names
When working with complex branching, use the name attribute for clarity:
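For example (a sketch; it assumes branch entries accept the same optional name attribute that other KSML operations do - check the language spec for your version):
branch:
  - name: priority_branch
    if: is_priority
    to: priority_orders
  - name: regional_branch
    if: is_regional
    to: regional_orders
  - name: fallback_branch
    to: manual_review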
3. Keep Conditions Simple
Complex logic should be in predicate functions, not inline expressions:
functions:
is_complex_condition:
type: predicate
code: |
# Complex business logic here
return (value.get("score") > 85 and
value.get("region") in ["US", "EU"] and
len(value.get("tags", [])) > 2)
branch:
- if: is_complex_condition # Clean and readable
to: qualified_items
4. Handle All Cases
Always ensure messages have somewhere to go:
branch:
- if: condition_a
to: topic_a
- if: condition_b
to: topic_b
# Always include a default case
- to: default_topic
5. Add Monitoring
Use peek operations to monitor branch behavior:
branch:
- if: important_condition
via:
- type: peek
forEach:
code: |
log.info("Important message routed: {}", key)
to: important_topic
Performance Considerations
Predicate Function Performance
- Keep predicate functions lightweight
- Avoid expensive operations in conditions
- Cache results when possible
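For example, a predicate that needs a compiled pattern or lookup table can build it once in globalCode instead of on every message (a sketch; the sensor-id format is a placeholder):
functions:
  has_valid_sensor_id:
    type: predicate
    globalCode: |
      import re
      # Compiled once when the function is initialized, not per message
      SENSOR_ID_PATTERN = re.compile(r"^sensor_\d{3}$")
    expression: SENSOR_ID_PATTERN.match(value.get("sensor_id", "")) is not None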
Branch Count
- Too many branches can impact performance
- Consider using lookup tables for many conditions
- Group similar conditions when possible
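For example, several near-identical region branches can collapse into a single predicate (the region list and topic names are placeholders):
functions:
  is_emea_order:
    type: predicate
    expression: value.get("region") in ["EU", "UK", "MEA"]
branch:
  - if: is_emea_order   # one branch instead of three near-identical ones
    to: emea_orders
  - to: other_orders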
Memory Usage
- Each branch creates a separate stream processing path
- Monitor memory usage with many concurrent branches
Conclusion
Branching in KSML provides powerful conditional routing capabilities that map directly to Kafka Streams' branching functionality. It enables you to:
- Route messages based on content, business rules, or data quality
- Apply different transformations to different types of data
- Implement error handling patterns like dead letter queues
- Build complex pipelines with multiple processing paths