Complex Event Processing in KSML
This tutorial shows how to implement complex event processing (CEP) patterns in KSML so you can detect meaningful patterns across multiple events and streams in real time.
Introduction to Complex Event Processing
Complex Event Processing (CEP) is a method of tracking and analyzing streams of data to identify patterns, correlate events, and derive higher-level insights. CEP enables real-time decision making by processing events as they occur rather than in batches.
Key capabilities of CEP in KSML:
- Pattern detection: Identify sequences of events that form meaningful patterns
- Temporal analysis: Detect time-based patterns and relationships
- Event correlation: Connect related events from different sources
- Anomaly detection: Identify unusual patterns or deviations
- Stateful processing: Maintain context across multiple events
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands:
# Pattern Detection
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic pattern_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic detected_patterns && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temporal_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temporal_patterns && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic system_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic correlated_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic anomalies_detected && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic credit_card_transactions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic fraud_alerts
Pattern Detection
Pattern detection identifies specific sequences of events within a stream. This example detects an A→B→C pattern across events.
What it does:
- Produces events: Creates events with types A, B, C, D, E - deliberately generates A→B→C sequences for session_001 to demonstrate pattern completion
- Tracks sequences: Uses a state store to remember where each session is in the A→B→C pattern (stores "A", "AB", or deletes when complete)
- Detects completion: When event C arrives and the state shows "AB", it recognizes the full A→B→C pattern is complete
- Outputs results: Only emits a detection message when the complete pattern A→B→C is found, otherwise filters out partial matches
- Resets state: Clears the pattern tracking after successful detection or if the sequence breaks (e.g., gets A→D instead of A→B)
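Stripped of the KSML plumbing, the detector below is a small per-session state machine. Here is a minimal Python sketch of the same transition logic (the function and table names are illustrative, not part of KSML):

```python
# Transition table for the A->B->C detector: (state, event) -> next state.
# "" means no pattern in progress; None marks a completed pattern.
TRANSITIONS = {
    ("", "A"): "A",     # pattern starts
    ("A", "B"): "AB",   # pattern progresses
    ("AB", "C"): None,  # pattern complete
}

def advance(state, event_type):
    """Return (next_state, detected) for one event."""
    if (state, event_type) in TRANSITIONS:
        nxt = TRANSITIONS[(state, event_type)]
        return ("", True) if nxt is None else (nxt, False)
    # An 'A' restarts the pattern (mirroring the processor); anything else resets it
    return ("A", False) if event_type == "A" else ("", False)

state = ""
for ev in ["A", "D", "A", "B", "C"]:
    state, detected = advance(state, ev)
    print(f"{ev}: state={state!r} detected={detected}")
# Only the final C prints detected=True: D broke the first attempt,
# then A->B->C completed the pattern.
```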
Pattern Events Producer:
# Producer for pattern detection example - generates event sequences
functions:
generate_pattern_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
patterns = ["A", "B", "C", "D", "E"]
sessions = ["session_001", "session_002", "session_003"]
code: |
global event_counter, patterns, sessions
event_counter += 1
session_id = random.choice(sessions)
# Create pattern events - generate A->B->C sequence for specific sessions
if event_counter % 9 == 1:
event_type = "A"
session_id = "session_001" # Force same session for pattern
elif event_counter % 9 == 2:
event_type = "B"
session_id = "session_001" # Same session
elif event_counter % 9 == 3:
event_type = "C"
session_id = "session_001" # Complete the pattern
elif event_counter % 9 in [4, 5]: # Partial pattern
event_type = ["A", "B"][event_counter % 9 - 4]
else:
event_type = random.choice(["D", "E"]) # Other events
# Create structured JSON event for better readability in Kowl UI
event = {
"event_id": f"evt_{event_counter:04d}",
"session_id": session_id,
"event_type": event_type,
"timestamp": int(time.time() * 1000),
"data": f"event_data_{event_type}",
"source": "pattern_generator",
"sequence_number": event_counter,
"metadata": {
"simulation": True,
"pattern_type": "abc_sequence"
}
}
key = session_id
value = event
expression: (key, value)
resultType: (string, json)
producers:
pattern_event_producer:
generator: generate_pattern_events
interval: 1s
to:
topic: pattern_events
keyType: string
valueType: json
Pattern Detection Processor:
# Processor for pattern detection - detects A->B->C sequences
streams:
pattern_events:
topic: pattern_events
keyType: string
valueType: json
stores:
pattern_tracker:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
detect_abc_pattern:
type: valueTransformer
globalCode: |
import time
stores:
- pattern_tracker
code: |
# Extract fields from JSON event
event_type = value.get("event_type")
event_id = value.get("event_id")
session_id = value.get("session_id")
timestamp = value.get("timestamp")
sequence_number = value.get("sequence_number")
current_time = int(time.time() * 1000)
if not event_type or not event_id:
return None
# Get current pattern state
current_state = pattern_tracker.get(key) or ""
# Check for pattern completion
if event_type == "A":
# Start new pattern
pattern_tracker.put(key, "A")
log.info("Pattern started for {}: A", key)
return None
elif event_type == "B" and current_state == "A":
# Continue pattern
pattern_tracker.put(key, "AB")
log.info("Pattern progressing for {}: A->B", key)
return None
elif event_type == "C" and current_state == "AB":
# Pattern complete!
pattern_tracker.delete(key)
log.info("PATTERN DETECTED for {}: A->B->C", key)
# Create structured JSON result
return {
"pattern_type": "ABC_SEQUENCE",
"status": "DETECTED",
"session_id": session_id,
"completing_event": {
"event_id": event_id,
"event_type": event_type,
"timestamp": timestamp,
"sequence_number": sequence_number
},
"detection_timestamp": current_time,
"processing_time": current_time - timestamp,
"metadata": {
"pattern_duration": current_time - timestamp,
"detector": "abc_pattern_detector"
}
}
else:
# Reset on invalid sequence
if current_state:
log.info("Pattern broken for {}, was: {}, got: {}", key, current_state, event_type)
pattern_tracker.delete(key)
return None
expression: result if result else None
resultType: json
pipelines:
pattern_detection_pipeline:
from: pattern_events
via:
- type: mapValues
mapper: detect_abc_pattern
- type: filter
if:
expression: value is not None
to:
topic: detected_patterns
keyType: string
valueType: json
Key concepts demonstrated:
- State store usage for tracking partial patterns
- Sequential event processing
- Pattern completion and reset logic
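The same store-backed approach extends beyond a fixed three-step pattern. A hedged sketch of a reusable detector for an arbitrary ordered sequence (make_sequence_detector is illustrative, not a KSML built-in; the matched counter would live in the state store):

```python
def make_sequence_detector(pattern):
    """Build an advance(matched, event) function for an ordered event sequence.
    'matched' is the number of steps matched so far; store it per key."""
    def advance(matched, event_type):
        if event_type == pattern[matched]:
            matched += 1
            if matched == len(pattern):
                return 0, True  # full sequence observed; reset the state
            return matched, False
        # Restart if this event begins a new occurrence, otherwise reset
        return (1, False) if event_type == pattern[0] else (0, False)
    return advance

advance = make_sequence_detector(["A", "B", "C", "D"])
matched = 0
for ev in ["A", "B", "B", "A", "B", "C", "D"]:
    matched, hit = advance(matched, ev)
    if hit:
        print("sequence complete at", ev)  # fires on the final D
```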
Temporal Pattern Matching
Temporal patterns add time constraints to event sequences. This example detects quick checkout behavior (cart→checkout within 5 minutes).
What it does:
- Produces shopping events: Creates events like "add_to_cart" and "checkout" with realistic timestamps and shopping details
- Stores cart events: When "add_to_cart" happens, saves the cart timestamp and details in a state store as JSON
- Measures time gaps: When "checkout" arrives, calculates milliseconds between cart and checkout events
- Classifies by speed: If checkout happens within 5 minutes (300,000ms) = "QUICK_CHECKOUT", otherwise "SLOW_CHECKOUT"
- Outputs results: Only emits a message when both cart and checkout events are found, showing the time difference and classification
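At its core, the temporal check is timestamp arithmetic against a fixed window. A small sketch of the classification step, using the 5-minute threshold from the processor later in this section (function name illustrative):

```python
QUICK_CHECKOUT_WINDOW_MS = 5 * 60 * 1000  # 300,000 ms

def classify_checkout(cart_ts_ms, checkout_ts_ms):
    """Return (time_difference_ms, status) for a cart -> checkout pair."""
    diff = checkout_ts_ms - cart_ts_ms
    status = "QUICK_CHECKOUT" if diff <= QUICK_CHECKOUT_WINDOW_MS else "SLOW_CHECKOUT"
    return diff, status

# Checkout 4 minutes after add_to_cart: inside the window
print(classify_checkout(1_700_000_000_000, 1_700_000_240_000))
# (240000, 'QUICK_CHECKOUT')
```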
Temporal Events Producer:
# Producer for temporal pattern matching - generates time-sensitive events
functions:
generate_temporal_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
users = ["user_001", "user_002", "user_003"]
actions = ["login", "view_product", "add_to_cart", "checkout", "logout"]
code: |
global event_counter, users, actions
event_counter += 1
user_id = random.choice(users)
# Create temporal sequences
if event_counter % 5 == 1:
action = "login"
elif event_counter % 5 == 2:
action = "view_product"
elif event_counter % 5 == 3:
action = "add_to_cart"
elif event_counter % 5 == 4:
action = "checkout" # Should happen within 5 minutes of add_to_cart
else:
action = random.choice(actions)
current_timestamp = int(time.time() * 1000)
# Create structured JSON event for better readability in Kowl UI
event = {
"user_id": user_id,
"action": action,
"timestamp": current_timestamp,
"event_id": f"evt_{event_counter:04d}",
"session_id": f"session_{user_id}_{event_counter // 10}",
"sequence_number": event_counter,
"source": "temporal_generator",
"metadata": {
"simulation": True,
"pattern_type": "temporal_checkout",
"time_window_minutes": 5
}
}
# Add action-specific data
if action == "view_product":
event["product_id"] = f"prod_{random.randint(100, 999)}"
elif action == "add_to_cart":
event["product_id"] = f"prod_{random.randint(100, 999)}"
event["quantity"] = random.randint(1, 5)
event["price"] = round(random.uniform(10, 100), 2)
elif action == "checkout":
event["total_amount"] = round(random.uniform(50, 500), 2)
event["payment_method"] = random.choice(["credit_card", "paypal", "bank_transfer"])
expression: (user_id, event)
resultType: (string, json)
producers:
temporal_event_producer:
generator: generate_temporal_events
interval: 2s
to:
topic: temporal_events
keyType: string
valueType: json
Temporal Pattern Processor:
# Processor for temporal pattern matching - detects actions within time windows
streams:
temporal_events:
topic: temporal_events
keyType: string
valueType: json
stores:
temporal_state:
type: keyValue
keyType: string
valueType: string
persistent: true
functions:
detect_temporal_pattern:
type: valueTransformer
globalCode: |
import time
import json
stores:
- temporal_state
code: |
result = None  # ensure 'result' is defined for the fall-through expression below
# Extract fields from JSON event
action = value.get("action")
event_time = value.get("timestamp")
user_id = value.get("user_id")
event_id = value.get("event_id")
session_id = value.get("session_id")
current_time = int(time.time() * 1000)
if not action or not event_time:
return None
# Get stored state
state = temporal_state.get(key)
if action == "add_to_cart":
# Store cart event with detailed JSON state
cart_state = {
"event": "add_to_cart",
"timestamp": event_time,
"event_id": event_id,
"session_id": session_id,
"product_id": value.get("product_id"),
"quantity": value.get("quantity"),
"price": value.get("price")
}
temporal_state.put(key, json.dumps(cart_state))
log.info("Cart event for {}: {}", key, event_time)
return None
elif action == "checkout" and state:
# Parse stored cart state
try:
cart_data = json.loads(state)
if cart_data.get("event") == "add_to_cart":
cart_time = cart_data.get("timestamp")
time_diff = event_time - cart_time
# Create structured result
result = {
"pattern_type": "TEMPORAL_CHECKOUT",
"user_id": user_id,
"session_id": session_id,
"cart_event": {
"event_id": cart_data.get("event_id"),
"timestamp": cart_time,
"product_id": cart_data.get("product_id"),
"quantity": cart_data.get("quantity"),
"price": cart_data.get("price")
},
"checkout_event": {
"event_id": event_id,
"timestamp": event_time,
"total_amount": value.get("total_amount"),
"payment_method": value.get("payment_method")
},
"time_difference_ms": time_diff,
"time_difference_minutes": round(time_diff / 60000, 2),
"detection_timestamp": current_time,
"processing_time": current_time - event_time
}
if time_diff <= 300000: # Within 5 minutes
temporal_state.delete(key)
result["status"] = "QUICK_CHECKOUT"
result["is_quick"] = True
log.info("QUICK CHECKOUT detected for {} in {}ms", key, time_diff)
return result
else:
temporal_state.delete(key)
result["status"] = "SLOW_CHECKOUT"
result["is_quick"] = False
log.info("Slow checkout for {} ({}ms)", key, time_diff)
return result
except (json.JSONDecodeError, KeyError) as e:
log.warn("Error parsing cart state for {}: {}", key, str(e))
temporal_state.delete(key)
return None
expression: result if result else None
resultType: json
pipelines:
temporal_pattern_pipeline:
from: temporal_events
via:
- type: mapValues
mapper: detect_temporal_pattern
- type: filter
if:
expression: value is not None
to:
topic: temporal_patterns
keyType: string
valueType: json
Key concepts demonstrated:
- Time-based pattern constraints
- Timestamp extraction and comparison
- Temporal window analysis
Event Correlation
Event correlation combines related events from different streams to provide enriched context.
What it does:
- Produces two event streams: Creates user events (page_view, click, form_submit) and system events (api_call, db_query, error) with the same user IDs
- Joins streams by user: Uses leftJoin to connect system events with the latest user activity for each user ID
- Detects specific patterns: Looks for meaningful combinations like "form_submit + error", "page_view + db_query", or "click + api_call"
- Measures timing: Calculates milliseconds between user action and system response to determine correlation strength (HIGH/MEDIUM)
- Outputs correlations: Only emits results when it finds the specific patterns, showing both events with timing analysis and relationship details
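Conceptually, the stream-table leftJoin pairs each incoming system event with the table's latest user event for the same key, or with nothing when the key is absent. A plain-Python sketch of those semantics (an illustration, not the KSML or Kafka Streams API):

```python
# The table keeps only the latest user event per key; the stream drives output.
user_context = {}  # key -> latest user event (the table view of user_events)

def on_user_event(key, event):
    user_context[key] = event  # table upsert: newer values overwrite older ones

def on_system_event(key, event):
    # leftJoin: every stream record produces output; the table side may be None
    return {"system_event": event, "user_context": user_context.get(key)}

on_user_event("alice", {"activity": "click", "timestamp": 1000})
print(on_system_event("alice", {"system_event": "api_call", "timestamp": 1200}))
print(on_system_event("bob", {"system_event": "error", "timestamp": 1300}))
# alice's event joins her latest click; bob has no context, so it joins None
```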
Correlation Events Producer (generates both user and system events):
# Producer for event correlation - generates events from multiple sources
functions:
generate_user_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
users = ["alice", "bob", "charlie"]
code: |
global event_counter, users
event_counter += 1
user_id = random.choice(users)
# Generate user activity with structured JSON
activity = random.choice(["page_view", "click", "scroll", "form_submit"])
current_timestamp = int(time.time() * 1000)
event = {
"event_type": "user_activity",
"user_id": user_id,
"activity": activity,
"timestamp": current_timestamp,
"event_id": f"user_evt_{event_counter:04d}",
"session_id": f"session_{user_id}_{event_counter // 5}",
"source": "user_interface",
"metadata": {
"simulation": True,
"correlation_type": "user_system"
}
}
# Add activity-specific data
if activity == "page_view":
event["page_url"] = f"/page_{random.randint(1, 10)}"
event["referrer"] = random.choice(["direct", "search", "social"])
elif activity == "click":
event["element_id"] = f"btn_{random.randint(1, 5)}"
event["element_type"] = random.choice(["button", "link", "image"])
elif activity == "form_submit":
event["form_id"] = f"form_{random.randint(1, 3)}"
event["form_data_length"] = random.randint(10, 100)
expression: (user_id, event)
resultType: (string, json)
generate_system_events:
type: generator
globalCode: |
import random
import time
event_counter = 0
users = ["alice", "bob", "charlie"]
code: |
global event_counter, users
event_counter += 1
user_id = random.choice(users)
# Generate system events with structured JSON
event_type = random.choice(["api_call", "db_query", "cache_hit", "error"])
current_timestamp = int(time.time() * 1000)
event = {
"event_type": "system_event",
"system_event": event_type,
"user_id": user_id,
"timestamp": current_timestamp,
"event_id": f"sys_evt_{event_counter:04d}",
"source": "backend_system",
"server_id": f"server_{random.randint(1, 3)}",
"metadata": {
"simulation": True,
"correlation_type": "user_system"
}
}
# Add event-specific data
if event_type == "api_call":
event["endpoint"] = f"/api/v1/resource_{random.randint(1, 5)}"
event["response_time_ms"] = random.randint(50, 500)
event["status_code"] = random.choice([200, 201, 400, 500])
elif event_type == "db_query":
event["table_name"] = random.choice(["users", "orders", "products"])
event["query_time_ms"] = random.randint(10, 200)
event["rows_affected"] = random.randint(1, 100)
elif event_type == "error":
event["error_code"] = random.choice(["E001", "E002", "E003"])
event["error_message"] = f"Error in operation {random.randint(1, 10)}"
event["severity"] = random.choice(["warning", "error", "critical"])
elif event_type == "cache_hit":
event["cache_key"] = f"cache_{random.randint(1, 20)}"
event["cache_type"] = random.choice(["redis", "memcached"])
expression: (user_id, event)
resultType: (string, json)
producers:
user_event_producer:
generator: generate_user_events
interval: 2s
to:
topic: user_events
keyType: string
valueType: json
system_event_producer:
generator: generate_system_events
interval: 3s
to:
topic: system_events
keyType: string
valueType: json
Event Correlation Processor:
# Processor for event correlation - correlates user and system events
streams:
user_events:
topic: user_events
keyType: string
valueType: json
system_events:
topic: system_events
keyType: string
valueType: json
tables:
user_context:
topic: user_events
keyType: string
valueType: json
functions:
join_events:
type: valueJoiner
globalCode: |
import time
code: |
# Join system event with user context - create enriched JSON
current_time = int(time.time() * 1000)
# Extract system event details
system_event = value1.get("system_event") if value1 else "unknown"
system_timestamp = value1.get("timestamp") if value1 else current_time
system_event_id = value1.get("event_id") if value1 else "unknown"
# Create enriched event with correlation data
enriched_event = {
"correlation_id": f"corr_{system_event_id}",
"system_event": value1 if value1 else {},
"user_context": value2 if value2 else {},
"correlation_timestamp": current_time,
"has_user_context": bool(value2),
"time_since_user_activity": current_time - (value2.get("timestamp", current_time) if value2 else current_time)
}
expression: enriched_event
resultType: json
correlate_events:
type: valueTransformer
globalCode: |
import time
code: |
# Extract correlation data from enriched event
if not value:
return None
system_event = value.get("system_event", {})
user_context = value.get("user_context", {})
correlation_id = value.get("correlation_id", "unknown")
has_user_context = value.get("has_user_context", False)
time_since_user_activity = value.get("time_since_user_activity", 0)
current_time = int(time.time() * 1000)
# Extract system and user event details
system_event_type = system_event.get("system_event", "unknown")
user_activity = user_context.get("activity", "none") if has_user_context else "none"
# Define correlation patterns and create structured results
correlation_result = None
if system_event_type == "error" and user_activity == "form_submit":
log.warn("ERROR after form submit for user {}", key)
correlation_result = {
"pattern_type": "FORM_ERROR_CORRELATION",
"status": "CRITICAL",
"description": "System error occurred after user form submission",
"correlation_id": correlation_id,
"user_id": key,
"system_event": {
"type": system_event_type,
"event_id": system_event.get("event_id"),
"timestamp": system_event.get("timestamp"),
"error_details": {
"error_code": system_event.get("error_code"),
"error_message": system_event.get("error_message"),
"severity": system_event.get("severity")
}
},
"user_activity": {
"type": user_activity,
"event_id": user_context.get("event_id"),
"timestamp": user_context.get("timestamp"),
"form_details": {
"form_id": user_context.get("form_id"),
"form_data_length": user_context.get("form_data_length")
}
},
"time_correlation": {
"time_since_user_activity_ms": time_since_user_activity,
"correlation_strength": "HIGH" if time_since_user_activity < 5000 else "MEDIUM"
}
}
elif system_event_type == "db_query" and user_activity == "page_view":
log.info("DB query triggered by page view for {}", key)
correlation_result = {
"pattern_type": "PAGE_LOAD_CORRELATION",
"status": "NORMAL",
"description": "Database query triggered by user page view",
"correlation_id": correlation_id,
"user_id": key,
"system_event": {
"type": system_event_type,
"event_id": system_event.get("event_id"),
"timestamp": system_event.get("timestamp"),
"db_details": {
"table_name": system_event.get("table_name"),
"query_time_ms": system_event.get("query_time_ms"),
"rows_affected": system_event.get("rows_affected")
}
},
"user_activity": {
"type": user_activity,
"event_id": user_context.get("event_id"),
"timestamp": user_context.get("timestamp"),
"page_details": {
"page_url": user_context.get("page_url"),
"referrer": user_context.get("referrer")
}
},
"time_correlation": {
"time_since_user_activity_ms": time_since_user_activity,
"correlation_strength": "HIGH" if time_since_user_activity < 2000 else "MEDIUM"
}
}
elif system_event_type == "api_call" and user_activity == "click":
log.info("API call from user interaction for {}", key)
correlation_result = {
"pattern_type": "USER_API_CORRELATION",
"status": "NORMAL",
"description": "API call triggered by user click interaction",
"correlation_id": correlation_id,
"user_id": key,
"system_event": {
"type": system_event_type,
"event_id": system_event.get("event_id"),
"timestamp": system_event.get("timestamp"),
"api_details": {
"endpoint": system_event.get("endpoint"),
"response_time_ms": system_event.get("response_time_ms"),
"status_code": system_event.get("status_code")
}
},
"user_activity": {
"type": user_activity,
"event_id": user_context.get("event_id"),
"timestamp": user_context.get("timestamp"),
"click_details": {
"element_id": user_context.get("element_id"),
"element_type": user_context.get("element_type")
}
},
"time_correlation": {
"time_since_user_activity_ms": time_since_user_activity,
"correlation_strength": "HIGH" if time_since_user_activity < 1000 else "MEDIUM"
}
}
if correlation_result:
correlation_result["detection_timestamp"] = current_time
correlation_result["processing_time"] = current_time - system_event.get("timestamp", current_time)
correlation_result["metadata"] = {
"detector": "event_correlation_processor",
"version": "1.0"
}
return correlation_result
expression: result if result else None
resultType: json
pipelines:
correlation_pipeline:
from: system_events
via:
- type: leftJoin
table: user_context
valueJoiner: join_events
- type: mapValues
mapper: correlate_events
- type: filter
if:
expression: value is not None
to:
topic: correlated_events
keyType: string
valueType: json
Key concepts demonstrated:
- Stream-table joins for context enrichment
- Cross-stream event correlation
- Cause-effect relationship detection
Anomaly Detection
Anomaly detection identifies unusual patterns using statistical analysis.
What it does:
- Produces sensor readings: Creates temperature values that are normally 40-60°C, but every 20th reading is a spike (90-100°C) and every 25th is a drop (0-10°C)
- Tracks statistics per sensor: Stores a running count, sum, sum-of-squares, min, and max in a state store to calculate mean and standard deviation
- Calculates z-scores: Once a sensor has more than 10 readings, computes how many standard deviations each new reading is from the running mean
- Detects outliers: When z-score > 3.0, flags as anomaly (spike if above mean, drop if below mean)
- Outputs anomalies: Only emits detection messages when statistical threshold is exceeded, showing z-score, mean, and severity level
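To make the statistics concrete, here is the same z-score arithmetic on a toy series, using the processor's formulas (population variance derived from the running sum and sum-of-squares):

```python
readings = [50, 52, 48, 51, 49, 50, 53, 47, 50, 51, 49]  # 11 normal samples
n = len(readings)
total = sum(readings)
total_sq = sum(x * x for x in readings)

mean = total / n                       # 50.0
variance = total_sq / n - mean * mean  # ~2.73
std_dev = variance ** 0.5              # ~1.65

spike = 95
z_score = abs(spike - mean) / std_dev
print(f"mean={mean:.2f} std={std_dev:.2f} z={z_score:.1f}")  # z ≈ 27, far above 3.0
```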
Metrics Producer (with occasional anomalies):
# Producer for anomaly detection - generates metrics with occasional anomalies
functions:
generate_metrics:
type: generator
globalCode: |
import random
import time
import math
metric_counter = 0
sensors = ["sensor_001", "sensor_002", "sensor_003"]
code: |
global metric_counter, sensors
metric_counter += 1
sensor_id = random.choice(sensors)
# Generate normal values with occasional anomalies
if metric_counter % 20 == 0:
# Anomaly - spike
value = random.uniform(90, 100)
elif metric_counter % 25 == 0:
# Anomaly - drop
value = random.uniform(0, 10)
else:
# Normal range with some variation
base = 50 + 10 * math.sin(metric_counter / 10)
value = base + random.uniform(-5, 5)
current_timestamp = int(time.time() * 1000)
# Create structured JSON metric for better readability in Kowl UI
metric = {
"sensor_id": sensor_id,
"value": round(value, 2),
"timestamp": current_timestamp,
"metric_id": f"metric_{metric_counter:04d}",
"sensor_type": "temperature",
"unit": "celsius",
"location": f"zone_{random.randint(1, 5)}",
"source": "sensor_network",
"metadata": {
"simulation": True,
"anomaly_detection": True,
"normal_range": [40, 60]
}
}
# Add anomaly indicators for testing
if metric_counter % 20 == 0:
metric["anomaly_type"] = "spike"
metric["expected_anomaly"] = True
elif metric_counter % 25 == 0:
metric["anomaly_type"] = "drop"
metric["expected_anomaly"] = True
else:
metric["expected_anomaly"] = False
expression: (sensor_id, metric)
resultType: (string, json)
producers:
metrics_producer:
generator: generate_metrics
interval: 1s
to:
topic: sensor_metrics
keyType: string
valueType: json
Anomaly Detection Processor:
# Processor for anomaly detection using statistical analysis
streams:
sensor_metrics:
topic: sensor_metrics
keyType: string
valueType: json
stores:
stats_store:
type: keyValue
keyType: string
valueType: string
persistent: true
functions:
detect_anomalies:
type: valueTransformer
stores:
- stats_store
code: |
import json
# Extract fields from JSON metric
current_value = value.get("value")
timestamp = value.get("timestamp")
sensor_id = value.get("sensor_id")
metric_id = value.get("metric_id")
if current_value is None or timestamp is None:
return None
# Get or initialize statistics
stats_json = stats_store.get(key)
if stats_json:
stats = json.loads(stats_json)
else:
stats = {"count": 0, "sum": 0, "sum_sq": 0, "min": current_value, "max": current_value}
# Calculate running statistics
n = stats["count"]
if n > 10: # Need enough samples
mean = stats["sum"] / n
variance = (stats["sum_sq"] / n) - (mean * mean)
std_dev = variance ** 0.5 if variance > 0 else 1
# Detect anomalies (values outside 3 standard deviations)
z_score = abs(current_value - mean) / std_dev if std_dev > 0 else 0
if z_score > 3:
log.warn("ANOMALY detected for {}: value={}, mean={:.2f}, z_score={:.2f}",
key, current_value, mean, z_score)
anomaly_type = "spike" if current_value > mean else "drop"
result = {
"anomaly_type": "STATISTICAL_ANOMALY",
"status": "DETECTED",
"pattern": anomaly_type,
"sensor_id": sensor_id,
"metric_id": metric_id,
"anomaly_value": current_value,
"statistical_analysis": {
"mean": round(mean, 2),
"std_dev": round(std_dev, 2),
"z_score": round(z_score, 2),
"threshold": 3.0,
"sample_count": n + 1
},
"detection_timestamp": timestamp,
"sensor_metadata": {
"sensor_type": value.get("sensor_type"),
"location": value.get("location"),
"unit": value.get("unit")
},
"severity": "HIGH" if z_score > 5 else "MEDIUM"
}
else:
result = None
else:
result = None
# Update statistics
stats["count"] = n + 1
stats["sum"] += current_value
stats["sum_sq"] += current_value * current_value
stats["min"] = min(stats["min"], current_value)
stats["max"] = max(stats["max"], current_value)
# Store updated stats
stats_store.put(key, json.dumps(stats))
return result
expression: result if result else None
resultType: json
pipelines:
anomaly_detection_pipeline:
from: sensor_metrics
via:
- type: mapValues
mapper: detect_anomalies
- type: filter
if:
expression: value is not None
to:
topic: anomalies_detected
keyType: string
valueType: json
Key concepts demonstrated:
- Statistical anomaly detection (z-score)
- Running statistics calculation
- Threshold-based alerting
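Two caveats worth knowing about this processor: it folds anomalous readings into the running statistics (which gradually widens the detection band), and it derives variance as sum_sq/n − mean², which can lose floating-point precision on very long streams. Welford's online algorithm is a numerically stabler alternative; a sketch assuming you would store count, mean, and m2 per sensor instead of sum and sum_sq:

```python
def welford_update(stats, x):
    """One Welford step; stats holds count, mean, and m2 (sum of squared deviations)."""
    n = stats["count"] + 1
    delta = x - stats["mean"]
    mean = stats["mean"] + delta / n
    m2 = stats["m2"] + delta * (x - mean)
    return {"count": n, "mean": mean, "m2": m2}

def population_variance(stats):
    return stats["m2"] / stats["count"] if stats["count"] > 0 else 0.0

stats = {"count": 0, "mean": 0.0, "m2": 0.0}
for x in [50, 52, 48, 51, 49]:
    stats = welford_update(stats, x)
print(stats["mean"], population_variance(stats))  # 50.0 2.0
```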
Fraud Detection System
A practical example combining multiple CEP techniques for real-time fraud detection.
What it does:
- Produces transactions: Creates credit card purchases with amounts, merchants, locations - deliberately generates suspicious patterns (high amounts every 15th, rapid transactions every 20th)
- Stores transaction history: Keeps the last location, timestamp, and running totals for each card number in a state store
- Scores fraud risk: Adds points per pattern: +40 for amounts >$5000, +30 for transactions <60s apart, +20 for location changes within 1 hour, +20 for suspicious merchants
- Classifies threats: A score ≥50 emits a "FRAUD_ALERT", 30-49 a "SUSPICIOUS_TRANSACTION"; anything lower produces no output
- Outputs alerts: Only emits results when fraud score thresholds are met, showing detected patterns, risk factors, and recommended actions
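As a worked example of the scoring: a $6,200 luxury-goods purchase, made 45 seconds after the previous transaction and in a different city, trips all four patterns and lands far above both thresholds.

```python
score = 0
score += 40  # amount 6200 > 5000                 -> HIGH_AMOUNT
score += 30  # 45,000 ms since last txn < 60,000  -> RAPID_TRANSACTION
score += 20  # city changed within 1 hour         -> LOCATION_CHANGE
score += 20  # merchant is "luxury_goods"         -> SUSPICIOUS_MERCHANT

if score >= 50:
    print(score, "-> FRAUD_ALERT (HIGH_RISK)")  # prints: 110 -> FRAUD_ALERT ...
elif score >= 30:
    print(score, "-> SUSPICIOUS_TRANSACTION (MEDIUM_RISK)")
```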
Credit Card Transactions Producer:
# Producer for fraud detection - generates credit card transactions
functions:
generate_transactions:
type: generator
globalCode: |
import random
import time
txn_counter = 0
cards = ["4111-1111-1111-1111", "4222-2222-2222-2222", "4333-3333-3333-3333"]
merchants = ["grocery_store", "gas_station", "online_shop", "restaurant", "atm_withdrawal"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]
code: |
global txn_counter, cards, merchants, locations
txn_counter += 1
card_number = random.choice(cards)
current_timestamp = int(time.time() * 1000)
# Generate transaction with occasional suspicious patterns
if txn_counter % 15 == 0:
# Suspicious: high amount
amount = random.uniform(5000, 10000)
merchant = "luxury_goods"
is_suspicious = True
risk_category = "high_amount"
elif txn_counter % 20 == 0:
# Suspicious: rapid transactions
amount = random.uniform(100, 500)
merchant = "atm_withdrawal"
is_suspicious = True
risk_category = "rapid_transaction"
else:
# Normal transaction
amount = random.uniform(10, 500)
merchant = random.choice(merchants)
is_suspicious = False
risk_category = "normal"
location = random.choice(locations)
# Create structured JSON transaction for better readability in Kowl UI
transaction = {
"transaction_id": f"txn_{txn_counter:06d}",
"card_number": card_number,
"amount": round(amount, 2),
"merchant": merchant,
"location": location,
"timestamp": current_timestamp,
"merchant_category": {
"grocery_store": "retail",
"gas_station": "fuel",
"online_shop": "ecommerce",
"restaurant": "food_service",
"atm_withdrawal": "cash",
"luxury_goods": "luxury"
}.get(merchant, "other"),
"transaction_type": "purchase" if merchant != "atm_withdrawal" else "withdrawal",
"currency": "USD",
"channel": "pos" if merchant in ["grocery_store", "gas_station", "restaurant"] else "online",
"metadata": {
"simulation": True,
"fraud_detection": True,
"is_suspicious": is_suspicious,
"risk_category": risk_category,
"sequence_number": txn_counter
}
}
expression: (card_number, transaction)
resultType: (string, json)
producers:
transaction_producer:
generator: generate_transactions
interval: 1s
to:
topic: credit_card_transactions
keyType: string
valueType: json
Fraud Detection Processor:
# Processor for fraud detection - analyzes transaction patterns
streams:
credit_card_transactions:
topic: credit_card_transactions
keyType: string
valueType: json
stores:
transaction_history:
type: keyValue
keyType: string
valueType: string
persistent: true
caching: true
functions:
detect_fraud:
type: valueTransformer
stores:
- transaction_history
code: |
import json
import time
# Extract fields from JSON transaction
if not value:
return None
amount = value.get("amount")
merchant = value.get("merchant")
location = value.get("location")
timestamp = value.get("timestamp")
transaction_id = value.get("transaction_id")
merchant_category = value.get("merchant_category")
transaction_type = value.get("transaction_type")
channel = value.get("channel")
if amount is None or merchant is None or location is None or timestamp is None:
return None
# Get transaction history
history_json = transaction_history.get(key)
if history_json:
history = json.loads(history_json)
else:
history = {"last_location": location, "last_time": timestamp, "txn_count": 0, "total_amount": 0}
# Check for fraud patterns
fraud_score = 0
fraud_reasons = []
fraud_patterns = []
# Pattern 1: High amount transaction
if amount > 5000:
fraud_score += 40
fraud_reasons.append(f"high_amount:{amount:.2f}")
fraud_patterns.append({
"type": "HIGH_AMOUNT",
"details": {"amount": amount, "threshold": 5000},
"risk_weight": 40
})
# Pattern 2: Rapid transactions (within 60 seconds)
time_diff = timestamp - history["last_time"]
if time_diff < 60000 and history["txn_count"] > 0:
fraud_score += 30
fraud_reasons.append(f"rapid_txn:{time_diff}ms")
fraud_patterns.append({
"type": "RAPID_TRANSACTION",
"details": {"time_difference_ms": time_diff, "threshold_ms": 60000},
"risk_weight": 30
})
# Pattern 3: Location change
if history["last_location"] != location and time_diff < 3600000: # Within 1 hour
fraud_score += 20
fraud_reasons.append(f"location_change:{history['last_location']}->{location}")
fraud_patterns.append({
"type": "LOCATION_CHANGE",
"details": {
"previous_location": history["last_location"],
"current_location": location,
"time_difference_ms": time_diff,
"threshold_ms": 3600000
},
"risk_weight": 20
})
# Pattern 4: Suspicious merchant
if merchant in ["luxury_goods", "online_gambling", "crypto_exchange"]:
fraud_score += 20
fraud_reasons.append(f"suspicious_merchant:{merchant}")
fraud_patterns.append({
"type": "SUSPICIOUS_MERCHANT",
"details": {"merchant": merchant, "merchant_category": merchant_category},
"risk_weight": 20
})
# Capture the prior location before overwriting it, then update history
previous_location = history["last_location"]
history["last_location"] = location
history["last_time"] = timestamp
history["txn_count"] += 1
history["total_amount"] += amount
transaction_history.put(key, json.dumps(history))
# Generate structured alert if fraud score is high
result = None
if fraud_score >= 50:
log.warn("FRAUD ALERT for card {}: score={}, reasons={}", key, fraud_score, fraud_reasons)
result = {
"alert_type": "FRAUD_ALERT",
"status": "HIGH_RISK",
"card_number": key,
"fraud_score": fraud_score,
"severity": "CRITICAL",
"transaction": {
"transaction_id": transaction_id,
"amount": amount,
"merchant": merchant,
"merchant_category": merchant_category,
"location": location,
"timestamp": timestamp,
"transaction_type": transaction_type,
"channel": channel
},
"fraud_patterns": fraud_patterns,
"risk_analysis": {
"total_score": fraud_score,
"threshold_exceeded": "HIGH_RISK",
"patterns_detected": len(fraud_patterns),
"recommendation": "BLOCK_TRANSACTION"
},
"detection_timestamp": int(time.time() * 1000),
"cardholder_history": {
"previous_location": history.get("last_location"),
"transaction_count": history.get("txn_count", 0) + 1,
"total_spending": history.get("total_amount", 0) + amount
}
}
elif fraud_score >= 30:
log.info("Suspicious transaction for {}: score={}", key, fraud_score)
result = {
"alert_type": "SUSPICIOUS_TRANSACTION",
"status": "MEDIUM_RISK",
"card_number": key,
"fraud_score": fraud_score,
"severity": "WARNING",
"transaction": {
"transaction_id": transaction_id,
"amount": amount,
"merchant": merchant,
"merchant_category": merchant_category,
"location": location,
"timestamp": timestamp,
"transaction_type": transaction_type,
"channel": channel
},
"fraud_patterns": fraud_patterns,
"risk_analysis": {
"total_score": fraud_score,
"threshold_exceeded": "MEDIUM_RISK",
"patterns_detected": len(fraud_patterns),
"recommendation": "REVIEW_REQUIRED"
},
"detection_timestamp": int(time.time() * 1000),
"cardholder_history": {
"previous_location": history.get("last_location"),
"transaction_count": history.get("txn_count", 0) + 1,
"total_spending": history.get("total_amount", 0) + amount
}
}
return result
expression: result if result else None
resultType: json
pipelines:
fraud_detection_pipeline:
from: credit_card_transactions
via:
- type: mapValues
mapper: detect_fraud
- type: filter
if:
expression: value is not None
to:
topic: fraud_alerts
keyType: string
valueType: json
Key concepts demonstrated:
- Multi-factor pattern analysis
- Risk scoring algorithms
- Transaction velocity checks
- Geographic anomaly detection
Conclusion
Complex Event Processing in KSML enables real-time pattern detection and analysis. By combining state management, temporal operations, and correlation techniques, you can build sophisticated event processing applications that derive actionable insights from streaming data.