Error Handling and Recovery in KSML
This tutorial explores comprehensive strategies for handling errors and implementing recovery mechanisms in KSML applications, helping you build robust and resilient stream processing pipelines.
Prerequisites
Before starting this tutorial:
- Have a Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic incoming_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic valid_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic invalid_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_readings && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_errors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_payments && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic failed_payments && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic api_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic successful_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic failed_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic service_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic service_responses && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic circuit_events
Introduction to Error Handling
Error handling is critical for production stream processing applications. Common error scenarios include:
- Data Quality Issues: Invalid, malformed, or missing data
- External Service Failures: Network timeouts, API errors, service unavailability
- Resource Constraints: Memory limitations, disk space issues
- Business Rule Violations: Data that doesn't meet application requirements
- Transient Failures: Temporary network issues, rate limiting, service overload
Without proper error handling, these issues can cause application crashes, data loss, incorrect results, or inconsistent state.
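To see why this matters, consider what a single malformed record does to an unguarded transformation: one failed type conversion raises an exception that aborts processing and loses the remaining records. A minimal Python illustration (the records and function names here are hypothetical, not from the tutorial topics):

```python
# One bad record crashes an unguarded transform; a guarded version
# keeps processing and captures the failure for later inspection.
records = [
    {"temperature": "21.5"},
    {"temperature": "invalid"},
    {"temperature": "19.0"},
]

def unguarded(records):
    # Raises ValueError on the second record and loses the rest.
    return [float(r["temperature"]) for r in records]

def guarded(records):
    results, errors = [], []
    for r in records:
        try:
            results.append(float(r["temperature"]))
        except (ValueError, TypeError) as e:
            errors.append({"record": r, "error": str(e)})
    return results, errors

ok, bad = guarded(records)
# ok == [21.5, 19.0]; bad holds the one failing record with its error.
```

The error handling patterns below apply this same idea at the pipeline level: keep good data flowing while capturing, classifying, and routing the bad.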
Core Error Handling Patterns
1. Validation and Filtering
Proactively validate data and filter out problematic messages before they cause downstream errors.
Order Events Producer - click to expand
# Producer for validation example - generates orders with various quality issues
functions:
generate_orders:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
# Generate different order scenarios
if counter % 4 == 1:
# Valid order
order = {
"order_id": f"order_{counter}",
"customer_id": f"customer_{random.randint(1, 5)}",
"product_id": f"product_{random.randint(1, 3)}",
"quantity": random.randint(1, 5),
"price": round(random.uniform(10.0, 100.0), 2)
}
elif counter % 4 == 2:
# Missing required field
order = {
"order_id": f"order_{counter}",
"customer_id": f"customer_{random.randint(1, 5)}"
# Missing product_id, quantity, price
}
elif counter % 4 == 3:
# Invalid quantity
order = {
"order_id": f"order_{counter}",
"customer_id": f"customer_{random.randint(1, 5)}",
"product_id": f"product_{random.randint(1, 3)}",
"quantity": -1,
"price": round(random.uniform(10.0, 100.0), 2)
}
else:
# Malformed order
order = {"malformed": True}
key = f"order_{counter}"
expression: (key, order)
resultType: (string, json)
producers:
order_producer:
generator: generate_orders
interval: 2s
count: 16
to:
topic: incoming_orders
keyType: string
valueType: json
Validation and Filtering Processor - click to expand
# Validation and filtering example using branch operation
streams:
incoming_orders:
topic: incoming_orders
keyType: string
valueType: json
valid_orders:
topic: valid_orders
keyType: string
valueType: json
invalid_orders:
topic: invalid_orders
keyType: string
valueType: json
functions:
add_validation_status:
type: valueTransformer
code: |
value["status"] = "valid"
value["validated_at"] = key
expression: value
resultType: json
add_error_details:
type: valueTransformer
code: |
value["status"] = "invalid"
if "malformed" in value:
value["error_reason"] = "malformed_data"
elif "product_id" not in value:
value["error_reason"] = "missing_required_fields"
elif value.get("quantity", 0) <= 0:
value["error_reason"] = "invalid_quantity"
else:
value["error_reason"] = "validation_failed"
expression: value
resultType: json
pipelines:
validate_orders:
from: incoming_orders
via:
- type: peek
forEach:
code: |
log.info("Processing order: {}", key)
branch:
# Valid orders branch
- if:
expression: value and "malformed" not in value and "product_id" in value and value.get("quantity", 0) > 0
via:
- type: transformValue
mapper: add_validation_status
- type: peek
forEach:
code: |
log.info("VALID ORDER: {}", key)
to: valid_orders
# Invalid orders branch
- via:
- type: transformValue
mapper: add_error_details
- type: peek
forEach:
code: |
log.info("INVALID ORDER: {} - {}", key, value.get("error_reason"))
to: invalid_orders
What it does:
- Produces order events: Creates orders with varying quality - some valid with all fields, some missing product_id, some with invalid quantity, some marked as "malformed"
- Validates with branching: Uses a branch operation with the expression value and "malformed" not in value and "product_id" in value and value.get("quantity", 0) > 0 to check each order
- Routes by validity: Valid orders go to valid_orders topic with "valid" status, everything else goes to invalid_orders topic
- Categorizes errors: Adds specific error_reason (malformed_data, missing_required_fields, invalid_quantity, validation_failed) to invalid orders
- Logs decisions: Tracks processing with logging for valid/invalid determinations and specific error reasons for debugging
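The branch expression and the error categorization above can be factored into plain Python functions, which are easy to unit-test outside a running KSML pipeline. A sketch mirroring the example (the function names are illustrative):

```python
def is_valid_order(value):
    # Mirrors the branch expression: a truthy dict that is not
    # malformed, has a product_id, and has a positive quantity.
    return bool(
        value
        and "malformed" not in value
        and "product_id" in value
        and value.get("quantity", 0) > 0
    )

def error_reason(value):
    # Mirrors the categorization in add_error_details.
    if "malformed" in value:
        return "malformed_data"
    if "product_id" not in value:
        return "missing_required_fields"
    if value.get("quantity", 0) <= 0:
        return "invalid_quantity"
    return "validation_failed"
```

Keeping the predicate and the categorization in sync like this is the main maintenance concern with validation branches: if one changes, the other must change with it.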
2. Try-Catch Error Handling
Use try-catch blocks in Python functions to handle exceptions gracefully and provide fallback behavior.
Sensor Data Producer - click to expand
# Producer for try-catch example - generates sensor data with various error conditions
functions:
generate_sensor_data:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
# Generate different sensor scenarios
if counter % 5 == 1:
# Normal reading
data = {
"sensor_id": f"sensor_{counter % 3 + 1}",
"temperature": round(random.uniform(20.0, 80.0), 2),
"humidity": round(random.uniform(30.0, 90.0), 2)
}
elif counter % 5 == 2:
# Invalid temperature (string instead of number)
data = {
"sensor_id": f"sensor_{counter % 3 + 1}",
"temperature": "invalid",
"humidity": round(random.uniform(30.0, 90.0), 2)
}
elif counter % 5 == 3:
# Missing humidity field
data = {
"sensor_id": f"sensor_{counter % 3 + 1}",
"temperature": round(random.uniform(20.0, 80.0), 2)
}
elif counter % 5 == 4:
# Extreme values that should trigger warnings
data = {
"sensor_id": f"sensor_{counter % 3 + 1}",
"temperature": round(random.uniform(100.0, 150.0), 2),
"humidity": round(random.uniform(95.0, 100.0), 2)
}
else:
# Null/None data
data = None
key = f"sensor_{counter % 3 + 1}"
expression: (key, data)
resultType: (string, json)
producers:
sensor_producer:
generator: generate_sensor_data
interval: 2s
count: 20
to:
topic: sensor_readings
keyType: string
valueType: json
Try-Catch Processor - click to expand
# Try-catch error handling example using exception handling in functions
streams:
sensor_readings:
topic: sensor_readings
keyType: string
valueType: json
processed_sensors:
topic: processed_sensors
keyType: string
valueType: json
sensor_errors:
topic: sensor_errors
keyType: string
valueType: json
functions:
process_sensor_reading:
type: valueTransformer
code: |
try:
# Attempt to process the sensor data
if value is None:
raise ValueError("Null sensor data")
sensor_id = value.get("sensor_id")
if not sensor_id:
raise ValueError("Missing sensor_id")
# Try to get and validate temperature
temp = value.get("temperature")
if temp is None:
raise ValueError("Missing temperature")
# Convert temperature to float (may raise ValueError)
temp_float = float(temp)
# Get humidity with default
humidity = float(value.get("humidity", 0))
# Create processed result
result = {
"sensor_id": sensor_id,
"temperature": temp_float,
"humidity": humidity,
"status": "normal" if temp_float < 100 else "warning",
"processed_at": sensor_id # Use sensor_id as timestamp marker
}
except (ValueError, TypeError) as e:
# Handle processing errors
result = {
"sensor_id": value.get("sensor_id", "unknown") if value else "unknown",
"error": str(e),
"status": "error",
"original_data": value,
"processed_at": key
}
expression: result
resultType: json
pipelines:
process_sensors:
from: sensor_readings
via:
- type: transformValue
mapper: process_sensor_reading
- type: peek
forEach:
code: |
status = value.get("status", "unknown")
sensor_id = value.get("sensor_id", "unknown")
if status == "error":
log.error("SENSOR ERROR - {}: {}", sensor_id, value.get("error"))
else:
log.info("SENSOR OK - {}: temp={}C", sensor_id, value.get("temperature"))
branch:
# Route successful processing
- if:
expression: value.get("status") != "error"
to: processed_sensors
# Route errors to error topic
- to: sensor_errors
What it does:
- Produces sensor data: Creates sensor readings with varying quality - some valid with all fields, some with a non-numeric temperature, some missing the humidity field, some with extreme values, and some null readings
- Handles with try-catch: Uses try-catch in valueTransformer to catch ValueError and TypeError exceptions during processing
- Processes safely: Attempts to convert temperature to float, validate sensor_id, set default humidity values within exception handling
- Creates error objects: When exceptions occur, returns error object with sensor_id, error message, status="error", original_data for debugging
- Routes by success: Uses branch to send successful processing to processed_sensors, errors to sensor_errors topic based on status field
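The try block catches both ValueError and TypeError because float() raises different exceptions for different bad inputs. A quick check of the cases the producer generates (the helper name is illustrative):

```python
# float() raises ValueError for non-numeric strings and TypeError
# for None -- which is why the processor catches both.
def to_float_or_error(raw):
    try:
        return float(raw), None
    except (ValueError, TypeError) as e:
        return None, type(e).__name__

assert to_float_or_error("21.5") == (21.5, None)
assert to_float_or_error("invalid")[1] == "ValueError"
assert to_float_or_error(None)[1] == "TypeError"
```

Catching only one of the two would let the other exception escape the function and fail the stream task.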
3. Dead Letter Queue Pattern
Route messages that cannot be processed to dedicated error topics for later analysis or reprocessing.
Payment Requests Producer - click to expand
# Producer for dead letter queue example - generates payment requests with various conditions
functions:
generate_payment_requests:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
# Generate different payment scenarios
if counter % 6 == 1:
# Valid payment
payment = {
"payment_id": f"pay_{counter}",
"amount": round(random.uniform(10.0, 500.0), 2),
"currency": "USD",
"merchant_id": f"merchant_{random.randint(1, 3)}"
}
elif counter % 6 == 2:
# Invalid amount
payment = {
"payment_id": f"pay_{counter}",
"amount": -50.0,
"currency": "USD",
"merchant_id": f"merchant_{random.randint(1, 3)}"
}
elif counter % 6 == 3:
# Missing required fields
payment = {
"payment_id": f"pay_{counter}",
"amount": round(random.uniform(10.0, 500.0), 2)
}
elif counter % 6 == 4:
# Invalid currency
payment = {
"payment_id": f"pay_{counter}",
"amount": round(random.uniform(10.0, 500.0), 2),
"currency": "INVALID",
"merchant_id": f"merchant_{random.randint(1, 3)}"
}
elif counter % 6 == 5:
# Retry-eligible error (temporary network issue simulation)
payment = {
"payment_id": f"pay_{counter}",
"amount": round(random.uniform(10.0, 500.0), 2),
"currency": "USD",
"merchant_id": "temp_failure_merchant"
}
else:
# Completely malformed
payment = {"invalid": True, "data": counter}
key = f"payment_{counter}"
expression: (key, payment)
resultType: (string, json)
producers:
payment_producer:
generator: generate_payment_requests
interval: 2s
count: 18
to:
topic: payment_requests
keyType: string
valueType: json
Dead Letter Queue Processor - click to expand
# Dead letter queue pattern with retry logic for transient failures
streams:
payment_requests:
topic: payment_requests
keyType: string
valueType: json
processed_payments:
topic: processed_payments
keyType: string
valueType: json
failed_payments:
topic: failed_payments
keyType: string
valueType: json
functions:
process_payment:
type: valueTransformer
code: |
# Validate payment request
if value is None or "invalid" in value:
result = {
"payment_id": "unknown",
"status": "permanent_failure",
"error": "malformed_request",
"retry_eligible": False,
"original_request": value
}
elif value.get("amount", 0) <= 0:
result = {
"payment_id": value.get("payment_id", "unknown"),
"status": "permanent_failure",
"error": "invalid_amount",
"retry_eligible": False,
"original_request": value
}
elif "currency" not in value or "merchant_id" not in value:
result = {
"payment_id": value.get("payment_id", "unknown"),
"status": "permanent_failure",
"error": "missing_required_fields",
"retry_eligible": False,
"original_request": value
}
elif value.get("currency") not in ["USD", "EUR", "GBP"]:
result = {
"payment_id": value.get("payment_id", "unknown"),
"status": "permanent_failure",
"error": "unsupported_currency",
"retry_eligible": False,
"original_request": value
}
elif value.get("merchant_id") == "temp_failure_merchant":
# Simulate temporary network failure
result = {
"payment_id": value.get("payment_id", "unknown"),
"status": "temporary_failure",
"error": "network_timeout",
"retry_eligible": True,
"original_request": value
}
else:
# Process successful payment
result = {
"payment_id": value.get("payment_id"),
"amount": value.get("amount"),
"currency": value.get("currency"),
"merchant_id": value.get("merchant_id"),
"status": "processed",
"processed_at": key
}
expression: result
resultType: json
pipelines:
process_payments:
from: payment_requests
via:
- type: transformValue
mapper: process_payment
- type: peek
forEach:
code: |
status = value.get("status", "unknown")
payment_id = value.get("payment_id", "unknown")
if status == "processed":
log.info("PAYMENT SUCCESS - {}: ${}", payment_id, value.get("amount"))
elif status == "temporary_failure":
log.warn("PAYMENT RETRY - {}: {}", payment_id, value.get("error"))
else:
log.error("PAYMENT FAILED - {}: {}", payment_id, value.get("error"))
branch:
# Route successful payments
- if:
expression: value.get("status") == "processed"
to: processed_payments
# Route failed payments to dead letter queue
- to: failed_payments
What it does:
- Produces payment requests: Creates payment events with varying scenarios - some valid, some with a negative amount, some missing currency or merchant_id, some with an unsupported currency, and some simulating a temporary network failure
- Simulates processing: Mocks payment processing with different failure types - permanent (malformed_request, invalid_amount, missing_required_fields, unsupported_currency) vs transient (network_timeout)
- Classifies errors: Determines retry eligibility by error type - the simulated network timeout is retryable, while validation failures are permanent
- Enriches with context: Adds processing metadata, error classification, retry recommendations, original request data to failed messages
- Routes by result: Uses branch to send successful payments to processed_payments, failures to failed_payments topic with full error context
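The retry-eligibility decision at the heart of the dead letter queue pattern can be captured in one small classifier. A sketch using the error names from the examples in this tutorial (the function itself is illustrative, not a KSML feature):

```python
# Transient errors are worth retrying; validation errors go
# straight to the dead letter queue as permanent failures.
RETRYABLE_ERRORS = {"network_timeout", "rate_limit", "server_error"}

def classify_failure(error):
    if error in RETRYABLE_ERRORS:
        return {"status": "temporary_failure", "retry_eligible": True}
    return {"status": "permanent_failure", "retry_eligible": False}
```

Keeping the retryable set in one place makes it straightforward to review which failure modes a pipeline will retry and which it will park for human analysis.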
4. Retry Strategies with Exponential Backoff
Implement sophisticated retry logic for transient failures with exponential backoff and jitter.
API Operations Producer - click to expand
# Producer for retry strategies example - generates API operations with failure scenarios
functions:
generate_api_operations:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
# Generate different API operation scenarios
if counter % 5 == 1:
# Successful operation
operation = {
"operation_id": f"op_{counter}",
"api_endpoint": "/users/create",
"should_fail": False,
"retry_count": 0
}
elif counter % 5 == 2:
# Transient network error (retryable)
operation = {
"operation_id": f"op_{counter}",
"api_endpoint": "/users/update",
"should_fail": True,
"failure_type": "network_timeout",
"retry_count": 0
}
elif counter % 5 == 3:
# Rate limit error (retryable)
operation = {
"operation_id": f"op_{counter}",
"api_endpoint": "/data/fetch",
"should_fail": True,
"failure_type": "rate_limit",
"retry_count": 0
}
elif counter % 5 == 4:
# Permanent error (not retryable)
operation = {
"operation_id": f"op_{counter}",
"api_endpoint": "/users/delete",
"should_fail": True,
"failure_type": "not_found",
"retry_count": 0
}
else:
# Authentication error (not retryable)
operation = {
"operation_id": f"op_{counter}",
"api_endpoint": "/admin/config",
"should_fail": True,
"failure_type": "auth_failed",
"retry_count": 0
}
key = f"op_{counter}"
expression: (key, operation)
resultType: (string, json)
producers:
api_operation_producer:
generator: generate_api_operations
interval: 3s
count: 15
to:
topic: api_operations
keyType: string
valueType: json
Retry Strategies Processor - click to expand
# Retry strategies with exponential backoff for transient failures
streams:
api_operations:
topic: api_operations
keyType: string
valueType: json
successful_operations:
topic: successful_operations
keyType: string
valueType: json
failed_operations:
topic: failed_operations
keyType: string
valueType: json
functions:
process_with_retry:
type: valueTransformer
globalCode: |
import random
import math
MAX_RETRIES = 3
BASE_DELAY_MS = 1000
def calculate_backoff_delay(attempt):
# Exponential backoff: 1s, 2s, 4s
delay = BASE_DELAY_MS * (2 ** attempt)
# Add 20% jitter
jitter = delay * 0.2 * (random.random() * 2 - 1)
return int(delay + jitter)
code: |
operation_id = value.get("operation_id", "unknown")
retry_count = value.get("retry_count", 0)
should_fail = value.get("should_fail", False)
failure_type = value.get("failure_type", "")
# Determine if this operation should succeed or fail
if not should_fail:
# Success case
result = {
"operation_id": operation_id,
"api_endpoint": value.get("api_endpoint"),
"status": "success",
"retry_count": retry_count,
"final_attempt": True
}
else:
# Check if error is retryable
retryable_errors = ["network_timeout", "rate_limit", "server_error"]
is_retryable = failure_type in retryable_errors
if is_retryable and retry_count < MAX_RETRIES:
# Increment retry count and calculate delay
new_retry_count = retry_count + 1
backoff_delay = calculate_backoff_delay(new_retry_count - 1)
# Simulate: 30% chance of success after retry
if random.random() < 0.3:
result = {
"operation_id": operation_id,
"api_endpoint": value.get("api_endpoint"),
"status": "success_after_retry",
"retry_count": new_retry_count,
"backoff_delay": backoff_delay,
"final_attempt": True
}
else:
result = {
"operation_id": operation_id,
"api_endpoint": value.get("api_endpoint"),
"status": "retry_needed",
"retry_count": new_retry_count,
"failure_type": failure_type,
"backoff_delay": backoff_delay,
"final_attempt": new_retry_count >= MAX_RETRIES
}
else:
# Not retryable or max retries exceeded
result = {
"operation_id": operation_id,
"api_endpoint": value.get("api_endpoint"),
"status": "permanent_failure",
"retry_count": retry_count,
"failure_type": failure_type,
"final_attempt": True
}
expression: result
resultType: json
pipelines:
retry_processor:
from: api_operations
via:
- type: transformValue
mapper: process_with_retry
- type: peek
forEach:
code: |
status = value.get("status", "unknown")
op_id = value.get("operation_id", "unknown")
retry_count = value.get("retry_count", 0)
if status == "success":
log.info("API SUCCESS - {}: completed", op_id)
elif status == "success_after_retry":
log.info("API RETRY SUCCESS - {}: completed after {} retries", op_id, retry_count)
elif status == "retry_needed":
delay = value.get("backoff_delay", 0)
log.warn("API RETRY - {}: attempt {} failed, retry in {}ms", op_id, retry_count, delay)
else:
log.error("API FAILED - {}: {}", op_id, value.get("failure_type"))
branch:
# Route successful operations
- if:
expression: value.get("status") in ["success", "success_after_retry"]
to: successful_operations
# Route failed operations
- to: failed_operations
What it does:
- Produces API operations: Creates operations against endpoints like /users/create, /users/update, and /data/fetch, with deliberate failures built in to exercise the retry logic
- Simulates API calls: Mocks an external API with retryable failures (network_timeout, rate_limit), permanent failures (not_found, auth_failed), and plain successes
- Implements retry logic: Calculates exponential backoff delays (1s, 2s, 4s) with added jitter to prevent retry storms
- Tracks attempts: Maintains a retry count and determines retry eligibility from the error type and the maximum of 3 attempts
- Routes by outcome: Successful operations go to successful_operations, failed/exhausted retries go to failed_operations with retry history
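The backoff schedule can be verified in isolation. With BASE_DELAY_MS = 1000 the nominal delays for attempts 0, 1, and 2 are 1000, 2000, and 4000 ms, and the ±20% jitter keeps each computed delay within a known band around its nominal value:

```python
import random

MAX_RETRIES = 3
BASE_DELAY_MS = 1000

def calculate_backoff_delay(attempt):
    # Exponential backoff: 1s, 2s, 4s nominal, with +/-20% jitter.
    delay = BASE_DELAY_MS * (2 ** attempt)
    jitter = delay * 0.2 * (random.random() * 2 - 1)
    return int(delay + jitter)

# Each delay stays within roughly 80%..120% of its nominal value.
for attempt in range(MAX_RETRIES):
    nominal = BASE_DELAY_MS * (2 ** attempt)
    d = calculate_backoff_delay(attempt)
    assert 0.8 * nominal - 1 <= d <= 1.2 * nominal
```

The jitter is what prevents retry storms: without it, many consumers that failed at the same moment would all retry at exactly the same moment too.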
5. Circuit Breaker Pattern
Prevent cascading failures by temporarily stopping calls to failing services, allowing them to recover.
Service Requests Producer - click to expand
# Producer for circuit breaker example - generates service requests with failure patterns
functions:
generate_service_requests:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
counter += 1
# Generate different service request patterns
if counter <= 5:
# Initial successful requests
request = {
"request_id": f"req_{counter}",
"service": "user_service",
"should_succeed": True
}
elif counter <= 10:
# Start failing to trigger circuit breaker
request = {
"request_id": f"req_{counter}",
"service": "user_service",
"should_succeed": False,
"error_type": "timeout"
}
elif counter <= 15:
# Continue with failures (circuit should be open)
request = {
"request_id": f"req_{counter}",
"service": "user_service",
"should_succeed": False,
"error_type": "connection_failed"
}
else:
# Service recovery - mix of success/failure
request = {
"request_id": f"req_{counter}",
"service": "user_service",
"should_succeed": random.choice([True, True, False]) # 66% success
}
key = f"req_{counter}"
expression: (key, request)
resultType: (string, json)
producers:
service_request_producer:
generator: generate_service_requests
interval: 2s
count: 20
to:
topic: service_requests
keyType: string
valueType: json
Circuit Breaker Processor - click to expand
# Circuit breaker pattern to prevent cascading failures
streams:
service_requests:
topic: service_requests
keyType: string
valueType: json
service_responses:
topic: service_responses
keyType: string
valueType: json
circuit_events:
topic: circuit_events
keyType: string
valueType: json
functions:
circuit_breaker_handler:
type: valueTransformer
globalCode: |
# Circuit breaker state
failure_count = 0
success_count = 0
circuit_state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
last_failure_time = 0
# Configuration
FAILURE_THRESHOLD = 3
SUCCESS_THRESHOLD = 2
TIMEOUT_MS = 10000 # 10 seconds
code: |
global failure_count, success_count, circuit_state, last_failure_time
import time
current_time = int(time.time() * 1000)
request_id = value.get("request_id", "unknown")
should_succeed = value.get("should_succeed", True)
# Check if we should transition from OPEN to HALF_OPEN
if circuit_state == "OPEN" and (current_time - last_failure_time) > TIMEOUT_MS:
circuit_state = "HALF_OPEN"
success_count = 0
# Handle request based on circuit state
if circuit_state == "OPEN":
# Circuit is open - reject request immediately
result = {
"request_id": request_id,
"status": "circuit_open",
"circuit_state": circuit_state,
"failure_count": failure_count,
"message": "Circuit breaker is OPEN - request rejected"
}
else:
# Circuit is CLOSED or HALF_OPEN - try to process request
if should_succeed:
# Request succeeds
success_count += 1
if circuit_state == "HALF_OPEN" and success_count >= SUCCESS_THRESHOLD:
# Reset circuit breaker
circuit_state = "CLOSED"
failure_count = 0
success_count = 0
result = {
"request_id": request_id,
"status": "success",
"circuit_state": circuit_state,
"success_count": success_count,
"failure_count": failure_count,
"service": value.get("service")
}
else:
# Request fails
failure_count += 1
success_count = 0
last_failure_time = current_time
if failure_count >= FAILURE_THRESHOLD:
circuit_state = "OPEN"
result = {
"request_id": request_id,
"status": "failure",
"circuit_state": circuit_state,
"failure_count": failure_count,
"error_type": value.get("error_type", "unknown"),
"service": value.get("service")
}
expression: result
resultType: json
pipelines:
circuit_breaker_processor:
from: service_requests
via:
- type: transformValue
mapper: circuit_breaker_handler
- type: peek
forEach:
code: |
status = value.get("status", "unknown")
circuit_state = value.get("circuit_state", "unknown")
req_id = value.get("request_id", "unknown")
if status == "circuit_open":
log.warn("CIRCUIT OPEN - {}: Request rejected", req_id)
elif status == "success":
log.info("REQUEST OK - {} [{}]: Success", req_id, circuit_state)
else:
failures = value.get("failure_count", 0)
log.error("REQUEST FAILED - {} [{}]: {} failures", req_id, circuit_state, failures)
branch:
# Route successful requests
- if:
expression: value.get("status") == "success"
to: service_responses
# Route circuit breaker events and failures
- to: circuit_events
What it does:
- Produces service requests: Creates requests with request_ids, some designed to fail to trigger circuit breaker state changes
- Tracks circuit state: Uses global variables (failure_count, circuit_state) to maintain CLOSED/OPEN/HALF_OPEN states across all requests
- Opens on failures: After 3 failures (FAILURE_THRESHOLD), the circuit transitions from CLOSED to OPEN state to protect the failing service
- Rejects when open: In OPEN state, immediately returns circuit_open status without attempting processing, showing current failure count
- Processes requests: In CLOSED or HALF_OPEN state, simulates service calls with success/failure scenarios, updating the failure and success counters and the circuit state accordingly
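The global-variable state machine above can also be expressed as a small class, which makes the CLOSED, OPEN, and HALF_OPEN transitions easy to unit-test in isolation. A sketch with the same thresholds (the class is illustrative; in KSML itself, state is shared via globalCode as shown in the example):

```python
class CircuitBreaker:
    """CLOSED -> OPEN after FAILURE_THRESHOLD failures;
    OPEN -> HALF_OPEN once TIMEOUT_MS has elapsed;
    HALF_OPEN -> CLOSED after SUCCESS_THRESHOLD successes."""

    FAILURE_THRESHOLD = 3
    SUCCESS_THRESHOLD = 2
    TIMEOUT_MS = 10_000

    def __init__(self):
        self.state = "CLOSED"
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0

    def allow(self, now_ms):
        # In OPEN state, let a probe through once the timeout elapses.
        if self.state == "OPEN" and now_ms - self.last_failure_time > self.TIMEOUT_MS:
            self.state = "HALF_OPEN"
            self.success_count = 0
        return self.state != "OPEN"

    def record_success(self):
        self.success_count += 1
        if self.state == "HALF_OPEN" and self.success_count >= self.SUCCESS_THRESHOLD:
            self.state = "CLOSED"
            self.failure_count = 0
            self.success_count = 0

    def record_failure(self, now_ms):
        self.failure_count += 1
        self.success_count = 0
        self.last_failure_time = now_ms
        if self.failure_count >= self.FAILURE_THRESHOLD:
            self.state = "OPEN"
```

For example, three recorded failures open the circuit and allow() starts rejecting immediately; after the timeout, a single probe request is let through in HALF_OPEN, and two probe successes close the circuit again.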
Error Handling Best Practices
Data Type Recommendations
- Use JSON types: Provides flexibility for error objects and easy inspection in Kowl UI
- Include context: Add timestamps, retry counts, and error classifications to all error messages
- Preserve original data: Keep original messages in error objects for debugging
Function Design Patterns
- Handle null values: Always check for None/null values explicitly
- Use appropriate exceptions: Choose specific exception types for different error conditions
- Provide meaningful errors: Include context about what went wrong and potential solutions
- Log appropriately: Use different log levels (info/warn/error) based on severity
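The first two points combine into one defensive pattern: check for None explicitly, then raise a specific exception that names what went wrong. A sketch (the helper and field names are illustrative):

```python
def require_field(value, field):
    # Raise a specific, contextual error instead of a bare KeyError.
    if value is None:
        raise ValueError("record is null")
    if field not in value or value[field] is None:
        raise ValueError(f"missing required field: {field}")
    return value[field]

try:
    require_field({"order_id": "o1"}, "product_id")
except ValueError as e:
    message = str(e)
# message names the exact missing field, which makes the error
# objects routed to error topics far more useful for debugging.
```

Compared to letting a KeyError or AttributeError propagate, the explicit message travels with the record into the error topic and tells the reader exactly which field was missing.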
Monitoring and Alerting
- Track error rates: Monitor error percentages by type and set appropriate thresholds
- Circuit breaker metrics: Alert when circuits open and track recovery times
- Retry success rates: Measure effectiveness of retry strategies
- Dead letter queue size: Monitor unprocessable message volume
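Error-rate tracking can start as simple per-type counters that a metrics scraper reads periodically. A sketch of the bookkeeping (not a KSML feature; class and method names are illustrative):

```python
from collections import Counter

class ErrorRateTracker:
    """Count total records and errors by type; expose the error rate."""

    def __init__(self):
        self.total = 0
        self.errors = Counter()

    def record(self, error_type=None):
        self.total += 1
        if error_type:
            self.errors[error_type] += 1

    def error_rate(self):
        return sum(self.errors.values()) / self.total if self.total else 0.0
```

In practice such counters would be kept in globalCode and updated from a peek step, with an alert threshold applied to error_rate() by the monitoring system.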
Testing Error Scenarios
- Simulate failures: Test error handling code with various failure scenarios
- Load testing: Ensure error handling works under high load conditions
- Recovery testing: Verify systems can recover from failure states
Error Pattern Selection Guide
| Pattern | Use Case | Benefits | When to Use |
|---|---|---|---|
| Validation & Filtering | Data quality issues | Early detection, clear routing | Input data validation, format checking |
| Try-Catch | Function-level errors | Graceful degradation | Type conversion, calculations, parsing |
| Dead Letter Queue | Permanent failures | No data loss, failure analysis | Malformed data, business rule violations |
| Retry Strategies | Transient failures | Fault tolerance, automatic recovery | Network timeouts, rate limits, temporary errors |
| Circuit Breaker | External service failures | Prevents cascading failures | API calls, database connections, service dependencies |