
Error Handling and Recovery in KSML

This tutorial explores comprehensive strategies for handling errors and implementing recovery mechanisms in KSML applications, helping you build robust and resilient stream processing pipelines.

Prerequisites

Before starting this tutorial, make sure you have a working KSML and Kafka setup, then create the topics used by the examples below:

Topic creation commands
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic incoming_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic valid_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic invalid_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_readings && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_sensors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_errors && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic payment_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic processed_payments && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic failed_payments && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic api_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic successful_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic failed_operations && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic service_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic service_responses && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic circuit_events

Introduction to Error Handling

Error handling is critical for production stream processing applications. Common error scenarios include:

  • Data Quality Issues: Invalid, malformed, or missing data
  • External Service Failures: Network timeouts, API errors, service unavailability
  • Resource Constraints: Memory limitations, disk space issues
  • Business Rule Violations: Data that doesn't meet application requirements
  • Transient Failures: Temporary network issues, rate limiting, service overload

Without proper error handling, these issues can cause application crashes, data loss, incorrect results, or inconsistent state.

Core Error Handling Patterns

1. Validation and Filtering

Proactively validate data and filter out problematic messages before they cause downstream errors.
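
The heart of this pattern is a single predicate that checks every field the rest of the pipeline depends on. A minimal Python sketch of that check, wrapping the same conditions the branch expression below uses inline in a hypothetical is_valid_order helper:

def is_valid_order(value):
    # Reject empty or explicitly malformed payloads
    if not value or "malformed" in value:
        return False
    # Required field must be present
    if "product_id" not in value:
        return False
    # Business rule: quantity must be positive
    if value.get("quantity", 0) <= 0:
        return False
    return True

# is_valid_order({"product_id": "product_1", "quantity": 2})  -> True
# is_valid_order({"order_id": "order_7"})                     -> False (missing product_id)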

Order Events Producer
# Producer for validation example - generates orders with various quality issues

functions:
  generate_orders:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1

      # Generate different order scenarios
      if counter % 4 == 1:
        # Valid order
        order = {
          "order_id": f"order_{counter}",
          "customer_id": f"customer_{random.randint(1, 5)}",
          "product_id": f"product_{random.randint(1, 3)}",
          "quantity": random.randint(1, 5),
          "price": round(random.uniform(10.0, 100.0), 2)
        }
      elif counter % 4 == 2:
        # Missing required field
        order = {
          "order_id": f"order_{counter}",
          "customer_id": f"customer_{random.randint(1, 5)}"
          # Missing product_id, quantity, price
        }
      elif counter % 4 == 3:
        # Invalid quantity
        order = {
          "order_id": f"order_{counter}",
          "customer_id": f"customer_{random.randint(1, 5)}",
          "product_id": f"product_{random.randint(1, 3)}",
          "quantity": -1,
          "price": round(random.uniform(10.0, 100.0), 2)
        }
      else:
        # Malformed order
        order = {"malformed": True}

      key = f"order_{counter}"
    expression: (key, order)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_orders
    interval: 2s
    count: 16
    to:
      topic: incoming_orders
      keyType: string
      valueType: json
Validation and Filtering Processor
# Validation and filtering example using branch operation

streams:
  incoming_orders:
    topic: incoming_orders
    keyType: string
    valueType: json

  valid_orders:
    topic: valid_orders
    keyType: string
    valueType: json

  invalid_orders:
    topic: invalid_orders
    keyType: string
    valueType: json

functions:
  add_validation_status:
    type: valueTransformer
    code: |
      value["status"] = "valid"
      value["validated_at"] = key
    expression: value
    resultType: json

  add_error_details:
    type: valueTransformer
    code: |
      value["status"] = "invalid"
      if "malformed" in value:
        value["error_reason"] = "malformed_data"
      elif "product_id" not in value:
        value["error_reason"] = "missing_required_fields"
      elif value.get("quantity", 0) <= 0:
        value["error_reason"] = "invalid_quantity"
      else:
        value["error_reason"] = "validation_failed"
    expression: value
    resultType: json

pipelines:
  validate_orders:
    from: incoming_orders
    via:
      - type: peek
        forEach:
          code: |
            log.info("Processing order: {}", key)
    branch:
      # Valid orders branch
      - if:
          expression: value and "malformed" not in value and "product_id" in value and value.get("quantity", 0) > 0
        via:
          - type: transformValue
            mapper: add_validation_status
          - type: peek
            forEach:
              code: |
                log.info("VALID ORDER: {}", key)
        to: valid_orders
      # Invalid orders branch  
      - via:
          - type: transformValue
            mapper: add_error_details
          - type: peek
            forEach:
              code: |
                log.info("INVALID ORDER: {} - {}", key, value.get("error_reason"))
        to: invalid_orders

What it does:

  • Produces order events: Creates orders with varying quality - some valid with all fields, some missing product_id, some with invalid quantity, some marked as "malformed"
  • Validates with branching: Uses branch operation with expression to check value and "malformed" not in value and "product_id" in value and value.get("quantity", 0) > 0
  • Routes by validity: Valid orders go to valid_orders topic with "valid" status, everything else goes to invalid_orders topic
  • Categorizes errors: Adds specific error_reason (malformed_data, missing_required_fields, invalid_quantity, validation_failed) to invalid orders
  • Logs decisions: Tracks processing with logging for valid/invalid determinations and specific error reasons for debugging

2. Try-Catch Error Handling

Use try-catch blocks in Python functions to handle exceptions gracefully and provide fallback behavior.
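
The shape is always the same: do the risky work inside try, and build a structured fallback result in except so the record is never lost and the pipeline never crashes. A minimal sketch, written as a plain function rather than a full KSML valueTransformer:

def safe_transform(key, value):
    try:
        # Risky work: field access and type conversion may raise exceptions
        result = {
            "sensor_id": value["sensor_id"],
            "temperature": float(value["temperature"]),
            "status": "ok"
        }
    except (KeyError, ValueError, TypeError) as e:
        # Fallback: return an error object instead of letting the exception escape
        result = {"status": "error", "error": str(e), "original_data": value}
    return result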

Sensor Data Producer
# Producer for try-catch example - generates sensor data with various error conditions

functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1

      # Generate different sensor scenarios
      if counter % 5 == 1:
        # Normal reading
        data = {
          "sensor_id": f"sensor_{counter % 3 + 1}",
          "temperature": round(random.uniform(20.0, 80.0), 2),
          "humidity": round(random.uniform(30.0, 90.0), 2)
        }
      elif counter % 5 == 2:
        # Invalid temperature (string instead of number)
        data = {
          "sensor_id": f"sensor_{counter % 3 + 1}",
          "temperature": "invalid",
          "humidity": round(random.uniform(30.0, 90.0), 2)
        }
      elif counter % 5 == 3:
        # Missing humidity field
        data = {
          "sensor_id": f"sensor_{counter % 3 + 1}",
          "temperature": round(random.uniform(20.0, 80.0), 2)
        }
      elif counter % 5 == 4:
        # Extreme values that should trigger warnings
        data = {
          "sensor_id": f"sensor_{counter % 3 + 1}",
          "temperature": round(random.uniform(100.0, 150.0), 2),
          "humidity": round(random.uniform(95.0, 100.0), 2)
        }
      else:
        # Null/None data
        data = None

      key = f"sensor_{counter % 3 + 1}"
    expression: (key, data)
    resultType: (string, json)

producers:
  sensor_producer:
    generator: generate_sensor_data
    interval: 2s
    count: 20
    to:
      topic: sensor_readings
      keyType: string
      valueType: json
Try-Catch Processor
# Try-catch error handling example using exception handling in functions

streams:
  sensor_readings:
    topic: sensor_readings
    keyType: string
    valueType: json

  processed_sensors:
    topic: processed_sensors
    keyType: string
    valueType: json

  sensor_errors:
    topic: sensor_errors
    keyType: string
    valueType: json

functions:
  process_sensor_reading:
    type: valueTransformer
    code: |
      try:
        # Attempt to process the sensor data
        if value is None:
          raise ValueError("Null sensor data")

        sensor_id = value.get("sensor_id")
        if not sensor_id:
          raise ValueError("Missing sensor_id")

        # Try to get and validate temperature
        temp = value.get("temperature")
        if temp is None:
          raise ValueError("Missing temperature")

        # Convert temperature to float (may raise ValueError)
        temp_float = float(temp)

        # Get humidity with default
        humidity = float(value.get("humidity", 0))

        # Create processed result
        result = {
          "sensor_id": sensor_id,
          "temperature": temp_float,
          "humidity": humidity,
          "status": "normal" if temp_float < 100 else "warning",
          "processed_at": sensor_id  # Use sensor_id as timestamp marker
        }

      except (ValueError, TypeError) as e:
        # Handle processing errors
        result = {
          "sensor_id": value.get("sensor_id", "unknown") if value else "unknown",
          "error": str(e),
          "status": "error",
          "original_data": value,
          "processed_at": key
        }

    expression: result
    resultType: json

pipelines:
  process_sensors:
    from: sensor_readings
    via:
      - type: transformValue
        mapper: process_sensor_reading
      - type: peek
        forEach:
          code: |
            status = value.get("status", "unknown")
            sensor_id = value.get("sensor_id", "unknown")
            if status == "error":
              log.error("SENSOR ERROR - {}: {}", sensor_id, value.get("error"))
            else:
              log.info("SENSOR OK - {}: temp={}C", sensor_id, value.get("temperature"))
    branch:
      # Route successful processing
      - if:
          expression: value.get("status") != "error"
        to: processed_sensors
      # Route errors to error topic
      - to: sensor_errors

What it does:

  • Produces sensor data: Creates sensor readings with varying quality - some valid with all fields, some with a non-numeric temperature value, some missing humidity, some with extreme out-of-range values, and some null readings
  • Handles with try-catch: Uses try-catch in valueTransformer to catch ValueError and TypeError exceptions during processing
  • Processes safely: Attempts to convert temperature to float, validate sensor_id, set default humidity values within exception handling
  • Creates error objects: When exceptions occur, returns error object with sensor_id, error message, status="error", original_data for debugging
  • Routes by success: Uses branch to send successful processing to processed_sensors, errors to sensor_errors topic based on status field

3. Dead Letter Queue Pattern

Route messages that cannot be processed to dedicated error topics for later analysis or reprocessing.
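
Whatever caused the failure, the dead letter record should carry enough context to analyze or replay the message later: what failed, why, whether a retry makes sense, and the untouched original payload. A small illustrative helper (field names mirror the processor below; the timestamp is an addition):

import time

def to_dead_letter(key, value, error, retry_eligible):
    # Wrap an unprocessable message with the context needed for later analysis
    return {
        "payment_id": (value or {}).get("payment_id", "unknown"),
        "status": "temporary_failure" if retry_eligible else "permanent_failure",
        "error": error,
        "retry_eligible": retry_eligible,
        "original_request": value,             # preserve the original payload
        "failed_at": int(time.time() * 1000)   # epoch millis, useful for triage
    }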

Payment Requests Producer
# Producer for dead letter queue example - generates payment requests with various conditions

functions:
  generate_payment_requests:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1

      # Generate different payment scenarios
      if counter % 6 == 1:
        # Valid payment
        payment = {
          "payment_id": f"pay_{counter}",
          "amount": round(random.uniform(10.0, 500.0), 2),
          "currency": "USD",
          "merchant_id": f"merchant_{random.randint(1, 3)}"
        }
      elif counter % 6 == 2:
        # Invalid amount
        payment = {
          "payment_id": f"pay_{counter}",
          "amount": -50.0,
          "currency": "USD",
          "merchant_id": f"merchant_{random.randint(1, 3)}"
        }
      elif counter % 6 == 3:
        # Missing required fields
        payment = {
          "payment_id": f"pay_{counter}",
          "amount": round(random.uniform(10.0, 500.0), 2)
        }
      elif counter % 6 == 4:
        # Invalid currency
        payment = {
          "payment_id": f"pay_{counter}",
          "amount": round(random.uniform(10.0, 500.0), 2),
          "currency": "INVALID",
          "merchant_id": f"merchant_{random.randint(1, 3)}"
        }
      elif counter % 6 == 5:
        # Retry-eligible error (temporary network issue simulation)
        payment = {
          "payment_id": f"pay_{counter}",
          "amount": round(random.uniform(10.0, 500.0), 2),
          "currency": "USD",
          "merchant_id": "temp_failure_merchant"
        }
      else:
        # Completely malformed
        payment = {"invalid": True, "data": counter}

      key = f"payment_{counter}"
    expression: (key, payment)
    resultType: (string, json)

producers:
  payment_producer:
    generator: generate_payment_requests
    interval: 2s
    count: 18
    to:
      topic: payment_requests
      keyType: string
      valueType: json
Dead Letter Queue Processor
# Dead letter queue pattern with retry logic for transient failures

streams:
  payment_requests:
    topic: payment_requests
    keyType: string
    valueType: json

  processed_payments:
    topic: processed_payments
    keyType: string
    valueType: json

  failed_payments:
    topic: failed_payments
    keyType: string
    valueType: json

functions:
  process_payment:
    type: valueTransformer
    code: |
      # Validate payment request
      if value is None or "invalid" in value:
        result = {
          "payment_id": "unknown",
          "status": "permanent_failure",
          "error": "malformed_request",
          "retry_eligible": False,
          "original_request": value
        }
      elif value.get("amount", 0) <= 0:
        result = {
          "payment_id": value.get("payment_id", "unknown"),
          "status": "permanent_failure", 
          "error": "invalid_amount",
          "retry_eligible": False,
          "original_request": value
        }
      elif "currency" not in value or "merchant_id" not in value:
        result = {
          "payment_id": value.get("payment_id", "unknown"),
          "status": "permanent_failure",
          "error": "missing_required_fields",
          "retry_eligible": False,
          "original_request": value
        }
      elif value.get("currency") not in ["USD", "EUR", "GBP"]:
        result = {
          "payment_id": value.get("payment_id", "unknown"),
          "status": "permanent_failure",
          "error": "unsupported_currency",
          "retry_eligible": False,
          "original_request": value
        }
      elif value.get("merchant_id") == "temp_failure_merchant":
        # Simulate temporary network failure
        result = {
          "payment_id": value.get("payment_id", "unknown"),
          "status": "temporary_failure",
          "error": "network_timeout",
          "retry_eligible": True,
          "original_request": value
        }
      else:
        # Process successful payment
        result = {
          "payment_id": value.get("payment_id"),
          "amount": value.get("amount"),
          "currency": value.get("currency"),
          "merchant_id": value.get("merchant_id"),
          "status": "processed",
          "processed_at": key
        }

    expression: result
    resultType: json

pipelines:
  process_payments:
    from: payment_requests
    via:
      - type: transformValue
        mapper: process_payment
      - type: peek
        forEach:
          code: |
            status = value.get("status", "unknown")
            payment_id = value.get("payment_id", "unknown")
            if status == "processed":
              log.info("PAYMENT SUCCESS - {}: ${}", payment_id, value.get("amount"))
            elif status == "temporary_failure":
              log.warn("PAYMENT RETRY - {}: {}", payment_id, value.get("error"))
            else:
              log.error("PAYMENT FAILED - {}: {}", payment_id, value.get("error"))
    branch:
      # Route successful payments
      - if:
          expression: value.get("status") == "processed"
        to: processed_payments
      # Route failed payments to dead letter queue
      - to: failed_payments

What it does:

  • Produces payment requests: Creates payment events with varying scenarios - some valid, some with negative amounts, some missing required fields, some with unsupported currencies, some simulating temporary network failures, and some malformed
  • Simulates processing: Mock payment processing distinguishes permanent failures (malformed_request, invalid_amount, missing_required_fields, unsupported_currency) from transient ones (network_timeout)
  • Classifies errors: Determines retry eligibility based on error type - network timeouts are retryable, validation errors are permanent
  • Enriches with context: Adds error classification, a retry_eligible flag, and the original request data to failed messages
  • Routes by result: Uses branch to send successful payments to processed_payments, failures to failed_payments topic with full error context

4. Retry Strategies with Exponential Backoff

Implement sophisticated retry logic for transient failures with exponential backoff and jitter.
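
The delay grows as BASE_DELAY_MS * 2^attempt, and a small random jitter spreads retries out so that many failing instances do not hammer the service in lockstep. A quick sanity check of the schedule, using the same constants as the processor below (1000 ms base, 20% jitter):

import random

BASE_DELAY_MS = 1000

def calculate_backoff_delay(attempt):
    delay = BASE_DELAY_MS * (2 ** attempt)             # 1000, 2000, 4000 ms for attempts 0..2
    jitter = delay * 0.2 * (random.random() * 2 - 1)   # +/- 20% of the delay
    return int(delay + jitter)

# Attempt 0 lands in [800, 1200] ms, attempt 1 in [1600, 2400] ms, attempt 2 in [3200, 4800] ms
print([calculate_backoff_delay(a) for a in range(3)])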

API Operations Producer
# Producer for retry strategies example - generates API operations with failure scenarios

functions:
  generate_api_operations:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1

      # Generate different API operation scenarios
      if counter % 5 == 1:
        # Successful operation
        operation = {
          "operation_id": f"op_{counter}",
          "api_endpoint": "/users/create",
          "should_fail": False,
          "retry_count": 0
        }
      elif counter % 5 == 2:
        # Transient network error (retryable)
        operation = {
          "operation_id": f"op_{counter}",
          "api_endpoint": "/users/update", 
          "should_fail": True,
          "failure_type": "network_timeout",
          "retry_count": 0
        }
      elif counter % 5 == 3:
        # Rate limit error (retryable)
        operation = {
          "operation_id": f"op_{counter}",
          "api_endpoint": "/data/fetch",
          "should_fail": True,
          "failure_type": "rate_limit",
          "retry_count": 0
        }
      elif counter % 5 == 4:
        # Permanent error (not retryable)
        operation = {
          "operation_id": f"op_{counter}",
          "api_endpoint": "/users/delete",
          "should_fail": True,
          "failure_type": "not_found",
          "retry_count": 0
        }
      else:
        # Authentication error (not retryable)
        operation = {
          "operation_id": f"op_{counter}",
          "api_endpoint": "/admin/config",
          "should_fail": True,
          "failure_type": "auth_failed",
          "retry_count": 0
        }

      key = f"op_{counter}"
    expression: (key, operation)
    resultType: (string, json)

producers:
  api_operation_producer:
    generator: generate_api_operations
    interval: 3s
    count: 15
    to:
      topic: api_operations
      keyType: string
      valueType: json
Retry Strategies Processor
# Retry strategies with exponential backoff for transient failures

streams:
  api_operations:
    topic: api_operations
    keyType: string
    valueType: json

  successful_operations:
    topic: successful_operations
    keyType: string
    valueType: json

  failed_operations:
    topic: failed_operations
    keyType: string
    valueType: json

functions:
  process_with_retry:
    type: valueTransformer
    globalCode: |
      import random
      import math

      MAX_RETRIES = 3
      BASE_DELAY_MS = 1000

      def calculate_backoff_delay(attempt):
        # Exponential backoff: 1s, 2s, 4s
        delay = BASE_DELAY_MS * (2 ** attempt)
        # Add 20% jitter
        jitter = delay * 0.2 * (random.random() * 2 - 1)
        return int(delay + jitter)

    code: |
      operation_id = value.get("operation_id", "unknown")
      retry_count = value.get("retry_count", 0)
      should_fail = value.get("should_fail", False)
      failure_type = value.get("failure_type", "")

      # Determine if this operation should succeed or fail
      if not should_fail:
        # Success case
        result = {
          "operation_id": operation_id,
          "api_endpoint": value.get("api_endpoint"),
          "status": "success",
          "retry_count": retry_count,
          "final_attempt": True
        }
      else:
        # Check if error is retryable
        retryable_errors = ["network_timeout", "rate_limit", "server_error"]
        is_retryable = failure_type in retryable_errors

        if is_retryable and retry_count < MAX_RETRIES:
          # Increment retry count and calculate delay
          new_retry_count = retry_count + 1
          backoff_delay = calculate_backoff_delay(new_retry_count - 1)

          # Simulate: 30% chance of success after retry
          if random.random() < 0.3:
            result = {
              "operation_id": operation_id,
              "api_endpoint": value.get("api_endpoint"),
              "status": "success_after_retry",
              "retry_count": new_retry_count,
              "backoff_delay": backoff_delay,
              "final_attempt": True
            }
          else:
            result = {
              "operation_id": operation_id,
              "api_endpoint": value.get("api_endpoint"),
              "status": "retry_needed",
              "retry_count": new_retry_count,
              "failure_type": failure_type,
              "backoff_delay": backoff_delay,
              "final_attempt": new_retry_count >= MAX_RETRIES
            }
        else:
          # Not retryable or max retries exceeded
          result = {
            "operation_id": operation_id,
            "api_endpoint": value.get("api_endpoint"),
            "status": "permanent_failure",
            "retry_count": retry_count,
            "failure_type": failure_type,
            "final_attempt": True
          }

    expression: result
    resultType: json

pipelines:
  retry_processor:
    from: api_operations
    via:
      - type: transformValue
        mapper: process_with_retry
      - type: peek
        forEach:
          code: |
            status = value.get("status", "unknown")
            op_id = value.get("operation_id", "unknown")
            retry_count = value.get("retry_count", 0)

            if status == "success":
              log.info("API SUCCESS - {}: completed", op_id)
            elif status == "success_after_retry":
              log.info("API RETRY SUCCESS - {}: completed after {} retries", op_id, retry_count)
            elif status == "retry_needed":
              delay = value.get("backoff_delay", 0)
              log.warn("API RETRY - {}: attempt {} failed, retry in {}ms", op_id, retry_count, delay)
            else:
              log.error("API FAILED - {}: {}", op_id, value.get("failure_type"))
    branch:
      # Route successful operations
      - if:
          expression: value.get("status") in ["success", "success_after_retry"]
        to: successful_operations
      # Route failed operations
      - to: failed_operations

What it does:

  • Produces API operations: Creates operations against endpoints such as /users/create, /users/update, and /data/fetch, deliberately marking some to fail so the retry logic is exercised
  • Simulates API calls: Mocks different failure types - retryable (network_timeout, rate_limit) and permanent (not_found, auth_failed) - alongside plain successes
  • Implements retry logic: Calculates exponential backoff delays (1s, 2s, 4s) with 20% jitter to prevent retry storms
  • Tracks attempts: Maintains the retry count and determines retry eligibility based on error type and the maximum of 3 attempts, recording the endpoint and failure type in the result
  • Routes by outcome: Successful operations go to successful_operations, failed/exhausted retries go to failed_operations with retry history

5. Circuit Breaker Pattern

Prevent cascading failures by temporarily stopping calls to failing services, allowing them to recover.
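
Conceptually the breaker is a small state machine: CLOSED counts failures, OPEN rejects calls until a timeout elapses, and HALF_OPEN lets a few probe requests through before closing again. A compact sketch of the transition rules, using the same thresholds as the processor below:

FAILURE_THRESHOLD = 3   # failures before the circuit opens
SUCCESS_THRESHOLD = 2   # probe successes before it closes again
TIMEOUT_MS = 10000      # how long the circuit stays open

def next_state(state, succeeded, failure_count, success_count, ms_since_last_failure):
    if state == "OPEN":
        # After the timeout, let probe traffic through
        return "HALF_OPEN" if ms_since_last_failure > TIMEOUT_MS else "OPEN"
    if not succeeded:
        # Too many failures trips the breaker
        return "OPEN" if failure_count + 1 >= FAILURE_THRESHOLD else state
    if state == "HALF_OPEN" and success_count + 1 >= SUCCESS_THRESHOLD:
        return "CLOSED"  # enough successful probes, close the circuit
    return state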

Service Requests Producer
# Producer for circuit breaker example - generates service requests with failure patterns

functions:
  generate_service_requests:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter
      counter += 1

      # Generate different service request patterns
      if counter <= 5:
        # Initial successful requests
        request = {
          "request_id": f"req_{counter}",
          "service": "user_service",
          "should_succeed": True
        }
      elif counter <= 10:
        # Start failing to trigger circuit breaker
        request = {
          "request_id": f"req_{counter}",
          "service": "user_service",
          "should_succeed": False,
          "error_type": "timeout"
        }
      elif counter <= 15:
        # Continue with failures (circuit should be open)
        request = {
          "request_id": f"req_{counter}",
          "service": "user_service",
          "should_succeed": False,
          "error_type": "connection_failed"
        }
      else:
        # Service recovery - mix of success/failure
        request = {
          "request_id": f"req_{counter}",
          "service": "user_service",
          "should_succeed": random.choice([True, True, False])  # 66% success
        }

      key = f"req_{counter}"
    expression: (key, request)
    resultType: (string, json)

producers:
  service_request_producer:
    generator: generate_service_requests
    interval: 2s
    count: 20
    to:
      topic: service_requests
      keyType: string
      valueType: json
Circuit Breaker Processor
# Circuit breaker pattern to prevent cascading failures

streams:
  service_requests:
    topic: service_requests
    keyType: string
    valueType: json

  service_responses:
    topic: service_responses
    keyType: string
    valueType: json

  circuit_events:
    topic: circuit_events
    keyType: string
    valueType: json

functions:
  circuit_breaker_handler:
    type: valueTransformer
    globalCode: |
      # Circuit breaker state
      failure_count = 0
      success_count = 0
      circuit_state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
      last_failure_time = 0

      # Configuration
      FAILURE_THRESHOLD = 3
      SUCCESS_THRESHOLD = 2
      TIMEOUT_MS = 10000  # 10 seconds

    code: |
      global failure_count, success_count, circuit_state, last_failure_time

      import time
      current_time = int(time.time() * 1000)

      request_id = value.get("request_id", "unknown")
      should_succeed = value.get("should_succeed", True)

      # Check if we should transition from OPEN to HALF_OPEN
      if circuit_state == "OPEN" and (current_time - last_failure_time) > TIMEOUT_MS:
        circuit_state = "HALF_OPEN"
        success_count = 0

      # Handle request based on circuit state
      if circuit_state == "OPEN":
        # Circuit is open - reject request immediately
        result = {
          "request_id": request_id,
          "status": "circuit_open",
          "circuit_state": circuit_state,
          "failure_count": failure_count,
          "message": "Circuit breaker is OPEN - request rejected"
        }
      else:
        # Circuit is CLOSED or HALF_OPEN - try to process request
        if should_succeed:
          # Request succeeds
          success_count += 1

          if circuit_state == "HALF_OPEN" and success_count >= SUCCESS_THRESHOLD:
            # Reset circuit breaker
            circuit_state = "CLOSED"
            failure_count = 0
            success_count = 0

          result = {
            "request_id": request_id,
            "status": "success",
            "circuit_state": circuit_state,
            "success_count": success_count,
            "failure_count": failure_count,
            "service": value.get("service")
          }
        else:
          # Request fails
          failure_count += 1
          success_count = 0
          last_failure_time = current_time

          if failure_count >= FAILURE_THRESHOLD:
            circuit_state = "OPEN"

          result = {
            "request_id": request_id,
            "status": "failure",
            "circuit_state": circuit_state,
            "failure_count": failure_count,
            "error_type": value.get("error_type", "unknown"),
            "service": value.get("service")
          }

    expression: result
    resultType: json

pipelines:
  circuit_breaker_processor:
    from: service_requests
    via:
      - type: transformValue
        mapper: circuit_breaker_handler
      - type: peek
        forEach:
          code: |
            status = value.get("status", "unknown")
            circuit_state = value.get("circuit_state", "unknown")
            req_id = value.get("request_id", "unknown")

            if status == "circuit_open":
              log.warn("CIRCUIT OPEN - {}: Request rejected", req_id)
            elif status == "success":
              log.info("REQUEST OK - {} [{}]: Success", req_id, circuit_state)
            else:
              failures = value.get("failure_count", 0)
              log.error("REQUEST FAILED - {} [{}]: {} failures", req_id, circuit_state, failures)
    branch:
      # Route successful requests
      - if:
          expression: value.get("status") == "success"
        to: service_responses
      # Route circuit breaker events and failures
      - to: circuit_events

What it does:

  • Produces service requests: Creates requests with request_ids, some designed to fail to trigger circuit breaker state changes
  • Tracks circuit state: Uses global variables (failure_count, circuit_state) to maintain CLOSED/OPEN/HALF_OPEN states across all requests
  • Opens on failures: After 3 failures (the FAILURE_THRESHOLD), the circuit transitions from CLOSED to OPEN to protect the failing service
  • Rejects when open: In OPEN state, immediately returns circuit_open status without attempting processing, showing the current failure count
  • Processes requests: In CLOSED or HALF_OPEN state, simulates service calls with success/failure outcomes, updating the failure count and circuit state accordingly

Error Handling Best Practices

Data Type Recommendations

  • Use JSON types: Provides flexibility for error objects and easy inspection in Kowl UI
  • Include context: Add timestamps, retry counts, and error classifications to all error messages
  • Preserve original data: Keep original messages in error objects for debugging, as in the sketch below
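
A minimal example of an error object that follows all three recommendations (field names are illustrative):

import time

error_record = {
    "status": "error",
    "error_type": "invalid_quantity",              # classification for filtering and alerting
    "error_message": "quantity must be positive",
    "retry_eligible": False,                       # guides later reprocessing decisions
    "retry_count": 0,
    "failed_at": int(time.time() * 1000),          # timestamp for triage
    "original_data": {"order_id": "order_42", "quantity": -1}   # preserve the input as-is
}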

Function Design Patterns

  • Handle null values: Always check for None/null values explicitly
  • Use appropriate exceptions: Choose specific exception types for different error conditions
  • Provide meaningful errors: Include context about what went wrong and potential solutions
  • Log appropriately: Use different log levels (info/warn/error) based on severity - the sketch below applies all four guidelines
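
A short sketch of a transformer body that follows these guidelines, using the key, value, and log objects that KSML exposes to function code in the examples above:

if value is None:
    # Null check first; warn rather than error because the record is simply skipped
    log.warn("Skipping record {}: null value", key)
    result = {"status": "error", "error": "null_value", "original_data": None}
else:
    try:
        amount = float(value.get("amount"))        # may raise TypeError/ValueError
        if amount <= 0:
            raise ValueError(f"amount must be positive, got {amount}")
        result = {"status": "ok", "amount": amount}
        log.info("Processed {} successfully", key)
    except (TypeError, ValueError) as e:
        # Specific exception types, a meaningful message, and error-level logging
        log.error("Failed to process {}: {}", key, str(e))
        result = {"status": "error", "error": str(e), "original_data": value}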

Monitoring and Alerting

  • Track error rates: Monitor error percentages by type and set appropriate thresholds (see the sketch after this list)
  • Circuit breaker metrics: Alert when circuits open and track recovery times
  • Retry success rates: Measure effectiveness of retry strategies
  • Dead letter queue size: Monitor unprocessable message volume
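
Inside KSML you can derive simple metrics with counters in globalCode, the same technique the circuit breaker example uses for its state. A hedged sketch of an error-rate check (thresholds and field names are illustrative):

# globalCode: state shared across invocations of the function
total_count = 0
error_count = 0
ERROR_RATE_THRESHOLD = 0.10   # alert when more than 10% of records fail

# code: runs per record, assuming the value carries a "status" field
global total_count, error_count
total_count += 1
if value.get("status") == "error":
    error_count += 1
error_rate = error_count / total_count
if total_count >= 20 and error_rate > ERROR_RATE_THRESHOLD:
    log.warn("Error rate {}% over {} records exceeds threshold", round(error_rate * 100, 1), total_count)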

Testing Error Scenarios

  • Simulate failures: Test error handling code with various failure scenarios
  • Load testing: Ensure error handling works under high load conditions
  • Recovery testing: Verify systems can recover from failure states

Error Pattern Selection Guide

Pattern | Use Case | Benefits | When to Use
Validation & Filtering | Data quality issues | Early detection, clear routing | Input data validation, format checking
Try-Catch | Function-level errors | Graceful degradation | Type conversion, calculations, parsing
Dead Letter Queue | Permanent failures | No data loss, failure analysis | Malformed data, business rule violations
Retry Strategies | Transient failures | Fault tolerance, automatic recovery | Network timeouts, rate limits, temporary errors
Circuit Breaker | External service failures | Prevents cascading failures | API calls, database connections, service dependencies

Next Steps