Skip to content

Complex Event Processing in KSML

This tutorial explores how to implement complex event processing (CEP) patterns in KSML, allowing you to detect meaningful patterns across multiple events and streams in real-time.

Introduction to Complex Event Processing

Complex Event Processing (CEP) is a method of tracking and analyzing streams of data to identify patterns, correlate events, and derive higher-level insights. CEP enables real-time decision making by processing events as they occur rather than in batch.

Key capabilities of CEP in KSML:

  • Pattern detection: Identify sequences of events that form meaningful patterns
  • Temporal analysis: Detect time-based patterns and relationships
  • Event correlation: Connect related events from different sources
  • Anomaly detection: Identify unusual patterns or deviations
  • Stateful processing: Maintain context across multiple events

Prerequisites

Before starting this tutorial:

Topic creation commands - click to expand
# Pattern Detection
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic pattern_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic detected_patterns && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temporal_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic temporal_patterns && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic system_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic correlated_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_metrics && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic anomalies_detected && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic credit_card_transactions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic fraud_alerts && \

Pattern Detection

Pattern detection identifies specific sequences of events within a stream. This example detects an A→B→C pattern across events.

What it does:

  • Produces events: Creates events with types A, B, C, D, E - deliberately generates A→B→C sequences for session_001 to demonstrate pattern completion
  • Tracks sequences: Uses a state store to remember where each session is in the A→B→C pattern (stores "A", "AB", or deletes when complete)
  • Detects completion: When event C arrives and the state shows "AB", it recognizes the full A→B→C pattern is complete
  • Outputs results: Only emits a detection message when the complete pattern A→B→C is found, otherwise filters out partial matches
  • Resets state: Clears the pattern tracking after successful detection or if the sequence breaks (e.g., gets A→D instead of A→B)
Pattern Events Producer - click to expand
# Producer for pattern detection example - generates event sequences

functions:
  generate_pattern_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      patterns = ["A", "B", "C", "D", "E"]
      sessions = ["session_001", "session_002", "session_003"]
    code: |
      global event_counter, patterns, sessions

      event_counter += 1
      session_id = random.choice(sessions)

      # Create pattern events - generate A->B->C sequence for specific sessions
      if event_counter % 9 == 1:
        event_type = "A"
        session_id = "session_001"  # Force same session for pattern
      elif event_counter % 9 == 2:
        event_type = "B"
        session_id = "session_001"  # Same session
      elif event_counter % 9 == 3:
        event_type = "C"
        session_id = "session_001"  # Complete the pattern
      elif event_counter % 9 in [4, 5]:  # Partial pattern
        event_type = ["A", "B"][event_counter % 9 - 4]
      else:
        event_type = random.choice(["D", "E"])  # Other events

      # Create structured JSON event for better readability in Kowl UI
      event = {
        "event_id": f"evt_{event_counter:04d}",
        "session_id": session_id,
        "event_type": event_type,
        "timestamp": int(time.time() * 1000),
        "data": f"event_data_{event_type}",
        "source": "pattern_generator",
        "sequence_number": event_counter,
        "metadata": {
          "simulation": True,
          "pattern_type": "abc_sequence"
        }
      }

      key = session_id
      value = event

    expression: (key, value)
    resultType: (string, json)

producers:
  pattern_event_producer:
    generator: generate_pattern_events
    interval: 1s
    to:
      topic: pattern_events
      keyType: string
      valueType: json
Pattern Detection Processor - click to expand
# Processor for pattern detection - detects A->B->C sequences

streams:
  pattern_events:
    topic: pattern_events
    keyType: string
    valueType: json

stores:
  pattern_tracker:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  detect_abc_pattern:
    type: valueTransformer
    globalCode: |
      import time
    stores:
      - pattern_tracker
    code: |
      # Extract fields from JSON event
      event_type = value.get("event_type")
      event_id = value.get("event_id")
      session_id = value.get("session_id")
      timestamp = value.get("timestamp")
      sequence_number = value.get("sequence_number")
      current_time = int(time.time() * 1000)

      if not event_type or not event_id:
        return None

      # Get current pattern state
      current_state = pattern_tracker.get(key) or ""

      # Check for pattern completion
      if event_type == "A":
        # Start new pattern
        pattern_tracker.put(key, "A")
        log.info("Pattern started for {}: A", key)
        return None
      elif event_type == "B" and current_state == "A":
        # Continue pattern
        pattern_tracker.put(key, "AB")
        log.info("Pattern progressing for {}: A->B", key)
        return None
      elif event_type == "C" and current_state == "AB":
        # Pattern complete!
        pattern_tracker.delete(key)
        log.info("PATTERN DETECTED for {}: A->B->C", key)

        # Create structured JSON result
        return {
          "pattern_type": "ABC_SEQUENCE",
          "status": "DETECTED",
          "session_id": session_id,
          "completing_event": {
            "event_id": event_id,
            "event_type": event_type,
            "timestamp": timestamp,
            "sequence_number": sequence_number
          },
          "detection_timestamp": current_time,
          "processing_time": current_time - timestamp,
          "metadata": {
            "pattern_duration": current_time - timestamp,
            "detector": "abc_pattern_detector"
          }
        }
      else:
        # Reset on invalid sequence
        if current_state:
          log.info("Pattern broken for {}, was: {}, got: {}", key, current_state, event_type)
          pattern_tracker.delete(key)
        return None

    expression: result if result else None
    resultType: json

pipelines:
  pattern_detection_pipeline:
    from: pattern_events
    via:
      - type: mapValues
        mapper: detect_abc_pattern
      - type: filter
        if:
          expression: value is not None
    to:
      topic: detected_patterns
      keyType: string
      valueType: json

Key concepts demonstrated:

  • State store usage for tracking partial patterns
  • Sequential event processing
  • Pattern completion and reset logic

Temporal Pattern Matching

Temporal patterns add time constraints to event sequences. This example detects quick checkout behavior (cart→checkout within 5 minutes).

What it does:

  • Produces shopping events: Creates events like "add_to_cart" and "checkout" with realistic timestamps and shopping details
  • Stores cart events: When "add_to_cart" happens, saves the cart timestamp and details in state store as JSON
  • Measures time gaps: When "checkout" arrives, calculates milliseconds between cart and checkout events
  • Classifies by speed: If checkout happens within 5 minutes (300,000ms) = "QUICK_CHECKOUT", otherwise "SLOW_CHECKOUT"
  • Outputs results: Only emits a message when both cart and checkout events are found, showing the time difference and classification
Temporal Events Producer - click to expand
# Producer for temporal pattern matching - generates time-sensitive events

functions:
  generate_temporal_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      users = ["user_001", "user_002", "user_003"]
      actions = ["login", "view_product", "add_to_cart", "checkout", "logout"]
    code: |
      global event_counter, users, actions

      event_counter += 1
      user_id = random.choice(users)

      # Create temporal sequences
      if event_counter % 5 == 1:
        action = "login"
      elif event_counter % 5 == 2:
        action = "view_product"
      elif event_counter % 5 == 3:
        action = "add_to_cart"
      elif event_counter % 5 == 4:
        action = "checkout"  # Should happen within 5 minutes of add_to_cart
      else:
        action = random.choice(actions)

      current_timestamp = int(time.time() * 1000)

      # Create structured JSON event for better readability in Kowl UI
      event = {
        "user_id": user_id,
        "action": action,
        "timestamp": current_timestamp,
        "event_id": f"evt_{event_counter:04d}",
        "session_id": f"session_{user_id}_{event_counter // 10}",
        "sequence_number": event_counter,
        "source": "temporal_generator",
        "metadata": {
          "simulation": True,
          "pattern_type": "temporal_checkout",
          "time_window_minutes": 5
        }
      }

      # Add action-specific data
      if action == "view_product":
        event["product_id"] = f"prod_{random.randint(100, 999)}"
      elif action == "add_to_cart":
        event["product_id"] = f"prod_{random.randint(100, 999)}"
        event["quantity"] = random.randint(1, 5)
        event["price"] = round(random.uniform(10, 100), 2)
      elif action == "checkout":
        event["total_amount"] = round(random.uniform(50, 500), 2)
        event["payment_method"] = random.choice(["credit_card", "paypal", "bank_transfer"])

    expression: (user_id, event)
    resultType: (string, json)

producers:
  temporal_event_producer:
    generator: generate_temporal_events
    interval: 2s
    to:
      topic: temporal_events
      keyType: string
      valueType: json
Temporal Pattern Processor - click to expand
# Processor for temporal pattern matching - detects actions within time windows

streams:
  temporal_events:
    topic: temporal_events
    keyType: string
    valueType: json

stores:
  temporal_state:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true

functions:
  detect_temporal_pattern:
    type: valueTransformer
    globalCode: |
      import time
      import json
    stores:
      - temporal_state
    code: |
      # Extract fields from JSON event
      action = value.get("action")
      event_time = value.get("timestamp")
      user_id = value.get("user_id")
      event_id = value.get("event_id")
      session_id = value.get("session_id")
      current_time = int(time.time() * 1000)

      if not action or not event_time:
        return None

      # Get stored state
      state = temporal_state.get(key)

      if action == "add_to_cart":
        # Store cart event with detailed JSON state
        cart_state = {
          "event": "add_to_cart",
          "timestamp": event_time,
          "event_id": event_id,
          "session_id": session_id,
          "product_id": value.get("product_id"),
          "quantity": value.get("quantity"),
          "price": value.get("price")
        }
        temporal_state.put(key, json.dumps(cart_state))
        log.info("Cart event for {}: {}", key, event_time)
        return None

      elif action == "checkout" and state:
        # Parse stored cart state
        try:
          cart_data = json.loads(state)
          if cart_data.get("event") == "add_to_cart":
            cart_time = cart_data.get("timestamp")
            time_diff = event_time - cart_time

            # Create structured result
            result = {
              "pattern_type": "TEMPORAL_CHECKOUT",
              "user_id": user_id,
              "session_id": session_id,
              "cart_event": {
                "event_id": cart_data.get("event_id"),
                "timestamp": cart_time,
                "product_id": cart_data.get("product_id"),
                "quantity": cart_data.get("quantity"),
                "price": cart_data.get("price")
              },
              "checkout_event": {
                "event_id": event_id,
                "timestamp": event_time,
                "total_amount": value.get("total_amount"),
                "payment_method": value.get("payment_method")
              },
              "time_difference_ms": time_diff,
              "time_difference_minutes": round(time_diff / 60000, 2),
              "detection_timestamp": current_time,
              "processing_time": current_time - event_time
            }

            if time_diff <= 300000:  # Within 5 minutes
              temporal_state.delete(key)
              result["status"] = "QUICK_CHECKOUT"
              result["is_quick"] = True
              log.info("QUICK CHECKOUT detected for {} in {}ms", key, time_diff)
              return result
            else:
              temporal_state.delete(key)
              result["status"] = "SLOW_CHECKOUT"
              result["is_quick"] = False
              log.info("Slow checkout for {} ({}ms)", key, time_diff)
              return result
        except (json.JSONDecodeError, KeyError) as e:
          log.warn("Error parsing cart state for {}: {}", key, str(e))
          temporal_state.delete(key)

      return None

    expression: result if result else None
    resultType: json

pipelines:
  temporal_pattern_pipeline:
    from: temporal_events
    via:
      - type: mapValues
        mapper: detect_temporal_pattern
      - type: filter
        if:
          expression: value is not None
    to:
      topic: temporal_patterns
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Time-based pattern constraints
  • Timestamp extraction and comparison
  • Temporal window analysis

Event Correlation

Event correlation combines related events from different streams to provide enriched context.

What it does:

  • Produces two event streams: Creates user events (page_view, click, form_submit) and system events (api_call, db_query, error) with the same user IDs
  • Joins streams by user: Uses leftJoin to connect system events with the latest user activity for each user ID
  • Detects specific patterns: Looks for meaningful combinations like "form_submit + error", "page_view + db_query", or "click + api_call"
  • Measures timing: Calculates milliseconds between user action and system response to determine correlation strength (HIGH/MEDIUM)
  • Outputs correlations: Only emits results when it finds the specific patterns, showing both events with timing analysis and relationship details
Correlation Events Producer (generates both user and system events) - click to expand
# Producer for event correlation - generates events from multiple sources

functions:
  generate_user_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      users = ["alice", "bob", "charlie"]
    code: |
      global event_counter, users

      event_counter += 1
      user_id = random.choice(users)

      # Generate user activity with structured JSON
      activity = random.choice(["page_view", "click", "scroll", "form_submit"])
      current_timestamp = int(time.time() * 1000)

      event = {
        "event_type": "user_activity",
        "user_id": user_id,
        "activity": activity,
        "timestamp": current_timestamp,
        "event_id": f"user_evt_{event_counter:04d}",
        "session_id": f"session_{user_id}_{event_counter // 5}",
        "source": "user_interface",
        "metadata": {
          "simulation": True,
          "correlation_type": "user_system"
        }
      }

      # Add activity-specific data
      if activity == "page_view":
        event["page_url"] = f"/page_{random.randint(1, 10)}"
        event["referrer"] = random.choice(["direct", "search", "social"])
      elif activity == "click":
        event["element_id"] = f"btn_{random.randint(1, 5)}"
        event["element_type"] = random.choice(["button", "link", "image"])
      elif activity == "form_submit":
        event["form_id"] = f"form_{random.randint(1, 3)}"
        event["form_data_length"] = random.randint(10, 100)

    expression: (user_id, event)
    resultType: (string, json)

  generate_system_events:
    type: generator
    globalCode: |
      import random
      import time
      event_counter = 0
      users = ["alice", "bob", "charlie"]
    code: |
      global event_counter, users

      event_counter += 1
      user_id = random.choice(users)

      # Generate system events with structured JSON
      event_type = random.choice(["api_call", "db_query", "cache_hit", "error"])
      current_timestamp = int(time.time() * 1000)

      event = {
        "event_type": "system_event",
        "system_event": event_type,
        "user_id": user_id,
        "timestamp": current_timestamp,
        "event_id": f"sys_evt_{event_counter:04d}",
        "source": "backend_system",
        "server_id": f"server_{random.randint(1, 3)}",
        "metadata": {
          "simulation": True,
          "correlation_type": "user_system"
        }
      }

      # Add event-specific data
      if event_type == "api_call":
        event["endpoint"] = f"/api/v1/resource_{random.randint(1, 5)}"
        event["response_time_ms"] = random.randint(50, 500)
        event["status_code"] = random.choice([200, 201, 400, 500])
      elif event_type == "db_query":
        event["table_name"] = random.choice(["users", "orders", "products"])
        event["query_time_ms"] = random.randint(10, 200)
        event["rows_affected"] = random.randint(1, 100)
      elif event_type == "error":
        event["error_code"] = random.choice(["E001", "E002", "E003"])
        event["error_message"] = f"Error in operation {random.randint(1, 10)}"
        event["severity"] = random.choice(["warning", "error", "critical"])
      elif event_type == "cache_hit":
        event["cache_key"] = f"cache_{random.randint(1, 20)}"
        event["cache_type"] = random.choice(["redis", "memcached"])

    expression: (user_id, event)
    resultType: (string, json)

producers:
  user_event_producer:
    generator: generate_user_events
    interval: 2s
    to:
      topic: user_events
      keyType: string
      valueType: json

  system_event_producer:
    generator: generate_system_events
    interval: 3s
    to:
      topic: system_events
      keyType: string
      valueType: json
Event Correlation Processor - click to expand
# Processor for event correlation - correlates user and system events

streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json

  system_events:
    topic: system_events
    keyType: string
    valueType: json

tables:
  user_context:
    topic: user_events
    keyType: string
    valueType: json

functions:
  join_events:
    type: valueJoiner
    globalCode: |
      import time
    code: |
      # Join system event with user context - create enriched JSON
      current_time = int(time.time() * 1000)

      # Extract system event details
      system_event = value1.get("system_event") if value1 else "unknown"
      system_timestamp = value1.get("timestamp") if value1 else current_time
      system_event_id = value1.get("event_id") if value1 else "unknown"

      # Create enriched event with correlation data
      enriched_event = {
        "correlation_id": f"corr_{system_event_id}",
        "system_event": value1 if value1 else {},
        "user_context": value2 if value2 else {},
        "correlation_timestamp": current_time,
        "has_user_context": bool(value2),
        "time_since_user_activity": current_time - (value2.get("timestamp", current_time) if value2 else current_time)
      }

    expression: enriched_event
    resultType: json

  correlate_events:
    type: valueTransformer
    globalCode: |
      import time
    code: |
      # Extract correlation data from enriched event
      if not value:
        return None

      system_event = value.get("system_event", {})
      user_context = value.get("user_context", {})
      correlation_id = value.get("correlation_id", "unknown")
      has_user_context = value.get("has_user_context", False)
      time_since_user_activity = value.get("time_since_user_activity", 0)
      current_time = int(time.time() * 1000)

      # Extract system and user event details
      system_event_type = system_event.get("system_event", "unknown")
      user_activity = user_context.get("activity", "none") if has_user_context else "none"

      # Define correlation patterns and create structured results
      correlation_result = None

      if system_event_type == "error" and user_activity == "form_submit":
        log.warn("ERROR after form submit for user {}", key)
        correlation_result = {
          "pattern_type": "FORM_ERROR_CORRELATION",
          "status": "CRITICAL",
          "description": "System error occurred after user form submission",
          "correlation_id": correlation_id,
          "user_id": key,
          "system_event": {
            "type": system_event_type,
            "event_id": system_event.get("event_id"),
            "timestamp": system_event.get("timestamp"),
            "error_details": {
              "error_code": system_event.get("error_code"),
              "error_message": system_event.get("error_message"),
              "severity": system_event.get("severity")
            }
          },
          "user_activity": {
            "type": user_activity,
            "event_id": user_context.get("event_id"),
            "timestamp": user_context.get("timestamp"),
            "form_details": {
              "form_id": user_context.get("form_id"),
              "form_data_length": user_context.get("form_data_length")
            }
          },
          "time_correlation": {
            "time_since_user_activity_ms": time_since_user_activity,
            "correlation_strength": "HIGH" if time_since_user_activity < 5000 else "MEDIUM"
          }
        }

      elif system_event_type == "db_query" and user_activity == "page_view":
        log.info("DB query triggered by page view for {}", key)
        correlation_result = {
          "pattern_type": "PAGE_LOAD_CORRELATION",
          "status": "NORMAL",
          "description": "Database query triggered by user page view",
          "correlation_id": correlation_id,
          "user_id": key,
          "system_event": {
            "type": system_event_type,
            "event_id": system_event.get("event_id"),
            "timestamp": system_event.get("timestamp"),
            "db_details": {
              "table_name": system_event.get("table_name"),
              "query_time_ms": system_event.get("query_time_ms"),
              "rows_affected": system_event.get("rows_affected")
            }
          },
          "user_activity": {
            "type": user_activity,
            "event_id": user_context.get("event_id"),
            "timestamp": user_context.get("timestamp"),
            "page_details": {
              "page_url": user_context.get("page_url"),
              "referrer": user_context.get("referrer")
            }
          },
          "time_correlation": {
            "time_since_user_activity_ms": time_since_user_activity,
            "correlation_strength": "HIGH" if time_since_user_activity < 2000 else "MEDIUM"
          }
        }

      elif system_event_type == "api_call" and user_activity == "click":
        log.info("API call from user interaction for {}", key)
        correlation_result = {
          "pattern_type": "USER_API_CORRELATION",
          "status": "NORMAL",
          "description": "API call triggered by user click interaction",
          "correlation_id": correlation_id,
          "user_id": key,
          "system_event": {
            "type": system_event_type,
            "event_id": system_event.get("event_id"),
            "timestamp": system_event.get("timestamp"),
            "api_details": {
              "endpoint": system_event.get("endpoint"),
              "response_time_ms": system_event.get("response_time_ms"),
              "status_code": system_event.get("status_code")
            }
          },
          "user_activity": {
            "type": user_activity,
            "event_id": user_context.get("event_id"),
            "timestamp": user_context.get("timestamp"),
            "click_details": {
              "element_id": user_context.get("element_id"),
              "element_type": user_context.get("element_type")
            }
          },
          "time_correlation": {
            "time_since_user_activity_ms": time_since_user_activity,
            "correlation_strength": "HIGH" if time_since_user_activity < 1000 else "MEDIUM"
          }
        }

      if correlation_result:
        correlation_result["detection_timestamp"] = current_time
        correlation_result["processing_time"] = current_time - system_event.get("timestamp", current_time)
        correlation_result["metadata"] = {
          "detector": "event_correlation_processor",
          "version": "1.0"
        }

      return correlation_result

    expression: result if result else None
    resultType: json

pipelines:
  correlation_pipeline:
    from: system_events
    via:
      - type: leftJoin
        table: user_context
        valueJoiner: join_events
      - type: mapValues
        mapper: correlate_events
      - type: filter
        if:
          expression: value is not None
    to:
      topic: correlated_events
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Stream-table joins for context enrichment
  • Cross-stream event correlation
  • Cause-effect relationship detection

Anomaly Detection

Anomaly detection identifies unusual patterns using statistical analysis.

What it does:

  • Produces sensor readings: Creates temperature values that are normally 40-60°C, but every 20th reading is a spike (90-100°C) and every 25th is a drop (0-10°C)
  • Tracks statistics per sensor: Stores running count, sum, sum-of-squares, min, max in state store to calculate mean and standard deviation
  • Calculates z-scores: After 10+ readings, computes how many standard deviations each new reading is from the mean
  • Detects outliers: When z-score > 3.0, flags as anomaly (spike if above mean, drop if below mean)
  • Outputs anomalies: Only emits detection messages when statistical threshold is exceeded, showing z-score, mean, and severity level
Metrics Producer (with occasional anomalies) - click to expand
# Producer for anomaly detection - generates metrics with occasional anomalies

functions:
  generate_metrics:
    type: generator
    globalCode: |
      import random
      import time
      import math
      metric_counter = 0
      sensors = ["sensor_001", "sensor_002", "sensor_003"]
    code: |
      global metric_counter, sensors

      metric_counter += 1
      sensor_id = random.choice(sensors)

      # Generate normal values with occasional anomalies
      if metric_counter % 20 == 0:
        # Anomaly - spike
        value = random.uniform(90, 100)
      elif metric_counter % 25 == 0:
        # Anomaly - drop
        value = random.uniform(0, 10)
      else:
        # Normal range with some variation
        base = 50 + 10 * math.sin(metric_counter / 10)
        value = base + random.uniform(-5, 5)

      current_timestamp = int(time.time() * 1000)

      # Create structured JSON metric for better readability in Kowl UI
      metric = {
        "sensor_id": sensor_id,
        "value": round(value, 2),
        "timestamp": current_timestamp,
        "metric_id": f"metric_{metric_counter:04d}",
        "sensor_type": "temperature",
        "unit": "celsius",
        "location": f"zone_{random.randint(1, 5)}",
        "source": "sensor_network",
        "metadata": {
          "simulation": True,
          "anomaly_detection": True,
          "normal_range": [40, 60]
        }
      }

      # Add anomaly indicators for testing
      if metric_counter % 20 == 0:
        metric["anomaly_type"] = "spike"
        metric["expected_anomaly"] = True
      elif metric_counter % 25 == 0:
        metric["anomaly_type"] = "drop" 
        metric["expected_anomaly"] = True
      else:
        metric["expected_anomaly"] = False

    expression: (sensor_id, metric)
    resultType: (string, json)

producers:
  metrics_producer:
    generator: generate_metrics
    interval: 1s
    to:
      topic: sensor_metrics
      keyType: string
      valueType: json
Anomaly Detection Processor - click to expand
# Processor for anomaly detection using statistical analysis

streams:
  sensor_metrics:
    topic: sensor_metrics
    keyType: string
    valueType: json

stores:
  stats_store:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true

functions:
  detect_anomalies:
    type: valueTransformer
    stores:
      - stats_store
    code: |
      import json

      # Extract fields from JSON metric
      current_value = value.get("value")
      timestamp = value.get("timestamp")
      sensor_id = value.get("sensor_id")
      metric_id = value.get("metric_id")

      if current_value is None or timestamp is None:
        return None

      # Get or initialize statistics
      stats_json = stats_store.get(key)
      if stats_json:
        stats = json.loads(stats_json)
      else:
        stats = {"count": 0, "sum": 0, "sum_sq": 0, "min": current_value, "max": current_value}

      # Calculate running statistics
      n = stats["count"]
      if n > 10:  # Need enough samples
        mean = stats["sum"] / n
        variance = (stats["sum_sq"] / n) - (mean * mean)
        std_dev = variance ** 0.5 if variance > 0 else 1

        # Detect anomalies (values outside 3 standard deviations)
        z_score = abs(current_value - mean) / std_dev if std_dev > 0 else 0

        if z_score > 3:
          log.warn("ANOMALY detected for {}: value={}, mean={:.2f}, z_score={:.2f}", 
                   key, current_value, mean, z_score)
          anomaly_type = "spike" if current_value > mean else "drop"
          result = {
            "anomaly_type": "STATISTICAL_ANOMALY",
            "status": "DETECTED",
            "pattern": anomaly_type,
            "sensor_id": sensor_id,
            "metric_id": metric_id,
            "anomaly_value": current_value,
            "statistical_analysis": {
              "mean": round(mean, 2),
              "std_dev": round(std_dev, 2),
              "z_score": round(z_score, 2),
              "threshold": 3.0,
              "sample_count": n + 1
            },
            "detection_timestamp": timestamp,
            "sensor_metadata": {
              "sensor_type": value.get("sensor_type"),
              "location": value.get("location"),
              "unit": value.get("unit")
            },
            "severity": "HIGH" if z_score > 5 else "MEDIUM"
          }
        else:
          result = None
      else:
        result = None

      # Update statistics
      stats["count"] = n + 1
      stats["sum"] += current_value
      stats["sum_sq"] += current_value * current_value
      stats["min"] = min(stats["min"], current_value)
      stats["max"] = max(stats["max"], current_value)

      # Store updated stats
      stats_store.put(key, json.dumps(stats))

      return result

    expression: result if result else None
    resultType: json

pipelines:
  anomaly_detection_pipeline:
    from: sensor_metrics
    via:
      - type: mapValues
        mapper: detect_anomalies
      - type: filter
        if:
          expression: value is not None
    to:
      topic: anomalies_detected
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Statistical anomaly detection (z-score)
  • Running statistics calculation
  • Threshold-based alerting

Fraud Detection System

A practical example combining multiple CEP techniques for real-time fraud detection.

What it does:

  • Produces transactions: Creates credit card purchases with amounts, merchants, locations - deliberately generates suspicious patterns (high amounts every 15th, rapid transactions every 20th)
  • Stores transaction history: Keeps last location, timestamp, and totals for each card number in state store
  • Scores fraud risk: Adds points for patterns: +40 for amounts >$5000, +30 for transactions <60s apart, +20 for location changes <1hr, +20 for suspicious merchants
  • Classifies threats: If score ≥70 = "FRAUD_ALERT", if 30-69 = "SUSPICIOUS_TRANSACTION", otherwise no output
  • Outputs alerts: Only emits results when fraud score thresholds are met, showing detected patterns, risk factors, and recommended actions
Credit Card Transactions Producer - click to expand
# Producer for fraud detection - generates credit card transactions

functions:
  generate_transactions:
    type: generator
    globalCode: |
      import random
      import time
      txn_counter = 0
      cards = ["4111-1111-1111-1111", "4222-2222-2222-2222", "4333-3333-3333-3333"]
      merchants = ["grocery_store", "gas_station", "online_shop", "restaurant", "atm_withdrawal"]
      locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]
    code: |
      global txn_counter, cards, merchants, locations

      txn_counter += 1
      card_number = random.choice(cards)
      current_timestamp = int(time.time() * 1000)

      # Generate transaction with occasional suspicious patterns
      if txn_counter % 15 == 0:
        # Suspicious: high amount
        amount = random.uniform(5000, 10000)
        merchant = "luxury_goods"
        is_suspicious = True
        risk_category = "high_amount"
      elif txn_counter % 20 == 0:
        # Suspicious: rapid transactions
        amount = random.uniform(100, 500)
        merchant = "atm_withdrawal"
        is_suspicious = True
        risk_category = "rapid_transaction"
      else:
        # Normal transaction
        amount = random.uniform(10, 500)
        merchant = random.choice(merchants)
        is_suspicious = False
        risk_category = "normal"

      location = random.choice(locations)

      # Create structured JSON transaction for better readability in Kowl UI
      transaction = {
        "transaction_id": f"txn_{txn_counter:06d}",
        "card_number": card_number,
        "amount": round(amount, 2),
        "merchant": merchant,
        "location": location,
        "timestamp": current_timestamp,
        "merchant_category": {
          "grocery_store": "retail",
          "gas_station": "fuel",
          "online_shop": "ecommerce",
          "restaurant": "food_service",
          "atm_withdrawal": "cash",
          "luxury_goods": "luxury"
        }.get(merchant, "other"),
        "transaction_type": "purchase" if merchant != "atm_withdrawal" else "withdrawal",
        "currency": "USD",
        "channel": "pos" if merchant in ["grocery_store", "gas_station", "restaurant"] else "online",
        "metadata": {
          "simulation": True,
          "fraud_detection": True,
          "is_suspicious": is_suspicious,
          "risk_category": risk_category,
          "sequence_number": txn_counter
        }
      }

    expression: (card_number, transaction)
    resultType: (string, json)

producers:
  transaction_producer:
    generator: generate_transactions
    interval: 1s
    to:
      topic: credit_card_transactions
      keyType: string
      valueType: json
Fraud Detection Processor - click to expand
# Processor for fraud detection - analyzes transaction patterns

streams:
  credit_card_transactions:
    topic: credit_card_transactions
    keyType: string
    valueType: json

stores:
  transaction_history:
    type: keyValue
    keyType: string
    valueType: string
    persistent: true
    caching: true

functions:
  detect_fraud:
    type: valueTransformer
    stores:
      - transaction_history
    code: |
      import json
      import time

      # Extract fields from JSON transaction
      if not value:
        return None

      amount = value.get("amount")
      merchant = value.get("merchant")
      location = value.get("location")
      timestamp = value.get("timestamp")
      transaction_id = value.get("transaction_id")
      merchant_category = value.get("merchant_category")
      transaction_type = value.get("transaction_type")
      channel = value.get("channel")

      if amount is None or merchant is None or location is None or timestamp is None:
        return None

      # Get transaction history
      history_json = transaction_history.get(key)
      if history_json:
        history = json.loads(history_json)
      else:
        history = {"last_location": location, "last_time": timestamp, "txn_count": 0, "total_amount": 0}

      # Check for fraud patterns
      fraud_score = 0
      fraud_reasons = []
      fraud_patterns = []

      # Pattern 1: High amount transaction
      if amount > 5000:
        fraud_score += 40
        fraud_reasons.append(f"high_amount:{amount:.2f}")
        fraud_patterns.append({
          "type": "HIGH_AMOUNT",
          "details": {"amount": amount, "threshold": 5000},
          "risk_weight": 40
        })

      # Pattern 2: Rapid transactions (within 60 seconds)
      time_diff = timestamp - history["last_time"]
      if time_diff < 60000 and history["txn_count"] > 0:
        fraud_score += 30
        fraud_reasons.append(f"rapid_txn:{time_diff}ms")
        fraud_patterns.append({
          "type": "RAPID_TRANSACTION",
          "details": {"time_difference_ms": time_diff, "threshold_ms": 60000},
          "risk_weight": 30
        })

      # Pattern 3: Location change
      if history["last_location"] != location and time_diff < 3600000:  # Within 1 hour
        fraud_score += 20
        fraud_reasons.append(f"location_change:{history['last_location']}->{location}")
        fraud_patterns.append({
          "type": "LOCATION_CHANGE",
          "details": {
            "previous_location": history["last_location"],
            "current_location": location,
            "time_difference_ms": time_diff,
            "threshold_ms": 3600000
          },
          "risk_weight": 20
        })

      # Pattern 4: Suspicious merchant
      if merchant in ["luxury_goods", "online_gambling", "crypto_exchange"]:
        fraud_score += 20
        fraud_reasons.append(f"suspicious_merchant:{merchant}")
        fraud_patterns.append({
          "type": "SUSPICIOUS_MERCHANT",
          "details": {"merchant": merchant, "merchant_category": merchant_category},
          "risk_weight": 20
        })

      # Update history
      history["last_location"] = location
      history["last_time"] = timestamp
      history["txn_count"] += 1
      history["total_amount"] += amount
      transaction_history.put(key, json.dumps(history))

      # Generate structured alert if fraud score is high
      result = None
      if fraud_score >= 50:
        log.warn("FRAUD ALERT for card {}: score={}, reasons={}", key, fraud_score, fraud_reasons)
        result = {
          "alert_type": "FRAUD_ALERT",
          "status": "HIGH_RISK",
          "card_number": key,
          "fraud_score": fraud_score,
          "severity": "CRITICAL",
          "transaction": {
            "transaction_id": transaction_id,
            "amount": amount,
            "merchant": merchant,
            "merchant_category": merchant_category,
            "location": location,
            "timestamp": timestamp,
            "transaction_type": transaction_type,
            "channel": channel
          },
          "fraud_patterns": fraud_patterns,
          "risk_analysis": {
            "total_score": fraud_score,
            "threshold_exceeded": "HIGH_RISK",
            "patterns_detected": len(fraud_patterns),
            "recommendation": "BLOCK_TRANSACTION"
          },
          "detection_timestamp": int(time.time() * 1000),
          "cardholder_history": {
            "previous_location": history.get("last_location"),
            "transaction_count": history.get("txn_count", 0) + 1,
            "total_spending": history.get("total_amount", 0) + amount
          }
        }
      elif fraud_score >= 30:
        log.info("Suspicious transaction for {}: score={}", key, fraud_score)
        result = {
          "alert_type": "SUSPICIOUS_TRANSACTION",
          "status": "MEDIUM_RISK",
          "card_number": key,
          "fraud_score": fraud_score,
          "severity": "WARNING",
          "transaction": {
            "transaction_id": transaction_id,
            "amount": amount,
            "merchant": merchant,
            "merchant_category": merchant_category,
            "location": location,
            "timestamp": timestamp,
            "transaction_type": transaction_type,
            "channel": channel
          },
          "fraud_patterns": fraud_patterns,
          "risk_analysis": {
            "total_score": fraud_score,
            "threshold_exceeded": "MEDIUM_RISK",
            "patterns_detected": len(fraud_patterns),
            "recommendation": "REVIEW_REQUIRED"
          },
          "detection_timestamp": int(time.time() * 1000),
          "cardholder_history": {
            "previous_location": history.get("last_location"),
            "transaction_count": history.get("txn_count", 0) + 1,
            "total_spending": history.get("total_amount", 0) + amount
          }
        }

      return result

    expression: result if result else None
    resultType: json

pipelines:
  fraud_detection_pipeline:
    from: credit_card_transactions
    via:
      - type: mapValues
        mapper: detect_fraud
      - type: filter
        if:
          expression: value is not None
    to:
      topic: fraud_alerts
      keyType: string
      valueType: json

Key concepts demonstrated:

  • Multi-factor pattern analysis
  • Risk scoring algorithms
  • Transaction velocity checks
  • Geographic anomaly detection

Conclusion

Complex Event Processing in KSML provides capabilities for real-time pattern detection and analysis. By combining state management, temporal operations, and correlation techniques, you can build sophisticated event processing applications that derive actionable insights from streaming data.