Integration with External Systems in KSML

This tutorial covers how to integrate KSML with external systems such as databases and APIs, using JSON data formats for better observability.

Introduction

Stream processing often needs external data. This tutorial shows patterns for enriching events with database lookups, API calls, and async integrations.

Prerequisites

Before starting this tutorial, create the Kafka topics used by the examples:

Topic creation commands
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_product_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic external_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic external_responses

Integration Patterns

API Enrichment Pattern

The API enrichment pattern calls external REST APIs to add additional data to streaming events. This is useful when you need real-time data that can't be cached locally.

What it does:

  • Produces user events: Creates events (login, purchase, page_view) with user IDs, session info, device details, page URLs, referrer sources
  • Calls mock API: For each user_id, fetches profile from hardcoded lookup table simulating external REST API call with user details
  • Handles API failures: Uses try/except to handle missing users gracefully, returning a fallback "Unknown User" profile
  • Enriches with profiles: Combines original event data with fetched profile (name, tier, location, preferences, lifetime_value)
  • Outputs enriched events: Returns JSON combining event details with user profile, API call timing, and computed recommendations

Key concepts demonstrated:

  • Making external API calls from KSML functions with JSON data structures
  • Handling API failures gracefully with structured fallback strategies
  • Managing API latency and timeouts in stream processing
  • Enriching events with external data while maintaining stream processing semantics
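The call-with-fallback flow above can be sketched outside KSML as a plain Python function. This is a minimal sketch, not KSML API: `fetch_profile` is an injectable stand-in for a real HTTP client, and the fallback fields are illustrative:

```python
import time

# Illustrative fallback used when the external call fails (assumption, not a KSML API)
FALLBACK_PROFILE = {"name": "Unknown User", "tier": "basic", "location": "Unknown"}

def enrich_event(event, fetch_profile):
    """Enrich an event with a user profile, degrading gracefully on failure.

    fetch_profile is any callable taking a user_id; in production it would
    wrap an HTTP client call with a timeout.
    """
    start = time.time()
    try:
        profile = fetch_profile(event["user_id"])
        status = "SUCCESS"
    except Exception:
        profile = dict(FALLBACK_PROFILE)
        status = "FAILED"
    return {
        "enrichment_status": status,
        "original_event": event,
        "user_profile": profile,
        "api_call_duration_ms": int((time.time() - start) * 1000),
    }

# Usage with a fake fetcher that knows a single user
profiles = {"user_001": {"name": "Alice Johnson", "tier": "premium"}}
ok = enrich_event({"user_id": "user_001"}, lambda uid: profiles[uid])
bad = enrich_event({"user_id": "user_999"}, lambda uid: profiles[uid])
```

Injecting the fetcher keeps the enrichment logic testable without a live endpoint, which is the same separation the mock `get_user_profile` achieves in the processor below.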
User Events Producer (API enrichment demo)
# Producer for API enrichment demo - generates user events that need external data

functions:
  generate_user_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
      event_types = ["login", "view_product", "add_to_cart", "purchase"]
    code: |
      global event_counter, user_ids, event_types

      event_counter += 1
      user_id = random.choice(user_ids)
      event_type = random.choice(event_types)

      # Create structured JSON user event for better readability in Kowl UI
      user_event = {
        "event_id": f"event_{event_counter:06d}",
        "event_type": event_type,
        "user_id": user_id,
        "timestamp": int(time.time() * 1000),
        "sequence_number": event_counter,
        "session_id": f"session_{user_id}_{event_counter // 5}",  # Change session every 5 events
        "device_info": {
          "platform": random.choice(["web", "mobile", "tablet"]),
          "user_agent": random.choice(["Chrome", "Firefox", "Safari", "Edge"]),
          "ip_address": f"192.168.1.{random.randint(1, 255)}"
        },
        "context": {
          "page_url": random.choice(["/home", "/products", "/cart", "/checkout"]),
          "referrer": random.choice([None, "google.com", "facebook.com", "direct"]),
          "campaign": random.choice([None, "summer_sale", "new_user", "retargeting"])
        },
        "metadata": {
          "simulation": True,
          "api_enrichment": True,
          "needs_external_data": True
        }
      }

    expression: (f"event_{event_counter:06d}", user_event)
    resultType: (string, json)

producers:
  user_event_producer:
    generator: generate_user_events
    interval: 3s
    to:
      topic: user_events
      keyType: string
      valueType: json
API Enrichment Processor (external API calls)
# Processor demonstrating API enrichment pattern

streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

functions:
  enrich_with_api_data:
    type: valueTransformer
    globalCode: |
      import time
      import random

      # Mock API client (simulates external REST API)
      def get_user_profile(user_id):
        """Simulate API call to get user profile"""
        try:
          # Simulate API latency
          time.sleep(0.1)

          # Mock user profiles based on user_id
          profiles = {
            "user_001": {
              "name": "Alice Johnson", "tier": "premium", "location": "New York",
              "age": 32, "email": "alice@example.com", "preferences": ["tech", "gaming"],
              "lifetime_value": 2500.00, "registration_date": "2022-01-15"
            },
            "user_002": {
              "name": "Bob Smith", "tier": "standard", "location": "London", 
              "age": 28, "email": "bob@example.com", "preferences": ["sports", "music"],
              "lifetime_value": 850.00, "registration_date": "2023-03-22"
            },
            "user_003": {
              "name": "Charlie Davis", "tier": "premium", "location": "Tokyo",
              "age": 45, "email": "charlie@example.com", "preferences": ["travel", "food"],
              "lifetime_value": 3200.00, "registration_date": "2021-11-08"
            },
            "user_004": {
              "name": "Diana Wilson", "tier": "basic", "location": "Sydney",
              "age": 24, "email": "diana@example.com", "preferences": ["fashion", "art"],
              "lifetime_value": 320.00, "registration_date": "2024-01-10"
            },
            "user_005": {
              "name": "Eve Brown", "tier": "standard", "location": "Berlin",
              "age": 35, "email": "eve@example.com", "preferences": ["books", "movies"],
              "lifetime_value": 1150.00, "registration_date": "2022-08-17"
            }
          }

          profile = profiles.get(user_id, {
            "name": "Unknown User", "tier": "basic", "location": "Unknown",
            "age": 0, "email": "unknown@example.com", "preferences": [],
            "lifetime_value": 0.0, "registration_date": "unknown"
          })
          log.info("Fetched profile for user {}: {}", user_id, profile["name"])
          return profile
        except Exception as e:
          log.warn("API request failed for user {}: {}", user_id, str(e))
          return None

    code: |
      import time

      # Extract fields from JSON event
      if not value:
        return None

      event_id = value.get("event_id")
      event_type = value.get("event_type")
      user_id = value.get("user_id")
      timestamp = value.get("timestamp")
      sequence_number = value.get("sequence_number")
      session_id = value.get("session_id")
      device_info = value.get("device_info", {})
      context = value.get("context", {})
      metadata = value.get("metadata", {})

      if not event_type or not user_id or not timestamp:
        return None

      # Call API to get user profile data
      api_start_time = int(time.time() * 1000)
      profile_data = get_user_profile(user_id)
      api_end_time = int(time.time() * 1000)
      api_latency = api_end_time - api_start_time

      # Create enriched event with comprehensive data structure
      if profile_data:
        enriched_event = {
          "enrichment_status": "SUCCESS",
          "enrichment_type": "API_PROFILE_DATA",
          "original_event": {
            "event_id": event_id,
            "event_type": event_type,
            "user_id": user_id,
            "timestamp": timestamp,
            "sequence_number": sequence_number,
            "session_id": session_id,
            "device_info": device_info,
            "context": context
          },
          "enriched_data": {
            "user_profile": profile_data,
            "computed_metrics": {
              "user_tier_level": {"premium": 3, "standard": 2, "basic": 1}.get(profile_data.get("tier"), 0),
              "is_high_value": profile_data.get("lifetime_value", 0) > 1000,
              "account_age_days": (timestamp - 1640995200000) // (24 * 60 * 60 * 1000) if profile_data.get("registration_date") != "unknown" else 0  # demo approximation: days since 2022-01-01, not derived from registration_date
            },
            "recommendations": {
              "personalized": profile_data.get("preferences", [])[:2] if profile_data.get("preferences") else [],
              "tier_benefits": f"Available benefits for {profile_data.get('tier', 'basic')} tier",
              "location_offers": f"Special offers in {profile_data.get('location', 'Unknown')}"
            }
          },
          "api_metrics": {
            "api_call_duration_ms": api_latency,
            "api_endpoint": "mock_user_profile_api",
            "api_success": True,
            "cache_hit": False  # This is a live API call
          },
          "processing_info": {
            "enrichment_timestamp": api_end_time,
            "processor_version": "1.0",
            "enrichment_rules": ["basic_profile", "tier_computation", "recommendations"]
          }
        }
        return enriched_event
      else:
        # Return event with failure information if API fails
        fallback_event = {
          "enrichment_status": "FAILED", 
          "enrichment_type": "API_PROFILE_DATA",
          "original_event": {
            "event_id": event_id,
            "event_type": event_type,
            "user_id": user_id,
            "timestamp": timestamp,
            "sequence_number": sequence_number,
            "session_id": session_id,
            "device_info": device_info,
            "context": context
          },
          "enriched_data": {
            "user_profile": None,
            "fallback_data": {
              "default_tier": "basic",
              "estimated_location": "Unknown",
              "default_preferences": []
            }
          },
          "api_metrics": {
            "api_call_duration_ms": api_latency,
            "api_endpoint": "mock_user_profile_api", 
            "api_success": False,
            "error_reason": "Profile not found or API unavailable"
          },
          "processing_info": {
            "enrichment_timestamp": api_end_time,
            "processor_version": "1.0",
            "fallback_applied": True
          }
        }
        return fallback_event

    resultType: json

pipelines:
  api_enrichment_pipeline:
    from: user_events
    via:
      - type: mapValues
        mapper: enrich_with_api_data
      - type: filter
        if:
          expression: value is not None
    to:
      topic: enriched_user_events
      keyType: string
      valueType: json

API enrichment benefits:

  • Real-time enrichment: Access to the most current external data
  • Flexible data sources: Can integrate with any REST API
  • Graceful degradation: Continues processing even when external systems fail
  • Simple implementation: Straightforward request-response pattern
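One common way to manage API latency is a small TTL cache in front of the enrichment call, so repeat lookups within a time window skip the network entirely. A sketch under illustrative assumptions (TTL value, cache shape):

```python
import time

class TTLCache:
    """Tiny time-bounded cache for API enrichment results (illustrative)."""

    def __init__(self, ttl_seconds=60.0):
        self.ttl = ttl_seconds
        self._data = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._data[key]  # expired: evict and force a fresh fetch
            return None
        return value

    def put(self, key, value):
        self._data[key] = (time.monotonic() + self.ttl, value)

cache = TTLCache(ttl_seconds=60.0)

def cached_profile(user_id, fetch):
    profile = cache.get(user_id)
    if profile is None:        # cache miss: call the external API
        profile = fetch(user_id)
        cache.put(user_id, profile)
    return profile

# Usage: the second lookup is served from the cache, so fetch runs only once
calls = []
fetch = lambda uid: (calls.append(uid), {"user_id": uid, "tier": "premium"})[1]
a = cached_profile("user_001", fetch)
b = cached_profile("user_001", fetch)
```

The TTL bounds staleness: shorter windows keep data fresher at the cost of more API calls.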

Database Lookup Pattern

This pattern shows how to enrich streaming events with data that would normally come from a database.

What happens in this example:

Imagine you have an e-commerce website. Users view products, add them to cart, and make purchases. Your stream only has basic event data like {"product_id": "PROD001", "event_type": "purchased", "quantity": 2}. But you want to know the product name, price, and category too.

The Producer creates these basic product events every 2 seconds:

{"product_id": "PROD001", "event_type": "purchased", "quantity": 2, "user_id": "user_1234"}

The Processor enriches each event by:

  1. First time only: Loads a product catalog into memory (like a mini database):
    • PROD001 → "Wireless Headphones", $99.99, "Electronics"
    • PROD002 → "Coffee Mug", $12.50, "Kitchen"
  2. For each event: Looks up the product_id and adds the details:
    {
      "product_id": "PROD001", 
      "event_type": "purchased", 
      "quantity": 2,
      "enriched_data": {
        "name": "Wireless Headphones",
        "category": "Electronics", 
        "unit_price": 99.99,
        "total_price": 199.98
      }
    }
    

This way you get rich product information without hitting a database for every single event.
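The lookup logic described above can be sketched as a plain function over an in-memory catalog; the catalog dict stands in for the state store that the processor below populates:

```python
# Illustrative reference data, mirroring the mock catalog used in the processor
CATALOG = {
    "PROD001": {"name": "Wireless Headphones", "price": 99.99, "category": "Electronics"},
    "PROD002": {"name": "Coffee Mug", "price": 12.50, "category": "Kitchen"},
}

def enrich_product_event(event, catalog=CATALOG):
    """Attach cached product details to an event, with an explicit cache-miss path."""
    product = catalog.get(event["product_id"])
    if product is None:
        fallback = {"name": "Unknown Product", "unit_price": 0.0, "total_price": 0.0}
        return {**event, "enriched_data": fallback, "cache_hit": False}
    total = product["price"] * event.get("quantity", 1)
    enriched = {
        "name": product["name"],
        "category": product["category"],
        "unit_price": product["price"],
        "total_price": round(total, 2),
    }
    return {**event, "enriched_data": enriched, "cache_hit": True}

hit = enrich_product_event({"product_id": "PROD001", "event_type": "purchased", "quantity": 2})
miss = enrich_product_event({"product_id": "PROD999", "event_type": "viewed"})
```

Tracking the `cache_hit` flag per event makes the cache-miss rate directly observable downstream.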

Product Events Producer (database lookup demo)
# Producer for database lookup demo - generates product events

functions:
  generate_product_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      product_ids = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
      event_types = ["viewed", "added_to_cart", "purchased", "removed_from_cart"]
    code: |
      global event_counter, product_ids, event_types

      event_counter += 1
      product_id = random.choice(product_ids)
      event_type = random.choice(event_types)
      quantity = random.randint(1, 5) if event_type in ["added_to_cart", "purchased"] else 1
      current_timestamp = int(time.time() * 1000)

      # Create structured JSON product event for better readability in Kowl UI
      product_event = {
        "event_id": f"product_event_{event_counter:06d}",
        "event_type": event_type,
        "product_id": product_id,
        "quantity": quantity,
        "timestamp": current_timestamp,
        "user_id": f"user_{random.randint(1000, 9999)}",
        "session_id": f"session_{event_counter // 10}_{random.randint(100, 999)}",
        "page_location": random.choice(["/products", "/category", "/search"]),
        "metadata": {
          "simulation": True,
          "database_lookup": True
        }
      }

    expression: (product_id, product_event)
    resultType: (string, json)

producers:
  product_event_producer:
    generator: generate_product_events
    interval: 2s
    to:
      topic: product_events
      keyType: string
      valueType: json
Database Lookup Processor (cached reference data)
# Processor demonstrating database lookup pattern with state store caching

streams:
  product_events:
    topic: product_events
    keyType: string
    valueType: json

stores:
  product_reference_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  load_product_reference_data:
    type: forEach
    globalCode: |
      import json

      # Track if we've already loaded the data
      data_loaded = False

      def load_product_catalog():
        """Simulate loading product data from database"""
        # Mock product catalog (simulates database query results)
        products = {
          "PROD001": {"name": "Wireless Headphones", "price": 99.99, "category": "Electronics"},
          "PROD002": {"name": "Coffee Mug", "price": 12.50, "category": "Kitchen"},
          "PROD003": {"name": "Running Shoes", "price": 129.99, "category": "Sports"},
          "PROD004": {"name": "Notebook", "price": 5.99, "category": "Office"},
          "PROD005": {"name": "Smartphone", "price": 699.99, "category": "Electronics"}
        }

        log.info("Loaded {} products into reference data store", len(products))
        return products

    code: |
      global data_loaded

      # Only load data once
      if not data_loaded:
        # Load product data into state store (simulates database loading)
        products = load_product_catalog()

        for product_id, product_data in products.items():
          # Store as JSON string to avoid ForeignObject issues
          product_reference_store.put(product_id, json.dumps(product_data))

        data_loaded = True
        log.info("Product reference data loaded into state store")
    stores:
      - product_reference_store

  enrich_with_product_data:
    type: valueTransformer
    code: |
      import json

      # Extract fields from JSON product event using .get() method
      if not value:
        return None

      event_id = str(value.get("event_id", ""))
      event_type = str(value.get("event_type", ""))
      product_id = str(value.get("product_id", ""))
      quantity = int(value.get("quantity", 0))
      timestamp = int(value.get("timestamp", 0))
      user_id = str(value.get("user_id", ""))
      session_id = str(value.get("session_id", ""))
      page_location = str(value.get("page_location", ""))

      if not event_type or not product_id or quantity == 0:
        return None

      # Look up product data from state store (cached database data)
      product_data_str = product_reference_store.get(product_id)

      if product_data_str:
        # Parse JSON string to get product data
        product_data = json.loads(product_data_str)

        # Calculate total price for purchase events
        unit_price = product_data.get("price", 0.0)
        total_price = unit_price * quantity if event_type in ["purchased", "added_to_cart"] else 0.0

        # Create enriched event with product details
        enriched_event = {
          "event_id": event_id,
          "event_type": event_type,
          "product_id": product_id,
          "quantity": quantity,
          "timestamp": timestamp,
          "user_id": user_id,
          "session_id": session_id,
          "page_location": page_location,
          "enriched_data": {
            "name": product_data.get("name", "Unknown"),
            "category": product_data.get("category", "Unknown"),
            "unit_price": unit_price,
            "total_price": total_price
          },
          "cache_hit": True
        }

        log.info("Enriched {} event for product: {} ({})", event_type, product_data.get("name"), product_id)
        return enriched_event
      else:
        # Product not found in cache
        fallback_event = {
          "event_id": event_id,
          "event_type": event_type,
          "product_id": product_id,
          "quantity": quantity,
          "timestamp": timestamp,
          "user_id": user_id,
          "session_id": session_id,
          "page_location": page_location,
          "enriched_data": {
            "name": "Unknown Product",
            "category": "Unknown",
            "unit_price": 0.0,
            "total_price": 0.0
          },
          "cache_hit": False
        }

        log.warn("Product not found in reference data: {}", product_id)
        return fallback_event

    resultType: json
    stores:
      - product_reference_store

pipelines:
  # Process product events with database lookup
  product_enrichment_pipeline:
    from: product_events
    via:
      - type: peek
        forEach: load_product_reference_data
      - type: mapValues
        mapper: enrich_with_product_data
      - type: filter
        if:
          expression: value is not None
    to:
      topic: enriched_product_events
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Loading reference data into state stores as cache
  • Fast local lookups without external database calls
  • JSON enrichment with cached data

Async Integration Pattern

The async integration pattern uses separate Kafka topics for communication with external systems, providing loose coupling and better resilience.

What it does:

  • Produces order events: Creates orders with status (created, paid, shipped) containing customer info, amounts, payment methods, business context
  • Filters paid orders: Only processes orders with status="paid", creates external payment processing requests with correlation IDs
  • Sends to request topic: Outputs JSON payment requests to external_requests topic with order details, customer tier, processing priority
  • Processes responses: Reads responses from external_responses topic, matches by correlation_id, handles success/failure scenarios
  • Outputs results: Returns JSON combining original order with external processing results, transaction IDs, and follow-up actions

Key concepts demonstrated:

  • Creating request topics for external system communication with JSON payloads
  • Generating correlation IDs for request-response tracking
  • Processing responses asynchronously through separate topics with structured data
  • Building event-driven integration patterns using JSON messaging
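The correlation-ID mechanics can be sketched independently of KSML. `create_request` and `match_response` are hypothetical helpers illustrating the request/response join, not part of any KSML API:

```python
import time
import uuid

def create_request(order):
    """Build an external request keyed by a fresh correlation ID."""
    correlation_id = f"corr_{order['order_id']}_{uuid.uuid4().hex[:8]}"
    return correlation_id, {
        "request_type": "PAYMENT_PROCESSING",
        "order_id": order["order_id"],
        "amount": order["amount"],
        "correlation_id": correlation_id,
        "created_at": int(time.time() * 1000),
    }

def match_response(pending, response):
    """Join a response back to its pending request via correlation_id."""
    request = pending.pop(response["correlation_id"], None)
    if request is None:
        return None  # late or unknown response; route to a dead-letter flow
    return {"order_id": request["order_id"], "status": response["status"]}

# Usage: send side records the request, receive side matches it back
pending = {}
cid, req = create_request({"order_id": "ORDER_0001", "amount": 120.0})
pending[cid] = req
result = match_response(pending, {"correlation_id": cid, "status": "success"})
late = match_response(pending, {"correlation_id": "corr_unknown", "status": "success"})
```

In a real deployment the `pending` map would live in a state store so matching survives restarts.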
Order Events Producer (async integration demo)
# Producer for async integration demo - generates order events

functions:
  generate_orders:
    type: generator
    globalCode: |
      import random
      import time
      order_counter = 0
      customer_ids = ["CUST001", "CUST002", "CUST003", "CUST004"]
      statuses = ["created", "paid", "shipped", "delivered"]
    code: |
      global order_counter, customer_ids, statuses

      order_counter += 1
      order_id = f"ORDER_{order_counter:04d}"
      customer_id = random.choice(customer_ids)
      status = random.choice(statuses)
      amount = round(random.uniform(10.0, 500.0), 2)
      current_timestamp = int(time.time() * 1000)

      # Create structured JSON order event for better readability in Kowl UI
      order_event = {
        "order_id": order_id,
        "customer_id": customer_id,
        "status": status,
        "amount": amount,
        "timestamp": current_timestamp,
        "sequence_number": order_counter,
        "order_details": {
          "currency": "USD",
          "payment_method": random.choice(["credit_card", "debit_card", "paypal", "bank_transfer"]),
          "shipping_method": random.choice(["standard", "express", "overnight", "pickup"]),
          "order_source": random.choice(["web", "mobile", "phone", "store"])
        },
        "customer_info": {
          "customer_tier": random.choice(["bronze", "silver", "gold", "platinum"]),
          "loyalty_points": random.randint(0, 5000),
          "previous_orders": random.randint(0, 50)
        },
        "fulfillment": {
          "warehouse_id": f"WH_{random.randint(1, 5):02d}",
          "estimated_ship_date": current_timestamp + random.randint(1, 7) * 24 * 60 * 60 * 1000,  # 1-7 days
          "shipping_address": {
            "country": random.choice(["US", "CA", "UK", "DE", "FR"]),
            "region": random.choice(["North", "South", "East", "West", "Central"])
          }
        },
        "business_context": {
          "requires_external_processing": status == "paid",
          "high_value": amount > 200,
          "priority": "high" if amount > 300 else "normal",
          "async_processing_needed": True
        },
        "metadata": {
          "simulation": True,
          "async_integration": True,
          "correlation_id": f"corr_{order_id}_{current_timestamp}",
          "processing_version": "1.0"
        }
      }

    expression: (order_id, order_event)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_orders
    interval: 3s
    to:
      topic: order_events
      keyType: string
      valueType: json
Async Integration Processor (request-response pattern)
# Processor demonstrating async integration pattern

streams:
  order_events:
    topic: order_events
    keyType: string
    valueType: json
  external_requests:
    topic: external_requests
    keyType: string
    valueType: json

functions:
  filter_paid_orders:
    type: predicate
    code: |
      # Extract status from JSON order event
      if not value:
        return False
      status = value.get("status")
      # Only process 'paid' orders
      return status == "paid"

  create_external_request:
    type: keyValueTransformer
    code: |
      import time

      # Extract fields from JSON order event
      if not value:
        return None

      order_id = value.get("order_id")
      customer_id = value.get("customer_id")
      status = value.get("status")
      amount = value.get("amount")
      timestamp = value.get("timestamp")
      sequence_number = value.get("sequence_number")
      order_details = value.get("order_details", {})
      customer_info = value.get("customer_info", {})
      fulfillment = value.get("fulfillment", {})
      business_context = value.get("business_context", {})
      metadata = value.get("metadata", {})

      # Create comprehensive request for external payment processing system
      request_id = f"REQ_{order_id}_{timestamp}"

      external_request = {
        "request_id": request_id,
        "request_type": "PAYMENT_PROCESSING",
        "original_order": {
          "order_id": order_id,
          "customer_id": customer_id,
          "amount": amount,
          "timestamp": timestamp,
          "sequence_number": sequence_number
        },
        "payment_details": {
          "amount": amount,
          "currency": order_details.get("currency", "USD"),
          "payment_method": order_details.get("payment_method"),
          "customer_tier": customer_info.get("customer_tier"),
          "loyalty_points": customer_info.get("loyalty_points")
        },
        "processing_context": {
          "priority": business_context.get("priority", "normal"),
          "high_value": business_context.get("high_value", False),
          "customer_previous_orders": customer_info.get("previous_orders", 0),
          "order_source": order_details.get("order_source")
        },
        "async_metadata": {
          "correlation_id": metadata.get("correlation_id", f"corr_{request_id}"),
          "created_at": int(time.time() * 1000),
          "timeout_ms": 30000,  # 30 second timeout
          "retry_count": 0,
          "expected_response_topic": "external_responses"
        },
        "external_system_info": {
          "target_system": "payment_processor",
          "api_version": "v2.1",
          "request_format": "async_json",
          "callback_required": True
        }
      }

      # Note: log placeholders only support plain {}, so format numbers beforehand
      log.info("Created external payment request for order {}: amount=${}, priority={}",
               order_id, f"{amount:.2f}", business_context.get("priority", "normal"))

      return (request_id, external_request)

    resultType: (string, json)

  process_external_response:
    type: valueTransformer 
    globalCode: |
      import time
      import random
    code: |
      # Simulate processing external system response
      # In real scenario, this would come from external system response topic

      if not value:
        return None

      # Extract request information
      request_id = value.get("request_id")
      request_type = value.get("request_type")
      original_order = value.get("original_order", {})
      payment_details = value.get("payment_details", {})
      processing_context = value.get("processing_context", {})
      async_metadata = value.get("async_metadata", {})

      # Simulate external system processing with realistic delays and outcomes
      processing_time_ms = random.randint(100, 2000)  # 0.1 to 2 seconds
      success_rate = 0.9 if processing_context.get("high_value") else 0.95
      is_successful = random.random() < success_rate

      current_timestamp = int(time.time() * 1000)

      # Create comprehensive response
      response_data = {
        "response_id": f"RESP_{request_id}_{current_timestamp}",
        "request_id": request_id,
        "response_type": "PAYMENT_PROCESSING_RESULT",
        "status": "success" if is_successful else "failed",
        "original_request": {
          "order_id": original_order.get("order_id"),
          "customer_id": original_order.get("customer_id"),
          "amount": payment_details.get("amount"),
          "request_timestamp": original_order.get("timestamp")
        },
        "processing_result": {
          "transaction_id": f"TXN_{current_timestamp}_{random.randint(1000, 9999)}" if is_successful else None,
          "authorization_code": f"AUTH_{random.randint(100000, 999999)}" if is_successful else None,
          "processing_status": "approved" if is_successful else "declined",
          "reason_code": "000" if is_successful else random.choice(["051", "061", "065", "075"]),
          "reason_message": "Transaction approved" if is_successful else random.choice([
            "Insufficient funds", "Invalid card", "Expired card", "Fraud suspected"
          ])
        },
        "financial_details": {
          "processed_amount": payment_details.get("amount") if is_successful else 0.0,
          "currency": payment_details.get("currency", "USD"),
          "fee_amount": round(payment_details.get("amount", 0) * 0.029, 2) if is_successful else 0.0,  # 2.9% fee
          "settlement_date": current_timestamp + 86400000 if is_successful else None  # Next day
        },
        "system_metadata": {
          "external_system": "payment_processor_v2.1",
          "processing_time_ms": processing_time_ms,
          "processed_at": current_timestamp,
          "correlation_id": async_metadata.get("correlation_id"),
          "retry_attempt": async_metadata.get("retry_count", 0) + 1,
          "final_response": True
        },
        "business_context": {
          "customer_impact": "order_confirmed" if is_successful else "order_cancelled",
          "requires_notification": True,
          "follow_up_actions": ["send_confirmation_email", "update_inventory"] if is_successful else ["send_decline_email", "release_inventory"],
          "priority_level": processing_context.get("priority", "normal")
        }
      }

      log.info("Processed external system response for order {}: status={}, amount=${}",
               original_order.get("order_id"), response_data["status"], f"{payment_details.get('amount', 0):.2f}")

      return response_data

    resultType: json

pipelines:
  # Send requests to external system
  external_request_pipeline:
    from: order_events
    via:
      - type: filter
        if: filter_paid_orders
      - type: transformKeyValue
        mapper: create_external_request
    to:
      topic: external_requests
      keyType: string
      valueType: json

  # Process responses from external system (mock processing)
  external_response_pipeline:
    from: external_requests
    via:
      - type: mapValues
        mapper: process_external_response
    to:
      topic: external_responses
      keyType: string
      valueType: json

Async integration features:

  • Loose coupling: External systems communicate via topics, not direct calls
  • Scalability: Multiple consumers can process requests and responses
  • Reliability: Messages are persisted in Kafka topics
  • Monitoring: Easy to monitor request/response flows through topic metrics

Key Metrics to Track

Monitor these metrics to ensure integration health:

  • Track latency of external API calls and database queries
  • Monitor the percentage of successful external system interactions
  • Track different types of errors (timeouts, authentication, etc.)
  • For lookup patterns, monitor state store cache hit rates
  • For async patterns, monitor request and response topic lag
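A minimal way to produce the latency and success-rate numbers listed above is to wrap every external call in a timing helper. This sketch assumes you forward the returned metrics dict to whatever monitoring system you use:

```python
import time

def timed_call(fn, *args, **kwargs):
    """Run an external call and return (result, metrics) for monitoring.

    Failures are captured rather than raised so the stream keeps processing;
    the metrics dict feeds latency and success-rate dashboards.
    """
    start = time.perf_counter()
    try:
        result = fn(*args, **kwargs)
        ok = True
    except Exception:
        result = None
        ok = False
    latency_ms = (time.perf_counter() - start) * 1000
    return result, {"success": ok, "latency_ms": latency_ms}

# Usage: a successful call and a failing one
value, metrics = timed_call(lambda: 42)
_, failed = timed_call(lambda: 1 / 0)
```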

Alert Configuration

Set up alerts for integration issues (illustrative definitions; adapt the syntax to your monitoring system):

alerts:
  high_api_latency:
    condition: avg_api_response_time > 5000ms
    duration: 2_minutes
    severity: warning

  external_system_down:
    condition: api_error_rate > 50%
    duration: 1_minute
    severity: critical

  cache_miss_rate_high:
    condition: cache_miss_rate > 20%
    duration: 5_minutes
    severity: warning

Advanced Integration Patterns

Hybrid Event-Database Pattern

Combine streaming events with database lookups for complex business logic:

functions:
  hybrid_processor:
    type: valueTransformer
    code: |
      # Process streaming event (parse_event and the other helpers below are
      # placeholders for your own implementation)
      event_data = parse_event(value)

      # Look up reference data from cached database
      reference_data = get_reference_data(event_data.entity_id)

      # Apply business rules using both streaming and reference data
      result = apply_business_rules(event_data, reference_data)

      # Optionally trigger external API call based on result
      if result.requires_notification:
        send_notification(result)

      return result

Multi-System Coordination

Coordinate operations across multiple external systems:

functions:
  multi_system_coordinator:
    type: keyValueToKeyValueListTransformer
    code: |
      # Create requests for multiple external systems
      # (create_payment_request, create_inventory_request and
      # generate_correlation_id are placeholder helpers)
      payment_request = create_payment_request(value)
      inventory_request = create_inventory_request(value)

      # Use correlation ID to track related requests
      correlation_id = generate_correlation_id()

      # Return multiple outputs for different external systems
      return [
        (f"payment_{correlation_id}", payment_request),
        (f"inventory_{correlation_id}", inventory_request)
      ]
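The fan-out step can be made concrete with a runnable sketch; the request builders here are simplified stand-ins for your own per-system payload logic:

```python
import uuid

def build_requests(order):
    """Fan one order out into per-system requests sharing a correlation ID."""
    correlation_id = uuid.uuid4().hex[:12]
    payment_request = {"correlation_id": correlation_id,
                       "order_id": order["order_id"],
                       "amount": order["amount"]}
    inventory_request = {"correlation_id": correlation_id,
                         "order_id": order["order_id"],
                         "items": order.get("items", [])}
    # One input record becomes a list of (key, value) pairs, one per system
    return [
        (f"payment_{correlation_id}", payment_request),
        (f"inventory_{correlation_id}", inventory_request),
    ]

pairs = build_requests({"order_id": "ORDER_0001", "amount": 99.5, "items": ["PROD001"]})
```

Because both requests carry the same correlation ID, a downstream aggregator can tell when all systems have responded for a given order.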

Conclusion

External integration is vital for stream processing. Using API enrichment, database lookups, and async integration, you can build scalable KSML applications that enhance streaming data without sacrificing performance.

Choose the right pattern: API enrichment for real-time data, database lookups for reference data, and async integration for multi-system workflows.