Implementing Joins in KSML

This tutorial explores how to implement join operations in KSML, allowing you to combine data from multiple streams or tables to create enriched datasets.

Introduction

Joins are fundamental operations in stream processing that combine data from multiple sources based on common keys. KSML provides three main types of joins, each serving different use cases in real-time data processing.

KSML joins are built on top of Kafka Streams join operations, providing a YAML-based interface to powerful stream processing capabilities without requiring Java code.

Prerequisites

Before starting this tutorial, create the Kafka topics used by the examples:

Topic creation commands
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_clicks && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_purchases && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic correlated_user_actions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_conversions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_clicks_by_product && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_purchases_by_product && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic new_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic orders_with_customer_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_catalog && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic orders_with_product_details && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_location_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic activity_with_location && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_login_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_logout_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_session_analysis && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_profiles && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_preferences && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_customer_data

Core Join Concepts

ValueJoiner Functions

All join operations require a valueJoiner function to combine data from both sides:

valueJoiner:
  type: valueJoiner
  code: |
    # value1: left stream/table value
    # value2: right stream/table value
    result = {"combined": value1, "enriched": value2}
  expression: result
  resultType: json

Requirements:

  • Must include expression and resultType fields
  • Handle null values gracefully, especially for left and outer joins (see the sketch after this list)
  • Return appropriate data structure for downstream processing
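
For left and outer joins, one side of the join can be null when no match is found, so the joiner should check before accessing fields. A minimal null-safe sketch following the same pattern as above (function and field names are illustrative):

null_safe_joiner:
  type: valueJoiner
  code: |
    # Either value may be None for left/outer joins when no match exists
    result = {}
    if value1 is not None:
      result["left"] = value1
    if value2 is not None:
      result["right"] = value2
    result["matched"] = value1 is not None and value2 is not None
  expression: result
  resultType: json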

Co-partitioning Requirements

Stream-stream and stream-table joins require co-partitioning:

  • Same number of partitions in both topics
  • Same partitioning strategy (how keys map to partitions)
  • Same key type and serialization format

GlobalTable joins don't require co-partitioning since data is replicated to all instances.
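
For example, two topics that will be joined can be created with identical partition counts, mirroring the topic creation commands in the prerequisites (the topic names and the partition count of 4 are only illustrative). Keys must then be written with the same type and serialization on both topics so identical keys land in the same partition:

kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 4 --replication-factor 1 --topic left_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 4 --replication-factor 1 --topic right_events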

Types of Joins in KSML

KSML supports three main categories of joins, each with specific characteristics and use cases:

1. Stream-Stream Joins

Join two event streams within a time window to correlate related events.

Kafka Streams equivalent: KStream.join(), KStream.leftJoin(), KStream.outerJoin()

When to use:

  • Correlating events from different systems
  • Tracking user behavior across multiple actions
  • Detecting patterns that span multiple event types

Key characteristics:

  • Requires time windows for correlation
  • Both streams must be co-partitioned
  • Results are emitted when matching events occur within the window
  • Supports inner, left, and outer join semantics

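In KSML this takes the shape of a join (or leftJoin/outerJoin) operation that names the other stream, a valueJoiner function, and a timeDifference. A minimal sketch with illustrative names (window store settings omitted here; the full, runnable example appears later in this tutorial):

pipelines:
  correlate_events:
    from: left_stream
    via:
      - type: join              # or leftJoin / outerJoin
        stream: right_stream
        valueJoiner: my_joiner
        timeDifference: 30m     # maximum time gap between correlated events
        grace: 5m               # allowance for late events
    to: correlated_events
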
2. Stream-Table Joins

Enrich a stream of events with the latest state from a changelog table.

Kafka Streams equivalent: KStream.join(), KStream.leftJoin() with KTable

When to use:

  • Enriching events with reference data
  • Adding current state information to events
  • Looking up the latest value for a key

Key characteristics:

  • Stream events are enriched with the latest table value
  • Table provides point-in-time lookups
  • Requires co-partitioning between stream and table
  • Supports inner and left join semantics

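The KSML shape is a join (or leftJoin) step that references a table instead of a stream; no time window is involved, because the table always holds the latest value per key. A minimal sketch with illustrative names (the stream must already be keyed by the table's key, or be rekeyed first as the full example later shows):

pipelines:
  enrich_events:
    from: event_stream          # keyed by the same key as the table
    via:
      - type: join              # or leftJoin
        table: reference_table
        valueJoiner: my_joiner
    to: enriched_events
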
3. Stream-GlobalTable Joins

Enrich events using replicated reference data available on all instances.

Kafka Streams equivalent: KStream.join(), KStream.leftJoin() with GlobalKTable

When to use:

  • Joining with reference data (product catalogs, configuration)
  • Foreign key joins where keys don't match directly
  • Avoiding co-partitioning requirements

Key characteristics:

  • GlobalTable is replicated to all application instances
  • Supports foreign key extraction via mapper functions
  • No co-partitioning required
  • Supports inner and left join semantics

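The KSML shape adds a mapper (a keyValueMapper) that extracts the lookup key from each stream record, so the stream key itself does not need to match the GlobalTable key. A minimal sketch with illustrative names (the full example appears later in this tutorial):

pipelines:
  enrich_with_reference_data:
    from: event_stream
    via:
      - type: join                    # or leftJoin
        globalTable: reference_data
        mapper: extract_lookup_key    # keyValueMapper returning the GlobalTable key
        valueJoiner: my_joiner
    to: enriched_events
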
Stream-Stream Join

Stream-stream joins correlate events from two streams within a specified time window. This is essential for detecting patterns and relationships between different event types.

Use Case: User Behavior Analysis

Track user shopping behavior by correlating clicks and purchases within a 30-minute window to understand the customer journey.

What it does:

  • Produces two streams: Creates product_clicks (user clicks on products) and product_purchases (user buys products) with user_id keys and timestamps
  • Joins with time window: Uses join operation with 30-minute timeDifference to correlate clicks with purchases that happen within 30 minutes
  • Stores in window stores: Both streams use 60-minute window stores (2×30min) with retainDuplicates=true to buffer events for correlation
  • Combines with valueJoiner: When matching user_id found in both streams within time window, combines click and purchase data into single result
  • Outputs correlations: Returns JSON showing both events with conversion analysis (did click lead to purchase, what products involved)

Producer: Clicks and Purchases
functions:
  generate_click:
    type: generator
    globalCode: |
      import random
      import time
      click_counter = 0
      users = ["user1", "user2", "user3", "user4", "user5"]
      products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
    code: |
      global click_counter, users, products

      click_counter += 1
      user_id = random.choice(users)

      value = {
        "click_id": f"CLK{click_counter:04d}",
        "user_id": user_id,
        "product_id": random.choice(products),
        "page": "product_detail",
        "timestamp": int(time.time() * 1000)
      }

      key = user_id
    expression: (key, value)
    resultType: (string, json)

  generate_purchase:
    type: generator
    globalCode: |
      import random
      import time
      purchase_counter = 0
      users = ["user1", "user2", "user3", "user4", "user5"]
      products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
    code: |
      global purchase_counter, users, products

      # Only generate purchases 30% of the time to simulate conversion
      if random.random() > 0.3:
        # Return a proper tuple with None values to skip generation
        key = None
        value = None
      else:
        purchase_counter += 1
        user_id = random.choice(users)

        value = {
          "purchase_id": f"PUR{purchase_counter:04d}",
          "user_id": user_id,
          "product_id": random.choice(products),
          "amount": round(random.uniform(20.0, 300.0), 2),
          "timestamp": int(time.time() * 1000)
        }

        key = user_id
    expression: (key, value)
    resultType: (string, json)

producers:
  click_producer:
    generator: generate_click
    interval: 1s
    to:
      topic: product_clicks
      keyType: string
      valueType: json

  purchase_producer:
    generator: generate_purchase
    interval: 3s
    to:
      topic: product_purchases
      keyType: string
      valueType: json

Processor: Stream-Stream Join
streams:
  product_clicks:
    topic: product_clicks
    keyType: string
    valueType: json

  product_purchases:
    topic: product_purchases
    keyType: string
    valueType: json

  correlated_user_actions:
    topic: correlated_user_actions
    keyType: string
    valueType: json

functions:
  correlate_click_and_purchase:
    type: valueJoiner
    code: |
      result = {}

      # Add click data
      if value1 is not None:
        result["click"] = value1

      # Add purchase data
      if value2 is not None:
        result["purchase"] = value2

      # Calculate conversion if both exist
      if value1 is not None and value2 is not None:
        result["converted"] = True
        click_time = value1.get("timestamp", 0)
        purchase_time = value2.get("timestamp", 0)
        result["conversion_time_ms"] = purchase_time - click_time
      else:
        result["converted"] = False

      new_value = result
    expression: new_value
    resultType: json

pipelines:
  match_clicks_with_purchases:
    from: product_clicks
    via:
      - type: join
        stream: product_purchases
        valueJoiner: correlate_click_and_purchase
        timeDifference: 30m  # Look for purchases within 30 minutes of a click
        grace: 5m  # Grace period for late events
        thisStore:
          name: clicks_join_store
          type: window
          windowSize: 60m  # Must be 2*timeDifference
          retention: 65m   # Must be 2*timeDifference + grace = 60m + 5m
          retainDuplicates: true
        otherStore:
          name: purchases_join_store
          type: window
          windowSize: 60m  # Must be 2*timeDifference
          retention: 65m   # Must be 2*timeDifference + grace = 60m + 5m
          retainDuplicates: true
      - type: peek
        forEach:
          code: log.info("CORRELATION - user={}, converted={}, click_product={}, purchase_product={}", key, value.get("converted"), value.get("click", {}).get("product_id"), value.get("purchase", {}).get("product_id"))
    to: correlated_user_actions

Key Configuration Points

This example shows how time windows are used to correlate events from different streams. The configuration demonstrates:

  • timeDifference: 30m - Maximum time gap between correlated events
  • Window Stores: Both streams need stores with retainDuplicates: true
  • Window Size: Must be 2 × timeDifference (60m) to buffer events from both streams
  • Retention: Must be 2 × timeDifference + grace (65m) for proper state cleanup
  • Grace Period: 5m allowance for late-arriving events

This configuration ensures events are only correlated within a reasonable time frame while managing memory efficiently.
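
The same arithmetic applies to any timeDifference you choose. For instance, with a 10-minute timeDifference and a 2-minute grace period (the values used in the outer join example later in this tutorial), the store settings work out to:

thisStore:
  name: example_join_store
  type: window
  windowSize: 20m   # 2 * timeDifference = 20m
  retention: 22m    # 2 * timeDifference + grace = 22m
  retainDuplicates: true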

Stream-Table Join

Stream-table joins enrich streaming events with the latest state from a changelog table. This pattern is common for adding reference data to events.

Use Case: Order Enrichment

Enrich order events with customer information by joining the orders stream with a customers table.

What it does:

  • Produces orders and customers: Creates order stream (keyed by order_id) and customer table (keyed by customer_id) with matching customer references
  • Rekeys for join: Transforms order key from order_id to customer_id using extract_customer_id function to enable join with customers table
  • Joins stream with table: Uses join operation to combine each order with latest customer data from customers table based on customer_id
  • Combines data: valueJoiner function merges order details with customer information into enriched result containing both datasets
  • Restores original key: Transforms key back from customer_id to order_id using restore_order_key for downstream processing consistency

Producer: Orders
functions:
  generate_order:
    type: generator
    globalCode: |
      import random
      import time
      order_counter = 0
      customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
      products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
    code: |
      global order_counter, customers, products

      order_counter += 1
      order_id = f"ORD{order_counter:04d}"
      customer_id = random.choice(customers)

      # Generate order
      value = {
        "order_id": order_id,
        "customer_id": customer_id,
        "product_id": random.choice(products),
        "quantity": random.randint(1, 5),
        "amount": round(random.uniform(10.0, 200.0), 2),
        "timestamp": int(time.time() * 1000)
      }

      # Use order_id as key (natural key for orders)
      key = order_id
    expression: (key, value)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_order
    interval: 2s
    to:
      topic: new_orders
      keyType: string
      valueType: json

Producer: Customers
functions:
  generate_customer:
    type: generator
    globalCode: |
      import random
      customers_data = [
        {"id": "CUST001", "name": "Alice Johnson", "email": "alice@email.com", "region": "US-WEST"},
        {"id": "CUST002", "name": "Bob Smith", "email": "bob@email.com", "region": "US-EAST"},
        {"id": "CUST003", "name": "Carol Davis", "email": "carol@email.com", "region": "EU-WEST"},
        {"id": "CUST004", "name": "David Wilson", "email": "david@email.com", "region": "ASIA-PACIFIC"},
        {"id": "CUST005", "name": "Eva Brown", "email": "eva@email.com", "region": "EU-CENTRAL"}
      ]
      customer_index = 0
    code: |
      global customers_data, customer_index

      # Cycle through customers
      customer = customers_data[customer_index % len(customers_data)]
      customer_index += 1

      value = {
        "name": customer["name"],
        "email": customer["email"],
        "region": customer["region"],
        "status": "active"
      }

      key = customer["id"]
    expression: (key, value)
    resultType: (string, json)

producers:
  customer_producer:
    generator: generate_customer
    interval: 10s
    to:
      topic: customer_data
      keyType: string
      valueType: json

Processor: Stream-Table Join
streams:
  orders:
    topic: new_orders
    keyType: string  # Order ID
    valueType: json  # Order details

  enriched_orders:
    topic: orders_with_customer_data
    keyType: string  # Order ID
    valueType: json  # Combined order and customer data

tables:
  customers:
    topic: customer_data
    keyType: string  # Customer ID
    valueType: json  # Customer details

functions:
  extract_customer_id:
    type: keyTransformer
    code: |
      # Extract customer_id from order to use as key for join
      new_key = value.get("customer_id") if value else None
    expression: new_key
    resultType: string

  join_order_with_customer:
    type: valueJoiner
    code: |
      # Combine order and customer information
      result = {}

      # Add order details
      if value1 is not None:
        result.update(value1)

      # Add customer details
      if value2 is not None:
        result["customer"] = value2

      new_value = result
    expression: new_value
    resultType: json

  restore_order_key:
    type: keyTransformer
    code: |
      # Restore order_id as key after join
      new_key = value.get("order_id") if value else None
    expression: new_key
    resultType: string

pipelines:
  enrich_orders:
    from: orders
    via:
      # Rekey to customer_id for join
      - type: transformKey
        mapper: extract_customer_id
      # Join with customers table
      - type: join
        table: customers
        valueJoiner: join_order_with_customer
      # Rekey back to order_id
      - type: transformKey
        mapper: restore_order_key
      - type: peek
        forEach:
          code: log.info("ENRICHED ORDER - key={}, order_id={}, customer={}", key, value.get("order_id"), value.get("customer", {}).get("name"))
    to: enriched_orders

Rekeying Pattern

The rekeying pattern is essential when join keys don't match naturally:

  • Use transformKey to extract the join key from the stream
  • Perform the join operation
  • Optionally restore the original key for downstream consistency

Stream-GlobalTable Join

GlobalTable joins enable enrichment with reference data that's replicated across all instances, supporting foreign key relationships.

Use Case: Product Catalog Enrichment

Enrich orders with product details using a foreign key join with a global product catalog.

What it does:

  • Produces orders and products: Creates order stream with product_id references and product_catalog globalTable with product details
  • Uses foreign key mapper: Extracts product_id from order value using keyValueMapper to look up in product_catalog globalTable
  • Joins with globalTable: No co-partitioning needed - globalTable replicated to all instances, joins orders with product details by product_id
  • Enriches with product data: Combines order information with product details (name, price, category) from catalog
  • Outputs enriched orders: Returns orders enhanced with full product information for downstream processing

Producer: Orders and Products
functions:
  generate_order:
    type: generator
    globalCode: |
      import random
      import time
      order_counter = 0
      customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
      products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
    code: |
      global order_counter, customers, products

      order_counter += 1
      order_id = f"ORD{order_counter:04d}"
      customer_id = random.choice(customers)
      product_id = random.choice(products)

      # Generate order with product_id for foreign key join
      value = {
        "order_id": order_id,
        "customer_id": customer_id,
        "product_id": product_id,
        "quantity": random.randint(1, 5),
        "timestamp": int(time.time() * 1000)
      }

      # Use order_id as key (natural key for orders)
      key = order_id
    expression: (key, value)
    resultType: (string, json)

  generate_product:
    type: generator
    globalCode: |
      import random
      products_data = [
        {"id": "PROD001", "name": "Laptop", "category": "Electronics", "price": 999.99},
        {"id": "PROD002", "name": "Headphones", "category": "Electronics", "price": 149.99},
        {"id": "PROD003", "name": "Coffee Maker", "category": "Appliances", "price": 79.99},
        {"id": "PROD004", "name": "Backpack", "category": "Accessories", "price": 49.99},
        {"id": "PROD005", "name": "Desk Lamp", "category": "Furniture", "price": 39.99}
      ]
      product_index = 0
    code: |
      global products_data, product_index

      # Cycle through products to populate the global table
      product = products_data[product_index % len(products_data)]
      product_index += 1

      value = {
        "name": product["name"],
        "category": product["category"],
        "price": product["price"],
        "in_stock": True
      }

      # Product ID as key for the global table
      key = product["id"]
    expression: (key, value)
    resultType: (string, json)

producers:
  order_producer:
    generator: generate_order
    interval: 2s
    to:
      topic: new_orders
      keyType: string
      valueType: json

  product_producer:
    generator: generate_product
    interval: 10s
    to:
      topic: product_catalog
      keyType: string
      valueType: json

Processor: Foreign Key Join
streams:
  orders:
    topic: new_orders
    keyType: string  # Order ID
    valueType: json  # Order details including product_id

  orders_with_product_details:
    topic: orders_with_product_details
    keyType: string  # Order ID
    valueType: json  # Order enriched with product information

globalTables:
  products:
    topic: product_catalog
    keyType: string  # Product ID
    valueType: json  # Product details

functions:
  extract_product_id:
    type: keyValueMapper
    code: |
      # Map from order (key, value) to product_id for join
      product_id = value.get("product_id") if value else None
    expression: product_id
    resultType: string

  join_order_with_product:
    type: valueJoiner
    code: |
      # Combine order and product information
      result = {}

      # Add order details
      if value1 is not None:
        result.update(value1)

      # Add product details
      if value2 is not None:
        result["product_details"] = value2
        # Calculate total price
        quantity = value1.get("quantity", 0) if value1 else 0
        price = value2.get("price", 0) if value2 else 0
        result["total_price"] = quantity * price

      new_value = result
    expression: new_value
    resultType: json

pipelines:
  enrich_orders_with_products:
    from: orders
    via:
      - type: join
        globalTable: products
        mapper: extract_product_id
        valueJoiner: join_order_with_product
      - type: peek
        forEach:
          code: |
            log.info("ENRICHED ORDER - key={}, order_id={}, product={}, total_price={}", 
                     key, 
                     value.get("order_id"), 
                     value.get("product_details", {}).get("name"),
                     value.get("total_price"))
    to: orders_with_product_details

Foreign Key Extraction

The mapper function extracts the foreign key from stream records:

  • Function Type: keyValueMapper (not foreignKeyExtractor)
  • Input: Stream's key and value
  • Output: Key to lookup in the GlobalTable
  • Example: Extract product_id from order to join with product catalog

Stream-Table Left Join

Left joins preserve all records from the stream (left side) while adding data from the table (right side) when available. This is useful for enriching events with optional reference data.

Use Case: Activity Enrichment with Location

Enrich user activity events with location data, preserving all activities even when location information is missing.

What it does:

  • Produces activities and locations: Creates user_activity_events stream and user_location_data table, with some users having location data, others not
  • Uses leftJoin: Joins activity stream with location table - preserves all activities even when no location data exists for user
  • Handles missing data: valueJoiner receives null for location when user not in location table, includes null-check logic
  • Enriches optionally: Adds location data when available, leaves location fields empty/null when not available
  • Preserves all activities: All activity events flow through to output regardless of whether location data exists, maintaining complete activity stream

Producer: User Activity and Locations
functions:
  generate_user_activity:
    type: generator
    globalCode: |
      import random
      import time
      activity_counter = 0
    code: |
      global activity_counter

      user_ids = ["user001", "user002", "user003", "user004", "user005"]
      activity_types = ["login", "logout", "page_view", "purchase", "search"]

      user_id = random.choice(user_ids)
      activity = {
        "activity_id": f"activity_{activity_counter+1:03d}",
        "user_id": user_id,
        "activity_type": random.choice(activity_types),
        "timestamp": int(time.time() * 1000),
        "session_id": f"session_{user_id}_{random.randint(1, 5)}"
      }

      activity_counter += 1
      result = (user_id, activity)
    expression: result
    resultType: (string, json)

  generate_user_locations:
    type: generator
    globalCode: |
      import random
      import time
      location_counter = 0
      user_ids = ["user001", "user002", "user003", "user005"]  # Note: user004 missing
      locations = [
        {"country": "USA", "city": "New York", "timezone": "EST"},
        {"country": "UK", "city": "London", "timezone": "GMT"},
        {"country": "Germany", "city": "Berlin", "timezone": "CET"},
        {"country": "Japan", "city": "Tokyo", "timezone": "JST"}
      ]
    code: |
      global location_counter

      if location_counter < len(user_ids):
        user_id = user_ids[location_counter]
        location = random.choice(locations)
        location_data = {
          "user_id": user_id,
          "country": location["country"],
          "city": location["city"],
          "timezone": location["timezone"],
          "updated_at": int(time.time() * 1000)
        }

        location_counter += 1
        result = (user_id, location_data)
      else:
        result = None
    expression: result
    resultType: (string, json)

producers:
  user_activity_producer:
    generator: generate_user_activity
    interval: 1s
    to:
      topic: user_activity_events
      keyType: string
      valueType: json

  user_location_producer:
    generator: generate_user_locations
    interval: 2s
    count: 4  # Only 4 users have location data
    to:
      topic: user_location_data
      keyType: string
      valueType: json

Processor: Stream-Table Left Join
streams:
  user_activity:
    topic: user_activity_events
    keyType: string  # User ID
    valueType: json  # Activity details

  enriched_activity:
    topic: activity_with_location
    keyType: string  # User ID
    valueType: json  # Activity enriched with location

tables:
  user_locations:
    topic: user_location_data
    keyType: string  # User ID
    valueType: json  # Location details

functions:
  join_activity_with_location:
    type: valueJoiner
    globalCode: |
      import time
    code: |
      # Left join: always preserve activity data, add location if available
      result = {}

      # Always include activity data (left side)
      if value1 is not None:
        result.update(value1)

      # Add location data if available (right side)
      if value2 is not None:
        result["location"] = {
          "country": value2.get("country"),
          "city": value2.get("city"),
          "timezone": value2.get("timezone")
        }
      else:
        # Location not available for this user
        result["location"] = {
          "country": "UNKNOWN",
          "city": "UNKNOWN", 
          "timezone": "UTC"
        }

      # Add enrichment metadata
      result["enriched"] = value2 is not None
      result["enriched_at"] = int(time.time() * 1000)

      new_value = result
    expression: new_value
    resultType: json

pipelines:
  enrich_activity_with_location:
    from: user_activity
    via:
      # Left join with location table - preserves all activities
      - type: leftJoin
        table: user_locations
        valueJoiner: join_activity_with_location
      - type: peek
        forEach:
          code: log.info("ENRICHED ACTIVITY - user={}, activity={}, location={}/{}, enriched={}", key, value.get("activity_type"), value.get("location", {}).get("city"), value.get("location", {}).get("country"), value.get("enriched"))
    to: enriched_activity

This example demonstrates:

  • Left join semantics: All activity events are preserved, even when location data is missing
  • Graceful null handling: Unknown locations get default values instead of causing errors
  • Enrichment metadata: Tracks whether enrichment data was available
  • Real-world pattern: Common scenario where reference data may be incomplete

Expected Behavior:

  • Activities for users with location data are enriched with country/city information
  • Activities for users without location data get "UNKNOWN" placeholders
  • All activities are preserved regardless of location availability

Stream-Stream Outer Join

Outer joins capture events from either stream, making them ideal for tracking incomplete or partial interactions between two event types.

Use Case: Login/Logout Session Analysis

Track user sessions by correlating login and logout events, capturing incomplete sessions where only one event type occurs.

What it does:

  • Produces login and logout events: Creates separate streams for user_login_events and user_logout_events with overlapping but not always matching users
  • Uses outerJoin: Correlates events within 10-minute window, captures login-only, logout-only, and complete login+logout sessions
  • Handles three scenarios: Complete sessions (both events), LOGIN_ONLY (user logged in, no logout captured), LOGOUT_ONLY (logout without login)
  • Calculates session data: For complete sessions computes session duration, for incomplete sessions identifies the pattern and timing
  • Outputs all patterns: Returns session analysis showing complete sessions with duration, or incomplete sessions with pattern classification

Producer: Login and Logout Events
functions:
  generate_login_events:
    type: generator
    globalCode: |
      import random
      import time
      login_counter = 0
    code: |
      global login_counter

      user_ids = ["alice", "bob", "charlie", "diana", "eve"]
      user_id = random.choice(user_ids)

      login_event = {
        "event_id": f"login_{login_counter+1:03d}",
        "user_id": user_id,
        "login_time": int(time.time() * 1000),
        "device": random.choice(["mobile", "desktop", "tablet"]),
        "ip_address": f"192.168.1.{random.randint(1, 255)}"
      }

      login_counter += 1
      result = (user_id, login_event)
    expression: result
    resultType: (string, json)

  generate_logout_events:
    type: generator
    globalCode: |
      import random
      import time
      logout_counter = 0
    code: |
      global logout_counter

      user_ids = ["alice", "bob", "charlie", "frank", "grace"]  # Some different users
      user_id = random.choice(user_ids)

      logout_event = {
        "event_id": f"logout_{logout_counter+1:03d}",
        "user_id": user_id,
        "logout_time": int(time.time() * 1000),
        "session_duration": random.randint(60, 7200)  # 1 min to 2 hours
      }

      logout_counter += 1
      result = (user_id, logout_event)
    expression: result
    resultType: (string, json)

producers:
  login_producer:
    generator: generate_login_events
    interval: 2s
    to:
      topic: user_login_events
      keyType: string
      valueType: json

  logout_producer:
    generator: generate_logout_events
    interval: 3s
    to:
      topic: user_logout_events
      keyType: string
      valueType: json

Processor: Stream-Stream Outer Join
streams:
  user_logins:
    topic: user_login_events
    keyType: string
    valueType: json

  user_logouts:
    topic: user_logout_events
    keyType: string
    valueType: json

  session_analysis:
    topic: user_session_analysis
    keyType: string
    valueType: json

functions:
  analyze_user_session:
    type: valueJoiner
    globalCode: |
      import time
    code: |
      # Outer join: capture login-only, logout-only, and matched events
      result = {}

      # Determine session type based on what data is available
      if value1 is not None and value2 is not None:
        # Both login and logout available - complete session
        result = {
          "session_type": "COMPLETE",
          "user_id": value1.get("user_id"),
          "login_event": value1,
          "logout_event": value2,
          "session_duration": value2.get("session_duration"),
          "device": value1.get("device"),
          "login_time": value1.get("login_time"),
          "logout_time": value2.get("logout_time")
        }
      elif value1 is not None:
        # Login only - user still active or logout not captured
        result = {
          "session_type": "LOGIN_ONLY",
          "user_id": value1.get("user_id"),
          "login_event": value1,
          "logout_event": None,
          "device": value1.get("device"),
          "login_time": value1.get("login_time"),
          "status": "active_or_missing_logout"
        }
      elif value2 is not None:
        # Logout only - login not captured or user was already logged in
        result = {
          "session_type": "LOGOUT_ONLY",
          "user_id": value2.get("user_id"),
          "login_event": None,
          "logout_event": value2,
          "session_duration": value2.get("session_duration"),
          "logout_time": value2.get("logout_time"),
          "status": "missing_login_or_already_active"
        }

      # Add analysis metadata
      result["analyzed_at"] = int(time.time() * 1000)

      new_value = result
    expression: new_value
    resultType: json

pipelines:
  analyze_user_sessions:
    from: user_logins
    via:
      # Outer join to capture all login and logout events
      - type: outerJoin
        stream: user_logouts
        valueJoiner: analyze_user_session
        timeDifference: 10m  # Look for logouts within 10 minutes of login
        grace: 2m  # Grace period for late events
        thisStore:
          name: login_session_store
          type: window
          windowSize: 20m  # Must be 2*timeDifference
          retention: 22m   # Must be 2*timeDifference + grace
          retainDuplicates: true
        otherStore:
          name: logout_session_store
          type: window
          windowSize: 20m  # Must be 2*timeDifference
          retention: 22m   # Must be 2*timeDifference + grace
          retainDuplicates: true
      - type: peek
        forEach:
          code: log.info("SESSION ANALYSIS - user={}, type={}, device={}, duration={}s", key, value.get("session_type"), value.get("device", "N/A"), value.get("session_duration", "N/A"))
    to: session_analysis

This example demonstrates:

  • Outer join semantics: Captures login-only, logout-only, and complete sessions
  • Session analysis: Categorizes different session patterns for analytics
  • Time window correlation: Uses 10-minute window to correlate related events
  • Business insights: Identifies users who login but don't logout (active sessions) or logout without captured login

Expected Behavior:

  • COMPLETE sessions: When both login and logout events occur within the time window
  • LOGIN_ONLY sessions: Users who logged in but no logout was captured (active sessions)
  • LOGOUT_ONLY sessions: Logout events without corresponding login (users already logged in)

Join Type Variants

Each join type supports different semantics for handling missing matches:

Inner Joins

type: join  # Default inner join
Produces output only when both sides have matching keys.

Left Joins

type: leftJoin
Always produces output for the left side (stream), with null for missing right side values.

Outer Joins (Stream-Stream only)

type: outerJoin
Produces output whenever either side has data, with null for missing values.

Note: Table and GlobalTable joins don't support outer joins since tables represent current state, not events.

Performance Considerations

State Management

  • Window sizes: Larger windows consume more memory but capture more correlations
  • Retention periods: Balance between late data handling and resource usage
  • Grace periods: Allow late arrivals while managing state cleanup

Topology Optimization

  • Join order: Join with smaller datasets first when chaining multiple joins
  • GlobalTable usage: Use for frequently accessed reference data to avoid repartitioning
  • Rekeying overhead: Minimize unnecessary rekeying operations

Conclusion

KSML's join operations enable powerful data enrichment patterns:

  • Stream-stream joins correlate events within time windows
  • Stream-table joins enrich events with current state
  • Stream-GlobalTable joins provide foreign key lookups without co-partitioning

Choose the appropriate join type based on your data characteristics and business requirements.

Further Reading