Integration with External Systems in KSML
This tutorial covers how to integrate KSML with external systems such as databases and REST APIs. All examples use JSON data formats, which keeps events easy to inspect in tools like Kowl UI.
Introduction
Stream processing applications often need data that lives outside the stream. This tutorial shows three patterns for bringing it in: enriching events with external API calls, caching database reference data in state stores, and integrating asynchronously through dedicated Kafka topics.
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_product_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic order_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic external_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic external_responses
Integration Patterns
API Enrichment Pattern
The API enrichment pattern calls external REST APIs to add additional data to streaming events. This is useful when you need real-time data that can't be cached locally.
What it does:
- Produces user events: Creates events (login, view_product, add_to_cart, purchase) with user IDs, session info, device details, page URLs, and referrer sources
- Calls a mock API: For each user_id, fetches a profile from a hardcoded lookup table that simulates an external REST API call
- Handles API failures: Uses try/except to gracefully handle missing users, returning fallback "Unknown User" profile data
- Enriches with profiles: Combines the original event data with the fetched profile (name, tier, location, preferences, lifetime_value)
- Outputs enriched events: Returns JSON combining the event details with the user profile, API call timing, and computed recommendations
Key concepts demonstrated:
- Making external API calls from KSML functions with JSON data structures
- Handling API failures gracefully with structured fallback strategies
- Managing API latency and timeouts in stream processing
- Enriching events with external data while maintaining stream processing semantics
User Events Producer (API enrichment demo) - click to expand
# Producer for API enrichment demo - generates user events that need external data
functions:
generate_user_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
event_types = ["login", "view_product", "add_to_cart", "purchase"]
code: |
global event_counter, user_ids, event_types
event_counter += 1
user_id = random.choice(user_ids)
event_type = random.choice(event_types)
# Create structured JSON user event for better readability in Kowl UI
user_event = {
"event_id": f"event_{event_counter:06d}",
"event_type": event_type,
"user_id": user_id,
"timestamp": int(time.time() * 1000),
"sequence_number": event_counter,
"session_id": f"session_{user_id}_{event_counter // 5}", # Change session every 5 events
"device_info": {
"platform": random.choice(["web", "mobile", "tablet"]),
"user_agent": random.choice(["Chrome", "Firefox", "Safari", "Edge"]),
"ip_address": f"192.168.1.{random.randint(1, 255)}"
},
"context": {
"page_url": random.choice(["/home", "/products", "/cart", "/checkout"]),
"referrer": random.choice([None, "google.com", "facebook.com", "direct"]),
"campaign": random.choice([None, "summer_sale", "new_user", "retargeting"])
},
"metadata": {
"simulation": True,
"api_enrichment": True,
"needs_external_data": True
}
}
expression: (f"event_{event_counter:06d}", user_event)
resultType: (string, json)
producers:
user_event_producer:
generator: generate_user_events
interval: 3s
to:
topic: user_events
keyType: string
valueType: json
API Enrichment Processor (external API calls) - click to expand
# Processor demonstrating API enrichment pattern
streams:
user_events:
topic: user_events
keyType: string
valueType: json
functions:
enrich_with_api_data:
type: valueTransformer
globalCode: |
import time
import random
# Mock API client (simulates external REST API)
def get_user_profile(user_id):
"""Simulate API call to get user profile"""
try:
# Simulate API latency
time.sleep(0.1)
# Mock user profiles based on user_id
profiles = {
"user_001": {
"name": "Alice Johnson", "tier": "premium", "location": "New York",
"age": 32, "email": "alice@example.com", "preferences": ["tech", "gaming"],
"lifetime_value": 2500.00, "registration_date": "2022-01-15"
},
"user_002": {
"name": "Bob Smith", "tier": "standard", "location": "London",
"age": 28, "email": "bob@example.com", "preferences": ["sports", "music"],
"lifetime_value": 850.00, "registration_date": "2023-03-22"
},
"user_003": {
"name": "Charlie Davis", "tier": "premium", "location": "Tokyo",
"age": 45, "email": "charlie@example.com", "preferences": ["travel", "food"],
"lifetime_value": 3200.00, "registration_date": "2021-11-08"
},
"user_004": {
"name": "Diana Wilson", "tier": "basic", "location": "Sydney",
"age": 24, "email": "diana@example.com", "preferences": ["fashion", "art"],
"lifetime_value": 320.00, "registration_date": "2024-01-10"
},
"user_005": {
"name": "Eve Brown", "tier": "standard", "location": "Berlin",
"age": 35, "email": "eve@example.com", "preferences": ["books", "movies"],
"lifetime_value": 1150.00, "registration_date": "2022-08-17"
}
}
profile = profiles.get(user_id, {
"name": "Unknown User", "tier": "basic", "location": "Unknown",
"age": 0, "email": "unknown@example.com", "preferences": [],
"lifetime_value": 0.0, "registration_date": "unknown"
})
log.info("Fetched profile for user {}: {}", user_id, profile["name"])
return profile
except Exception as e:
log.warn("API request failed for user {}: {}", user_id, str(e))
return None
code: |
import time
# Extract fields from JSON event
if not value:
return None
event_id = value.get("event_id")
event_type = value.get("event_type")
user_id = value.get("user_id")
timestamp = value.get("timestamp")
sequence_number = value.get("sequence_number")
session_id = value.get("session_id")
device_info = value.get("device_info", {})
context = value.get("context", {})
metadata = value.get("metadata", {})
if not event_type or not user_id or not timestamp:
return None
# Call API to get user profile data
api_start_time = int(time.time() * 1000)
profile_data = get_user_profile(user_id)
api_end_time = int(time.time() * 1000)
api_latency = api_end_time - api_start_time
# Create enriched event with comprehensive data structure
if profile_data:
enriched_event = {
"enrichment_status": "SUCCESS",
"enrichment_type": "API_PROFILE_DATA",
"original_event": {
"event_id": event_id,
"event_type": event_type,
"user_id": user_id,
"timestamp": timestamp,
"sequence_number": sequence_number,
"session_id": session_id,
"device_info": device_info,
"context": context
},
"enriched_data": {
"user_profile": profile_data,
"computed_metrics": {
"user_tier_level": {"premium": 3, "standard": 2, "basic": 1}.get(profile_data.get("tier"), 0),
"is_high_value": profile_data.get("lifetime_value", 0) > 1000,
"account_age_days": (timestamp - 1640995200000) // (24 * 60 * 60 * 1000) if profile_data.get("registration_date") != "unknown" else 0
},
"recommendations": {
"personalized": profile_data.get("preferences", [])[:2] if profile_data.get("preferences") else [],
"tier_benefits": f"Available benefits for {profile_data.get('tier', 'basic')} tier",
"location_offers": f"Special offers in {profile_data.get('location', 'Unknown')}"
}
},
"api_metrics": {
"api_call_duration_ms": api_latency,
"api_endpoint": "mock_user_profile_api",
"api_success": True,
"cache_hit": False # This is a live API call
},
"processing_info": {
"enrichment_timestamp": api_end_time,
"processor_version": "1.0",
"enrichment_rules": ["basic_profile", "tier_computation", "recommendations"]
}
}
return enriched_event
else:
# Return event with failure information if API fails
fallback_event = {
"enrichment_status": "FAILED",
"enrichment_type": "API_PROFILE_DATA",
"original_event": {
"event_id": event_id,
"event_type": event_type,
"user_id": user_id,
"timestamp": timestamp,
"sequence_number": sequence_number,
"session_id": session_id,
"device_info": device_info,
"context": context
},
"enriched_data": {
"user_profile": None,
"fallback_data": {
"default_tier": "basic",
"estimated_location": "Unknown",
"default_preferences": []
}
},
"api_metrics": {
"api_call_duration_ms": api_latency,
"api_endpoint": "mock_user_profile_api",
"api_success": False,
"error_reason": "Profile not found or API unavailable"
},
"processing_info": {
"enrichment_timestamp": api_end_time,
"processor_version": "1.0",
"fallback_applied": True
}
}
return fallback_event
expression: result if result else None
resultType: json
pipelines:
api_enrichment_pipeline:
from: user_events
via:
- type: mapValues
mapper: enrich_with_api_data
- type: filter
if:
expression: value is not None
to:
topic: enriched_user_events
keyType: string
valueType: json
API enrichment benefits:
- Real-time enrichment: Access to the most current external data
- Flexible data sources: Can integrate with any REST API
- Graceful degradation: Continues processing even when external systems fail
- Simple implementation: Straightforward request-response pattern
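The mock lookup above stands in for a real HTTP call. Below is a minimal sketch of what the live version might look like, assuming the requests library is available in your KSML Python runtime and using a hypothetical endpoint https://api.example.com/users/{user_id}; treat it as a starting point, not a drop-in implementation:

functions:
  enrich_with_live_api:
    type: valueTransformer
    globalCode: |
      import requests  # assumption: installed in the KSML Python runtime

      def get_user_profile(user_id):
          """Fetch a profile from a hypothetical REST endpoint with a hard timeout."""
          try:
              # Keep the timeout short: the transformer blocks its stream task while waiting
              resp = requests.get(f"https://api.example.com/users/{user_id}", timeout=2)
              resp.raise_for_status()
              return resp.json()
          except Exception as e:
              log.warn("API request failed for user {}: {}", user_id, str(e))
              return None
    code: |
      profile = get_user_profile(value.get("user_id")) if value else None
      return {"original_event": value, "user_profile": profile}
    resultType: json

Because a valueTransformer blocks its stream task while the call is in flight, keep timeouts short and fall back to default data (as the full example above does) rather than retrying inline.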
Database Lookup Pattern
This pattern shows how to enrich streaming events with data that would normally come from a database.
What happens in this example:
Imagine you have an e-commerce website. Users view products, add them to cart, and make purchases. Your stream only has basic event data like {"product_id": "PROD001", "event_type": "purchased", "quantity": 2}, but you also want to know the product name, price, and category.
The Producer creates a basic product event every 2 seconds. An illustrative example (values are randomly generated):
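{
  "event_id": "product_event_000042",
  "event_type": "purchased",
  "product_id": "PROD001",
  "quantity": 2,
  "timestamp": 1717000000000,
  "user_id": "user_4711",
  "session_id": "session_4_123",
  "page_location": "/products",
  "metadata": {"simulation": true, "database_lookup": true}
}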
The Processor enriches each event by:
- First time only: Loads a product catalog into memory (like a mini database):
- PROD001 → "Wireless Headphones", $99.99, "Electronics"
- PROD002 → "Coffee Mug", $12.50, "Kitchen"
- For each event: Looks up the product_id and adds the details (name, category, unit price, total price), as in the illustrative enriched event below
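{
  "event_id": "product_event_000042",
  "event_type": "purchased",
  "product_id": "PROD001",
  "quantity": 2,
  "enriched_data": {
    "name": "Wireless Headphones",
    "category": "Electronics",
    "unit_price": 99.99,
    "total_price": 199.98
  },
  "cache_hit": true
}

(The passthrough fields timestamp, user_id, session_id, and page_location are omitted here for brevity.)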
This way you get rich product information without hitting a database for every single event.
Product Events Producer (database lookup demo) - click to expand
# Producer for database lookup demo - generates product events
functions:
generate_product_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
product_ids = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
event_types = ["viewed", "added_to_cart", "purchased", "removed_from_cart"]
code: |
global event_counter, product_ids, event_types
event_counter += 1
product_id = random.choice(product_ids)
event_type = random.choice(event_types)
quantity = random.randint(1, 5) if event_type in ["added_to_cart", "purchased"] else 1
current_timestamp = int(time.time() * 1000)
# Create structured JSON product event for better readability in Kowl UI
product_event = {
"event_id": f"product_event_{event_counter:06d}",
"event_type": event_type,
"product_id": product_id,
"quantity": quantity,
"timestamp": current_timestamp,
"user_id": f"user_{random.randint(1000, 9999)}",
"session_id": f"session_{event_counter // 10}_{random.randint(100, 999)}",
"page_location": random.choice(["/products", "/category", "/search"]),
"metadata": {
"simulation": True,
"database_lookup": True
}
}
expression: (product_id, product_event)
resultType: (string, json)
producers:
product_event_producer:
generator: generate_product_events
interval: 2s
to:
topic: product_events
keyType: string
valueType: json
Database Lookup Processor (cached reference data) - click to expand
# Processor demonstrating database lookup pattern with state store caching
streams:
product_events:
topic: product_events
keyType: string
valueType: json
stores:
product_reference_store:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
load_product_reference_data:
type: forEach
globalCode: |
import json
# Track if we've already loaded the data
data_loaded = False
def load_product_catalog():
"""Simulate loading product data from database"""
# Mock product catalog (simulates database query results)
products = {
"PROD001": {"name": "Wireless Headphones", "price": 99.99, "category": "Electronics"},
"PROD002": {"name": "Coffee Mug", "price": 12.50, "category": "Kitchen"},
"PROD003": {"name": "Running Shoes", "price": 129.99, "category": "Sports"},
"PROD004": {"name": "Notebook", "price": 5.99, "category": "Office"},
"PROD005": {"name": "Smartphone", "price": 699.99, "category": "Electronics"}
}
log.info("Loaded {} products into reference data store", len(products))
return products
code: |
global data_loaded
# Only load data once
if not data_loaded:
# Load product data into state store (simulates database loading)
products = load_product_catalog()
for product_id, product_data in products.items():
# Store as JSON string to avoid ForeignObject issues
product_reference_store.put(product_id, json.dumps(product_data))
data_loaded = True
log.info("Product reference data loaded into state store")
stores:
- product_reference_store
enrich_with_product_data:
type: valueTransformer
code: |
import json
# Extract fields from JSON product event using .get() method
if not value:
return None
event_id = str(value.get("event_id", ""))
event_type = str(value.get("event_type", ""))
product_id = str(value.get("product_id", ""))
quantity = int(value.get("quantity", 0))
timestamp = int(value.get("timestamp", 0))
user_id = str(value.get("user_id", ""))
session_id = str(value.get("session_id", ""))
page_location = str(value.get("page_location", ""))
if not event_type or not product_id or quantity == 0:
return None
# Look up product data from state store (cached database data)
product_data_str = product_reference_store.get(product_id)
if product_data_str:
# Parse JSON string to get product data
product_data = json.loads(product_data_str)
# Calculate total price for purchase events
unit_price = product_data.get("price", 0.0)
total_price = unit_price * quantity if event_type in ["purchased", "added_to_cart"] else 0.0
# Create enriched event with product details
enriched_event = {
"event_id": event_id,
"event_type": event_type,
"product_id": product_id,
"quantity": quantity,
"timestamp": timestamp,
"user_id": user_id,
"session_id": session_id,
"page_location": page_location,
"enriched_data": {
"name": product_data.get("name", "Unknown"),
"category": product_data.get("category", "Unknown"),
"unit_price": unit_price,
"total_price": total_price
},
"cache_hit": True
}
log.info("Enriched {} event for product: {} ({})", event_type, product_data.get("name"), product_id)
return enriched_event
else:
# Product not found in cache
fallback_event = {
"event_id": event_id,
"event_type": event_type,
"product_id": product_id,
"quantity": quantity,
"timestamp": timestamp,
"user_id": user_id,
"session_id": session_id,
"page_location": page_location,
"enriched_data": {
"name": "Unknown Product",
"category": "Unknown",
"unit_price": 0.0,
"total_price": 0.0
},
"cache_hit": False
}
log.warn("Product not found in reference data: {}", product_id)
return fallback_event
expression: result if result else None
resultType: json
stores:
- product_reference_store
pipelines:
# Process product events with database lookup
product_enrichment_pipeline:
from: product_events
via:
- type: peek
forEach: load_product_reference_data
- type: mapValues
mapper: enrich_with_product_data
- type: filter
if:
expression: value is not None
to:
topic: enriched_product_events
keyType: string
valueType: json
Key concepts demonstrated:
- Loading reference data into state stores as cache
- Fast local lookups without external database calls
- JSON enrichment with cached data
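In a real deployment, the mock load_product_catalog would query your database instead of returning a hardcoded dict. A minimal sketch, assuming a PostgreSQL products table and the psycopg2 driver in the KSML Python runtime (connection details, table, and columns are hypothetical):

def load_product_catalog():
    """Load reference data from PostgreSQL (hypothetical schema)."""
    import psycopg2  # assumption: driver installed in the KSML Python runtime
    conn = psycopg2.connect(host="db", dbname="shop", user="ksml", password="secret")
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT product_id, name, price, category FROM products")
            products = {row[0]: {"name": row[1], "price": float(row[2]), "category": row[3]}
                        for row in cur.fetchall()}
    finally:
        conn.close()
    log.info("Loaded {} products into reference data store", len(products))
    return products

Reloading periodically, for example on a timer or a control message, keeps the cached reference data from going stale.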
Async Integration Pattern
The async integration pattern uses separate Kafka topics for communication with external systems, providing loose coupling and better resilience.
What it does:
- Produces order events: Creates orders with status (created, paid, shipped) containing customer info, amounts, payment methods, business context
- Filters paid orders: Only processes orders with status="paid", creates external payment processing requests with correlation IDs
- Sends to request topic: Outputs JSON payment requests to external_requests topic with order details, customer tier, processing priority
- Processes responses: Reads responses from external_responses topic, matches by correlation_id, handles success/failure scenarios
- Outputs results: Returns JSON combining original order with external processing results, transaction IDs, and follow-up actions
Key concepts demonstrated:
- Creating request topics for external system communication with JSON payloads
- Generating correlation IDs for request-response tracking
- Processing responses asynchronously through separate topics with structured data
- Building event-driven integration patterns using JSON messaging
Order Events Producer (async integration demo) - click to expand
# Producer for async integration demo - generates order events
functions:
generate_orders:
type: generator
globalCode: |
import random
import time
order_counter = 0
customer_ids = ["CUST001", "CUST002", "CUST003", "CUST004"]
statuses = ["created", "paid", "shipped", "delivered"]
code: |
global order_counter, customer_ids, statuses
order_counter += 1
order_id = f"ORDER_{order_counter:04d}"
customer_id = random.choice(customer_ids)
status = random.choice(statuses)
amount = round(random.uniform(10.0, 500.0), 2)
current_timestamp = int(time.time() * 1000)
# Create structured JSON order event for better readability in Kowl UI
order_event = {
"order_id": order_id,
"customer_id": customer_id,
"status": status,
"amount": amount,
"timestamp": current_timestamp,
"sequence_number": order_counter,
"order_details": {
"currency": "USD",
"payment_method": random.choice(["credit_card", "debit_card", "paypal", "bank_transfer"]),
"shipping_method": random.choice(["standard", "express", "overnight", "pickup"]),
"order_source": random.choice(["web", "mobile", "phone", "store"])
},
"customer_info": {
"customer_tier": random.choice(["bronze", "silver", "gold", "platinum"]),
"loyalty_points": random.randint(0, 5000),
"previous_orders": random.randint(0, 50)
},
"fulfillment": {
"warehouse_id": f"WH_{random.randint(1, 5):02d}",
"estimated_ship_date": current_timestamp + random.randint(1, 7) * 24 * 60 * 60 * 1000, # 1-7 days
"shipping_address": {
"country": random.choice(["US", "CA", "UK", "DE", "FR"]),
"region": random.choice(["North", "South", "East", "West", "Central"])
}
},
"business_context": {
"requires_external_processing": status == "paid",
"high_value": amount > 200,
"priority": "high" if amount > 300 else "normal",
"async_processing_needed": True
},
"metadata": {
"simulation": True,
"async_integration": True,
"correlation_id": f"corr_{order_id}_{current_timestamp}",
"processing_version": "1.0"
}
}
expression: (order_id, order_event)
resultType: (string, json)
producers:
order_producer:
generator: generate_orders
interval: 3s
to:
topic: order_events
keyType: string
valueType: json
Async Integration Processor (request-response pattern) - click to expand
# Processor demonstrating async integration pattern
streams:
order_events:
topic: order_events
keyType: string
valueType: json
external_requests:
topic: external_requests
keyType: string
valueType: json
functions:
filter_paid_orders:
type: predicate
code: |
# Extract status from JSON order event
if not value:
return False
status = value.get("status")
# Only process 'paid' orders
return status == "paid"
expression: result
create_external_request:
type: keyValueTransformer
code: |
import time
# Extract fields from JSON order event
if not value:
return None
order_id = value.get("order_id")
customer_id = value.get("customer_id")
status = value.get("status")
amount = value.get("amount")
timestamp = value.get("timestamp")
sequence_number = value.get("sequence_number")
order_details = value.get("order_details", {})
customer_info = value.get("customer_info", {})
fulfillment = value.get("fulfillment", {})
business_context = value.get("business_context", {})
metadata = value.get("metadata", {})
# Create comprehensive request for external payment processing system
request_id = f"REQ_{order_id}_{timestamp}"
external_request = {
"request_id": request_id,
"request_type": "PAYMENT_PROCESSING",
"original_order": {
"order_id": order_id,
"customer_id": customer_id,
"amount": amount,
"timestamp": timestamp,
"sequence_number": sequence_number
},
"payment_details": {
"amount": amount,
"currency": order_details.get("currency", "USD"),
"payment_method": order_details.get("payment_method"),
"customer_tier": customer_info.get("customer_tier"),
"loyalty_points": customer_info.get("loyalty_points")
},
"processing_context": {
"priority": business_context.get("priority", "normal"),
"high_value": business_context.get("high_value", False),
"customer_previous_orders": customer_info.get("previous_orders", 0),
"order_source": order_details.get("order_source")
},
"async_metadata": {
"correlation_id": metadata.get("correlation_id", f"corr_{request_id}"),
"created_at": int(time.time() * 1000),
"timeout_ms": 30000, # 30 second timeout
"retry_count": 0,
"expected_response_topic": "external_responses"
},
"external_system_info": {
"target_system": "payment_processor",
"api_version": "v2.1",
"request_format": "async_json",
"callback_required": True
}
}
log.info("Created external payment request for order {}: amount=${:.2f}, priority={}",
order_id, amount, business_context.get("priority", "normal"))
return (request_id, external_request)
expression: result
resultType: (string, json)
process_external_response:
type: valueTransformer
globalCode: |
import time
import random
code: |
# Simulate processing external system response
# In real scenario, this would come from external system response topic
if not value:
return None
# Extract request information
request_id = value.get("request_id")
request_type = value.get("request_type")
original_order = value.get("original_order", {})
payment_details = value.get("payment_details", {})
processing_context = value.get("processing_context", {})
async_metadata = value.get("async_metadata", {})
# Simulate external system processing with realistic delays and outcomes
processing_time_ms = random.randint(100, 2000) # 0.1 to 2 seconds
success_rate = 0.9 if processing_context.get("high_value") else 0.95
is_successful = random.random() < success_rate
current_timestamp = int(time.time() * 1000)
# Create comprehensive response
response_data = {
"response_id": f"RESP_{request_id}_{current_timestamp}",
"request_id": request_id,
"response_type": "PAYMENT_PROCESSING_RESULT",
"status": "success" if is_successful else "failed",
"original_request": {
"order_id": original_order.get("order_id"),
"customer_id": original_order.get("customer_id"),
"amount": payment_details.get("amount"),
"request_timestamp": original_order.get("timestamp")
},
"processing_result": {
"transaction_id": f"TXN_{current_timestamp}_{random.randint(1000, 9999)}" if is_successful else None,
"authorization_code": f"AUTH_{random.randint(100000, 999999)}" if is_successful else None,
"processing_status": "approved" if is_successful else "declined",
"reason_code": "000" if is_successful else random.choice(["051", "061", "065", "075"]),
"reason_message": "Transaction approved" if is_successful else random.choice([
"Insufficient funds", "Invalid card", "Expired card", "Fraud suspected"
])
},
"financial_details": {
"processed_amount": payment_details.get("amount") if is_successful else 0.0,
"currency": payment_details.get("currency", "USD"),
"fee_amount": round(payment_details.get("amount", 0) * 0.029, 2) if is_successful else 0.0, # 2.9% fee
"settlement_date": current_timestamp + 86400000 if is_successful else None # Next day
},
"system_metadata": {
"external_system": "payment_processor_v2.1",
"processing_time_ms": processing_time_ms,
"processed_at": current_timestamp,
"correlation_id": async_metadata.get("correlation_id"),
"retry_attempt": async_metadata.get("retry_count", 0) + 1,
"final_response": True
},
"business_context": {
"customer_impact": "order_confirmed" if is_successful else "order_cancelled",
"requires_notification": True,
"follow_up_actions": ["send_confirmation_email", "update_inventory"] if is_successful else ["send_decline_email", "release_inventory"],
"priority_level": processing_context.get("priority", "normal")
}
}
log.info("Processed external system response for order {}: status={}, amount=${:.2f}",
original_order.get("order_id"), response_data["status"], payment_details.get("amount", 0))
return response_data
expression: result
resultType: json
pipelines:
# Send requests to external system
external_request_pipeline:
from: order_events
via:
- type: filter
if: filter_paid_orders
- type: transformKeyValue
mapper: create_external_request
to:
topic: external_requests
keyType: string
valueType: json
# Process responses from external system (mock processing)
external_response_pipeline:
from: external_requests
via:
- type: mapValues
mapper: process_external_response
to:
topic: external_responses
keyType: string
valueType: json
Async integration features:
- Loose coupling: External systems communicate via topics, not direct calls
- Scalability: Multiple consumers can process requests and responses
- Reliability: Messages are persisted in Kafka topics
- Monitoring: Easy to monitor request/response flows through topic metrics
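Note that the second pipeline above mocks the external system by transforming requests directly into responses. When a real external system writes to external_responses, you typically need to match each response back to its pending request by correlation ID. A minimal sketch, assuming the request pipeline also stores each outgoing request in a pending_requests state store (as a JSON string, mirroring the ForeignObject workaround used in the database lookup example):

streams:
  external_responses:
    topic: external_responses
    keyType: string
    valueType: json
stores:
  pending_requests:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
functions:
  match_response_to_request:
    type: valueTransformer
    code: |
      import json
      if not value:
        return None
      correlation_id = value.get("system_metadata", {}).get("correlation_id")
      pending_str = pending_requests.get(correlation_id) if correlation_id else None
      if pending_str:
        # Found the original request: close it out and emit the matched pair
        pending_requests.delete(correlation_id)
        return {"request": json.loads(pending_str), "response": value, "matched": True}
      log.warn("No pending request for correlation ID {}", correlation_id)
      return {"response": value, "matched": False}
    resultType: json
    stores:
      - pending_requests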
Key Metrics to Track
Monitor these metrics to ensure integration health:
- API latency: Track the latency of external API calls and database queries
- Success rate: Monitor the percentage of successful external system interactions
- Error types: Track the different kinds of errors (timeouts, authentication failures, etc.)
- Cache hit rate: For lookup patterns, monitor state store cache hit rates
- Topic lag: For async patterns, monitor request and response topic lag
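One lightweight way to capture these numbers is to embed them in output events, as the enrichment examples already do with api_metrics.api_call_duration_ms, and to log threshold breaches so log-based monitoring can alert on them. A minimal sketch (the 1000 ms threshold is an arbitrary example, and get_user_profile stands for a helper like the one in the API enrichment processor):

functions:
  timed_enrichment:
    type: valueTransformer
    code: |
      import time
      api_start = time.time()
      profile = get_user_profile(value.get("user_id")) if value else None
      latency_ms = int((time.time() - api_start) * 1000)
      # Log slow calls so external monitoring can alert on them
      if latency_ms > 1000:  # arbitrary example threshold
        log.warn("Slow external call for user {}: {} ms", value.get("user_id"), latency_ms)
      return {"original_event": value, "user_profile": profile, "api_call_duration_ms": latency_ms}
    resultType: json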
Alert Configuration
Set up alerts for integration issues. The block below is illustrative pseudo-configuration for an external monitoring system, not KSML syntax:
alerts:
high_api_latency:
condition: avg_api_response_time > 5000ms
duration: 2_minutes
severity: warning
external_system_down:
condition: api_error_rate > 50%
duration: 1_minute
severity: critical
cache_miss_rate_high:
condition: cache_miss_rate > 20%
duration: 5_minutes
severity: warning
Advanced Integration Patterns
Hybrid Event-Database Pattern
Combine streaming events with database lookups for complex business logic:
functions:
  hybrid_processor:
    type: valueTransformer
    code: |
      # parse_event, get_reference_data, apply_business_rules and
      # send_notification are placeholders for your own logic
      # Process the streaming event
      event_data = parse_event(value)
      # Look up reference data from the cached database (state store)
      reference_data = get_reference_data(event_data["entity_id"])
      # Apply business rules using both streaming and reference data
      result = apply_business_rules(event_data, reference_data)
      # Optionally trigger an external API call based on the result
      if result.get("requires_notification"):
          send_notification(result)
      return result
    resultType: json
Multi-System Coordination
Coordinate operations across multiple external systems by fanning one event out into a request per system. Since a keyValueTransformer emits a single record, this uses a list-returning transformer type:
functions:
  multi_system_coordinator:
    type: keyValueToKeyValueListTransformer
    code: |
      # create_payment_request, create_inventory_request and
      # generate_correlation_id are placeholders for your own logic
      payment_request = create_payment_request(value)
      inventory_request = create_inventory_request(value)
      # Use a correlation ID to tie the related requests together
      correlation_id = generate_correlation_id()
      # Return one record per external system
      return [
          (f"payment_{correlation_id}", payment_request),
          (f"inventory_{correlation_id}", inventory_request)
      ]
    resultType: "[(string, json)]"
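From there, a branch operation on the key prefix, or one pipeline per request type, can route the payment and inventory records to their own request topics, after which each system follows the async request-response pattern shown earlier.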
Conclusion
External integration is vital for stream processing. Using API enrichment, database lookups, and async integration, you can build scalable KSML applications that enhance streaming data without sacrificing performance.
Choose the right pattern: API enrichment for real-time data, database lookups for reference data, and async integration for multi-system workflows.