Implementing Joins in KSML
This tutorial explores how to implement join operations in KSML, allowing you to combine data from multiple streams or tables to create enriched datasets.
Introduction
Joins are fundamental operations in stream processing that combine data from multiple sources based on common keys. KSML provides three main types of joins, each serving different use cases in real-time data processing.
KSML joins are built on top of Kafka Streams join operations, providing a YAML-based interface to powerful stream processing capabilities without requiring Java code.
Prerequisites
Before starting this tutorial:
- Have the Docker Compose KSML environment set up and running
- Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands - click to expand
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_clicks && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_purchases && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic correlated_user_actions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_conversions && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_clicks_by_product && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_purchases_by_product && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic new_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_orders && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic orders_with_customer_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic product_catalog && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic orders_with_product_details && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_activity_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_location_data && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic activity_with_location && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_login_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_logout_events && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic user_session_analysis && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_profiles && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic customer_preferences && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic enriched_customer_data
Core Join Concepts
ValueJoiner Functions
All join operations require a valueJoiner function to combine data from both sides:
valueJoiner:
type: valueJoiner
code: |
# value1: left stream/table value
# value2: right stream/table value
result = {"combined": value1, "enriched": value2}
expression: result
resultType: json
Requirements:
- Must include expression and resultType fields
- Handle null values gracefully, especially for left/outer joins (see the sketch below)
- Return an appropriate data structure for downstream processing
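For left and outer joins the right-hand value can be null; a minimal null-safe valueJoiner might look like the sketch below (the function name and the enrichment field are illustrative, not part of the examples in this tutorial):

null_safe_joiner:
  type: valueJoiner
  code: |
    # value1: left-side value; value2: right-side value, may be None for left/outer joins
    result = dict(value1) if value1 is not None else {}
    if value2 is not None:
      result["enrichment"] = value2
    else:
      result["enrichment"] = None  # keep the record, mark the enrichment as missing
  expression: result
  resultType: json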
Co-partitioning Requirements
Stream-stream and stream-table joins require co-partitioning:
- Same number of partitions in both topics
- Same partitioning strategy (how keys map to partitions)
- Same key type and serialization format
GlobalTable joins don't require co-partitioning since data is replicated to all instances.
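For illustration only (topic names are taken from the examples below, the partition counts are hypothetical): KSML declares only key and value types, so matching partition counts must be verified on the topics themselves.

streams:
  orders:
    topic: new_orders          # e.g. 4 partitions, string keys
    keyType: string
    valueType: json
tables:
  customers:
    topic: customer_data       # must also have 4 partitions and use the same key type
    keyType: string
    valueType: json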
Types of Joins in KSML
KSML supports three main categories of joins, each with specific characteristics and use cases:
1. Stream-Stream Joins
Join two event streams within a time window to correlate related events.
Kafka Streams equivalent: KStream.join(), KStream.leftJoin(), KStream.outerJoin()
When to use:
- Correlating events from different systems
- Tracking user behavior across multiple actions
- Detecting patterns that span multiple event types
Key characteristics:
- Requires time windows for correlation
- Both streams must be co-partitioned
- Results are emitted when matching events occur within the window
- Supports inner, left, and outer join semantics
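The complete windowed example appears later in this tutorial; as a rough sketch, the stream-stream join step has the following shape (stream and function names are placeholders). The detailed example below additionally configures explicit window stores for both sides.

- type: join
  stream: other_events            # right-hand event stream (must be co-partitioned)
  valueJoiner: combine_events     # valueJoiner that merges the two matching values
  timeDifference: 30m             # maximum gap between correlated events
  grace: 5m                       # allowance for late-arriving events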
2. Stream-Table Joins
Enrich a stream of events with the latest state from a changelog table.
Kafka Streams equivalent: KStream.join(), KStream.leftJoin() with KTable
When to use:
- Enriching events with reference data
- Adding current state information to events
- Looking up the latest value for a key
Key characteristics:
- Stream events are enriched with the latest table value
- Table provides point-in-time lookups
- Requires co-partitioning between stream and table
- Supports inner and left join semantics
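As a sketch (the complete order-enrichment example appears below), the stream-table join step looks like this, with placeholder names; if the stream's key does not match the table's key, rekey the stream first with transformKey as shown in that example.

- type: join
  table: reference_table          # table holding the latest value per key
  valueJoiner: enrich_event       # merges the event with the current table value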
3. Stream-GlobalTable Joins
Enrich events using replicated reference data available on all instances.
Kafka Streams equivalent: KStream.join(), KStream.leftJoin() with GlobalKTable
When to use:
- Joining with reference data (product catalogs, configuration)
- Foreign key joins where keys don't match directly
- Avoiding co-partitioning requirements
Key characteristics:
- GlobalTable is replicated to all application instances
- Supports foreign key extraction via mapper functions
- No co-partitioning required
- Supports inner and left join semantics
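As a sketch (the product-catalog example below shows the full configuration), the GlobalTable join step adds a mapper that derives the lookup key; names here are placeholders.

- type: join
  globalTable: reference_data     # replicated reference data, no co-partitioning needed
  mapper: extract_foreign_key     # keyValueMapper returning the key to look up
  valueJoiner: enrich_event       # merges the event with the looked-up value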
Stream-Stream Join
Stream-stream joins correlate events from two streams within a specified time window. This is essential for detecting patterns and relationships between different event types.
Use Case: User Behavior Analysis
Track user shopping behavior by correlating clicks and purchases within a 30-minute window to understand the customer journey.
What it does:
- Produces two streams: Creates product_clicks (user clicks on products) and product_purchases (user buys products) with user_id keys and timestamps
- Joins with time window: Uses join operation with 30-minute timeDifference to correlate clicks with purchases that happen within 30 minutes
- Stores in window stores: Both streams use 60-minute window stores (2×30min) with retainDuplicates=true to buffer events for correlation
- Combines with valueJoiner: When matching user_id found in both streams within time window, combines click and purchase data into single result
- Outputs correlations: Returns JSON showing both events with conversion analysis (did click lead to purchase, what products involved)
Producer: Clicks and Purchases (click to expand)
functions:
generate_click:
type: generator
globalCode: |
import random
import time
click_counter = 0
users = ["user1", "user2", "user3", "user4", "user5"]
products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
code: |
global click_counter, users, products
click_counter += 1
user_id = random.choice(users)
value = {
"click_id": f"CLK{click_counter:04d}",
"user_id": user_id,
"product_id": random.choice(products),
"page": "product_detail",
"timestamp": int(time.time() * 1000)
}
key = user_id
expression: (key, value)
resultType: (string, json)
generate_purchase:
type: generator
globalCode: |
import random
import time
purchase_counter = 0
users = ["user1", "user2", "user3", "user4", "user5"]
products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
code: |
global purchase_counter, users, products
# Only generate purchases 30% of the time to simulate conversion
if random.random() > 0.3:
# Return a proper tuple with None values to skip generation
key = None
value = None
else:
purchase_counter += 1
user_id = random.choice(users)
value = {
"purchase_id": f"PUR{purchase_counter:04d}",
"user_id": user_id,
"product_id": random.choice(products),
"amount": round(random.uniform(20.0, 300.0), 2),
"timestamp": int(time.time() * 1000)
}
key = user_id
expression: (key, value)
resultType: (string, json)
producers:
click_producer:
generator: generate_click
interval: 1s
to:
topic: product_clicks
keyType: string
valueType: json
purchase_producer:
generator: generate_purchase
interval: 3s
to:
topic: product_purchases
keyType: string
valueType: json
Processor: Stream-Stream Join (click to expand)
streams:
product_clicks:
topic: product_clicks
keyType: string
valueType: json
product_purchases:
topic: product_purchases
keyType: string
valueType: json
correlated_user_actions:
topic: correlated_user_actions
keyType: string
valueType: json
functions:
correlate_click_and_purchase:
type: valueJoiner
code: |
result = {}
# Add click data
if value1 is not None:
result["click"] = value1
# Add purchase data
if value2 is not None:
result["purchase"] = value2
# Calculate conversion if both exist
if value1 is not None and value2 is not None:
result["converted"] = True
click_time = value1.get("timestamp", 0)
purchase_time = value2.get("timestamp", 0)
result["conversion_time_ms"] = purchase_time - click_time
else:
result["converted"] = False
new_value = result
expression: new_value
resultType: json
pipelines:
match_clicks_with_purchases:
from: product_clicks
via:
- type: join
stream: product_purchases
valueJoiner: correlate_click_and_purchase
timeDifference: 30m # Look for purchases within 30 minutes of a click
grace: 5m # Grace period for late events
thisStore:
name: clicks_join_store
type: window
windowSize: 60m # Must be 2*timeDifference
retention: 65m # Must be 2*timeDifference + grace = 60m + 5m
retainDuplicates: true
otherStore:
name: purchases_join_store
type: window
windowSize: 60m # Must be 2*timeDifference
retention: 65m # Must be 2*timeDifference + grace = 60m + 5m
retainDuplicates: true
- type: peek
forEach:
code: log.info("CORRELATION - user={}, converted={}, click_product={}, purchase_product={}", key, value.get("converted"), value.get("click", {}).get("product_id"), value.get("purchase", {}).get("product_id"))
to: correlated_user_actions
Key Configuration Points
The aim here is to show how time windows must be used to correlate events from different streams. The configuration demonstrates:
- timeDifference: 30m - Maximum time gap between correlated events
- Window Stores: Both streams need stores with retainDuplicates: true
- Window Size: Must be 2 × timeDifference (60m) to buffer events from both streams
- Retention: Must be 2 × timeDifference + grace (65m) for proper state cleanup
- Grace Period: 5m allowance for late-arriving events
This configuration ensures events are only correlated within a reasonable time frame while managing memory efficiently.
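The same sizing rule applies to any windowed join. For example, the 10-minute outer join shown later in this tutorial sizes its window stores as follows:

thisStore:
  name: login_session_store
  type: window
  windowSize: 20m        # 2 × timeDifference (2 × 10m)
  retention: 22m         # 2 × timeDifference + grace (20m + 2m)
  retainDuplicates: true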
Stream-Table Join
Stream-table joins enrich streaming events with the latest state from a changelog table. This pattern is common for adding reference data to events.
Use Case: Order Enrichment
Enrich order events with customer information by joining the orders stream with a customers table.
What it does:
- Produces orders and customers: Creates order stream (keyed by order_id) and customer table (keyed by customer_id) with matching customer references
- Rekeys for join: Transforms order key from order_id to customer_id using extract_customer_id function to enable join with customers table
- Joins stream with table: Uses join operation to combine each order with latest customer data from customers table based on customer_id
- Combines data: valueJoiner function merges order details with customer information into enriched result containing both datasets
- Restores original key: Transforms key back from customer_id to order_id using restore_order_key for downstream processing consistency
Producer: Orders (click to expand)
functions:
generate_order:
type: generator
globalCode: |
import random
import time
order_counter = 0
customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
code: |
global order_counter, customers, products
order_counter += 1
order_id = f"ORD{order_counter:04d}"
customer_id = random.choice(customers)
# Generate order
value = {
"order_id": order_id,
"customer_id": customer_id,
"product_id": random.choice(products),
"quantity": random.randint(1, 5),
"amount": round(random.uniform(10.0, 200.0), 2),
"timestamp": int(time.time() * 1000)
}
# Use order_id as key (natural key for orders)
key = order_id
expression: (key, value)
resultType: (string, json)
producers:
order_producer:
generator: generate_order
interval: 2s
to:
topic: new_orders
keyType: string
valueType: json
Producer: Customers (click to expand)
functions:
generate_customer:
type: generator
globalCode: |
import random
customers_data = [
{"id": "CUST001", "name": "Alice Johnson", "email": "alice@email.com", "region": "US-WEST"},
{"id": "CUST002", "name": "Bob Smith", "email": "bob@email.com", "region": "US-EAST"},
{"id": "CUST003", "name": "Carol Davis", "email": "carol@email.com", "region": "EU-WEST"},
{"id": "CUST004", "name": "David Wilson", "email": "david@email.com", "region": "ASIA-PACIFIC"},
{"id": "CUST005", "name": "Eva Brown", "email": "eva@email.com", "region": "EU-CENTRAL"}
]
customer_index = 0
code: |
global customers_data, customer_index
# Cycle through customers
customer = customers_data[customer_index % len(customers_data)]
customer_index += 1
value = {
"name": customer["name"],
"email": customer["email"],
"region": customer["region"],
"status": "active"
}
key = customer["id"]
expression: (key, value)
resultType: (string, json)
producers:
customer_producer:
generator: generate_customer
interval: 10s
to:
topic: customer_data
keyType: string
valueType: json
Processor: Stream-Table Join (click to expand)
streams:
orders:
topic: new_orders
keyType: string # Order ID
valueType: json # Order details
enriched_orders:
topic: orders_with_customer_data
keyType: string # Order ID
valueType: json # Combined order and customer data
tables:
customers:
topic: customer_data
keyType: string # Customer ID
valueType: json # Customer details
functions:
extract_customer_id:
type: keyTransformer
code: |
# Extract customer_id from order to use as key for join
new_key = value.get("customer_id") if value else None
expression: new_key
resultType: string
join_order_with_customer:
type: valueJoiner
code: |
# Combine order and customer information
result = {}
# Add order details
if value1 is not None:
result.update(value1)
# Add customer details
if value2 is not None:
result["customer"] = value2
new_value = result
expression: new_value
resultType: json
restore_order_key:
type: keyTransformer
code: |
# Restore order_id as key after join
new_key = value.get("order_id") if value else None
expression: new_key
resultType: string
pipelines:
enrich_orders:
from: orders
via:
# Rekey to customer_id for join
- type: transformKey
mapper: extract_customer_id
# Join with customers table
- type: join
table: customers
valueJoiner: join_order_with_customer
# Rekey back to order_id
- type: transformKey
mapper: restore_order_key
- type: peek
forEach:
code: log.info("ENRICHED ORDER - key={}, order_id={}, customer={}", key, value.get("order_id"), value.get("customer", {}).get("name"))
to: enriched_orders
Rekeying Pattern
The rekeying pattern is essential when join keys don't match naturally:
- Use transformKey to extract the join key from the stream
- Perform the join operation
- Optionally restore the original key for downstream consistency
Stream-GlobalTable Join
GlobalTable joins enable enrichment with reference data that's replicated across all instances, supporting foreign key relationships.
Use Case: Product Catalog Enrichment
Enrich orders with product details using a foreign key join with a global product catalog.
What it does:
- Produces orders and products: Creates order stream with product_id references and product_catalog globalTable with product details
- Uses foreign key mapper: Extracts product_id from order value using keyValueMapper to look up in product_catalog globalTable
- Joins with globalTable: No co-partitioning needed - globalTable replicated to all instances, joins orders with product details by product_id
- Enriches with product data: Combines order information with product details (name, price, category) from catalog
- Outputs enriched orders: Returns orders enhanced with full product information for downstream processing
Producer: Orders and Products (click to expand)
functions:
generate_order:
type: generator
globalCode: |
import random
import time
order_counter = 0
customers = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
products = ["PROD001", "PROD002", "PROD003", "PROD004", "PROD005"]
code: |
global order_counter, customers, products
order_counter += 1
order_id = f"ORD{order_counter:04d}"
customer_id = random.choice(customers)
product_id = random.choice(products)
# Generate order with product_id for foreign key join
value = {
"order_id": order_id,
"customer_id": customer_id,
"product_id": product_id,
"quantity": random.randint(1, 5),
"timestamp": int(time.time() * 1000)
}
# Use order_id as key (natural key for orders)
key = order_id
expression: (key, value)
resultType: (string, json)
generate_product:
type: generator
globalCode: |
import random
products_data = [
{"id": "PROD001", "name": "Laptop", "category": "Electronics", "price": 999.99},
{"id": "PROD002", "name": "Headphones", "category": "Electronics", "price": 149.99},
{"id": "PROD003", "name": "Coffee Maker", "category": "Appliances", "price": 79.99},
{"id": "PROD004", "name": "Backpack", "category": "Accessories", "price": 49.99},
{"id": "PROD005", "name": "Desk Lamp", "category": "Furniture", "price": 39.99}
]
product_index = 0
code: |
global products_data, product_index
# Cycle through products to populate the global table
product = products_data[product_index % len(products_data)]
product_index += 1
value = {
"name": product["name"],
"category": product["category"],
"price": product["price"],
"in_stock": True
}
# Product ID as key for the global table
key = product["id"]
expression: (key, value)
resultType: (string, json)
producers:
order_producer:
generator: generate_order
interval: 2s
to:
topic: new_orders
keyType: string
valueType: json
product_producer:
generator: generate_product
interval: 10s
to:
topic: product_catalog
keyType: string
valueType: json
Processor: Foreign Key Join (click to expand)
streams:
orders:
topic: new_orders
keyType: string # Order ID
valueType: json # Order details including product_id
orders_with_product_details:
topic: orders_with_product_details
keyType: string # Order ID
valueType: json # Order enriched with product information
globalTables:
products:
topic: product_catalog
keyType: string # Product ID
valueType: json # Product details
functions:
extract_product_id:
type: keyValueMapper
code: |
# Map from order (key, value) to product_id for join
product_id = value.get("product_id") if value else None
expression: product_id
resultType: string
join_order_with_product:
type: valueJoiner
code: |
# Combine order and product information
result = {}
# Add order details
if value1 is not None:
result.update(value1)
# Add product details
if value2 is not None:
result["product_details"] = value2
# Calculate total price
quantity = value1.get("quantity", 0) if value1 else 0
price = value2.get("price", 0) if value2 else 0
result["total_price"] = quantity * price
new_value = result
expression: new_value
resultType: json
pipelines:
enrich_orders_with_products:
from: orders
via:
- type: join
globalTable: products
mapper: extract_product_id
valueJoiner: join_order_with_product
- type: peek
forEach:
code: |
log.info("ENRICHED ORDER - key={}, order_id={}, product={}, total_price={}",
key,
value.get("order_id"),
value.get("product_details", {}).get("name"),
value.get("total_price"))
to: orders_with_product_details
Foreign Key Extraction
The mapper function extracts the foreign key from stream records:
- Function Type: keyValueMapper (not foreignKeyExtractor)
- Input: Stream's key and value
- Output: Key to look up in the GlobalTable
- Example: Extract product_id from the order to join with the product catalog
Stream-Table Left Join
Left joins preserve all records from the stream (left side) while adding data from the table (right side) when available. This is useful for enriching events with optional reference data.
Use Case: Activity Enrichment with Location
Enrich user activity events with location data, preserving all activities even when location information is missing.
What it does:
- Produces activities and locations: Creates user_activity_events stream and user_location_data table, with some users having location data, others not
- Uses leftJoin: Joins activity stream with location table - preserves all activities even when no location data exists for user
- Handles missing data: valueJoiner receives null for location when user not in location table, includes null-check logic
- Enriches optionally: Adds location data when available, leaves location fields empty/null when not available
- Preserves all activities: All activity events flow through to output regardless of whether location data exists, maintaining complete activity stream
Producer: User Activity and Locations (click to expand)
functions:
generate_user_activity:
type: generator
globalCode: |
import random
import time
activity_counter = 0
code: |
global activity_counter
user_ids = ["user001", "user002", "user003", "user004", "user005"]
activity_types = ["login", "logout", "page_view", "purchase", "search"]
user_id = random.choice(user_ids)
activity = {
"activity_id": f"activity_{activity_counter+1:03d}",
"user_id": user_id,
"activity_type": random.choice(activity_types),
"timestamp": int(time.time() * 1000),
"session_id": f"session_{user_id}_{random.randint(1, 5)}"
}
activity_counter += 1
result = (user_id, activity)
expression: result
resultType: (string, json)
generate_user_locations:
type: generator
globalCode: |
import random
import time
location_counter = 0
user_ids = ["user001", "user002", "user003", "user005"] # Note: user004 missing
locations = [
{"country": "USA", "city": "New York", "timezone": "EST"},
{"country": "UK", "city": "London", "timezone": "GMT"},
{"country": "Germany", "city": "Berlin", "timezone": "CET"},
{"country": "Japan", "city": "Tokyo", "timezone": "JST"}
]
code: |
global location_counter
if location_counter < len(user_ids):
user_id = user_ids[location_counter]
location = random.choice(locations)
location_data = {
"user_id": user_id,
"country": location["country"],
"city": location["city"],
"timezone": location["timezone"],
"updated_at": int(time.time() * 1000)
}
location_counter += 1
result = (user_id, location_data)
else:
result = None
expression: result
resultType: (string, json)
producers:
user_activity_producer:
generator: generate_user_activity
interval: 1s
to:
topic: user_activity_events
keyType: string
valueType: json
user_location_producer:
generator: generate_user_locations
interval: 2s
count: 4 # Only 4 users have location data
to:
topic: user_location_data
keyType: string
valueType: json
Processor: Stream-Table Left Join (click to expand)
streams:
user_activity:
topic: user_activity_events
keyType: string # User ID
valueType: json # Activity details
enriched_activity:
topic: activity_with_location
keyType: string # User ID
valueType: json # Activity enriched with location
tables:
user_locations:
topic: user_location_data
keyType: string # User ID
valueType: json # Location details
functions:
join_activity_with_location:
type: valueJoiner
globalCode: |
import time
code: |
# Left join: always preserve activity data, add location if available
result = {}
# Always include activity data (left side)
if value1 is not None:
result.update(value1)
# Add location data if available (right side)
if value2 is not None:
result["location"] = {
"country": value2.get("country"),
"city": value2.get("city"),
"timezone": value2.get("timezone")
}
else:
# Location not available for this user
result["location"] = {
"country": "UNKNOWN",
"city": "UNKNOWN",
"timezone": "UTC"
}
# Add enrichment metadata
result["enriched"] = value2 is not None
result["enriched_at"] = int(time.time() * 1000)
new_value = result
expression: new_value
resultType: json
pipelines:
enrich_activity_with_location:
from: user_activity
via:
# Left join with location table - preserves all activities
- type: leftJoin
table: user_locations
valueJoiner: join_activity_with_location
- type: peek
forEach:
code: log.info("ENRICHED ACTIVITY - user={}, activity={}, location={}/{}, enriched={}", key, value.get("activity_type"), value.get("location", {}).get("city"), value.get("location", {}).get("country"), value.get("enriched"))
to: enriched_activity
This example demonstrates:
- Left join semantics: All activity events are preserved, even when location data is missing
- Graceful null handling: Unknown locations get default values instead of causing errors
- Enrichment metadata: Tracks whether enrichment data was available
- Real-world pattern: Common scenario where reference data may be incomplete
Expected Behavior:
- Activities for users with location data are enriched with country/city information
- Activities for users without location data get "UNKNOWN" placeholders
- All activities are preserved regardless of location availability
Stream-Stream Outer Join
Outer joins capture events from either stream, making them ideal for tracking incomplete or partial interactions between two event types.
Use Case: Login/Logout Session Analysis
Track user sessions by correlating login and logout events, capturing incomplete sessions where only one event type occurs.
What it does:
- Produces login and logout events: Creates separate streams for user_login_events and user_logout_events with overlapping but not always matching users
- Uses outerJoin: Correlates events within 10-minute window, captures login-only, logout-only, and complete login+logout sessions
- Handles three scenarios: Complete sessions (both events), LOGIN_ONLY (user logged in, no logout captured), LOGOUT_ONLY (logout without login)
- Calculates session data: For complete sessions computes session duration, for incomplete sessions identifies the pattern and timing
- Outputs all patterns: Returns session analysis showing complete sessions with duration, or incomplete sessions with pattern classification
Producer: Login and Logout Events (click to expand)
functions:
generate_login_events:
type: generator
globalCode: |
import random
import time
login_counter = 0
code: |
global login_counter
user_ids = ["alice", "bob", "charlie", "diana", "eve"]
user_id = random.choice(user_ids)
login_event = {
"event_id": f"login_{login_counter+1:03d}",
"user_id": user_id,
"login_time": int(time.time() * 1000),
"device": random.choice(["mobile", "desktop", "tablet"]),
"ip_address": f"192.168.1.{random.randint(1, 255)}"
}
login_counter += 1
result = (user_id, login_event)
expression: result
resultType: (string, json)
generate_logout_events:
type: generator
globalCode: |
import random
import time
logout_counter = 0
code: |
global logout_counter
user_ids = ["alice", "bob", "charlie", "frank", "grace"] # Some different users
user_id = random.choice(user_ids)
logout_event = {
"event_id": f"logout_{logout_counter+1:03d}",
"user_id": user_id,
"logout_time": int(time.time() * 1000),
"session_duration": random.randint(60, 7200) # 1 min to 2 hours
}
logout_counter += 1
result = (user_id, logout_event)
expression: result
resultType: (string, json)
producers:
login_producer:
generator: generate_login_events
interval: 2s
to:
topic: user_login_events
keyType: string
valueType: json
logout_producer:
generator: generate_logout_events
interval: 3s
to:
topic: user_logout_events
keyType: string
valueType: json
Processor: Stream-Stream Outer Join (click to expand)
streams:
user_logins:
topic: user_login_events
keyType: string
valueType: json
user_logouts:
topic: user_logout_events
keyType: string
valueType: json
session_analysis:
topic: user_session_analysis
keyType: string
valueType: json
functions:
analyze_user_session:
type: valueJoiner
globalCode: |
import time
code: |
# Outer join: capture login-only, logout-only, and matched events
result = {}
# Determine session type based on what data is available
if value1 is not None and value2 is not None:
# Both login and logout available - complete session
result = {
"session_type": "COMPLETE",
"user_id": value1.get("user_id"),
"login_event": value1,
"logout_event": value2,
"session_duration": value2.get("session_duration"),
"device": value1.get("device"),
"login_time": value1.get("login_time"),
"logout_time": value2.get("logout_time")
}
elif value1 is not None:
# Login only - user still active or logout not captured
result = {
"session_type": "LOGIN_ONLY",
"user_id": value1.get("user_id"),
"login_event": value1,
"logout_event": None,
"device": value1.get("device"),
"login_time": value1.get("login_time"),
"status": "active_or_missing_logout"
}
elif value2 is not None:
# Logout only - login not captured or user was already logged in
result = {
"session_type": "LOGOUT_ONLY",
"user_id": value2.get("user_id"),
"login_event": None,
"logout_event": value2,
"session_duration": value2.get("session_duration"),
"logout_time": value2.get("logout_time"),
"status": "missing_login_or_already_active"
}
# Add analysis metadata
result["analyzed_at"] = int(time.time() * 1000)
new_value = result
expression: new_value
resultType: json
pipelines:
analyze_user_sessions:
from: user_logins
via:
# Outer join to capture all login and logout events
- type: outerJoin
stream: user_logouts
valueJoiner: analyze_user_session
timeDifference: 10m # Look for logouts within 10 minutes of login
grace: 2m # Grace period for late events
thisStore:
name: login_session_store
type: window
windowSize: 20m # Must be 2*timeDifference
retention: 22m # Must be 2*timeDifference + grace
retainDuplicates: true
otherStore:
name: logout_session_store
type: window
windowSize: 20m # Must be 2*timeDifference
retention: 22m # Must be 2*timeDifference + grace
retainDuplicates: true
- type: peek
forEach:
code: log.info("SESSION ANALYSIS - user={}, type={}, device={}, duration={}s", key, value.get("session_type"), value.get("device", "N/A"), value.get("session_duration", "N/A"))
to: session_analysis
This example demonstrates:
- Outer join semantics: Captures login-only, logout-only, and complete sessions
- Session analysis: Categorizes different session patterns for analytics
- Time window correlation: Uses 10-minute window to correlate related events
- Business insights: Identifies users who login but don't logout (active sessions) or logout without captured login
Expected Behavior:
- COMPLETE sessions: When both login and logout events occur within the time window
- LOGIN_ONLY sessions: Users who logged in but no logout was captured (active sessions)
- LOGOUT_ONLY sessions: Logout events without corresponding login (users already logged in)
Join Type Variants
Each join type supports different semantics for handling missing matches:
Inner Joins
Produces output only when both sides have matching keys.
Left Joins
Always produces output for the left side (stream), with null for missing right-side values.
Outer Joins (Stream-Stream only)
Produces output whenever either side has data, with null for missing values.
Note: Table and GlobalTable joins don't support outer joins since tables represent current state, not events.
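In KSML the variant is selected by the operation type. For example, switching the stream-table enrichment from an inner join to a left join only changes the step type; the valueJoiner must then handle a null value2 when the table has no match. A sketch based on the order-enrichment example above:

# Inner join: emits output only when the customers table has a value for the key
- type: join
  table: customers
  valueJoiner: join_order_with_customer

# Left join: always emits output; value2 is None when no customer record exists
- type: leftJoin
  table: customers
  valueJoiner: join_order_with_customer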
Performance Considerations
State Management
- Window sizes: Larger windows consume more memory but capture more correlations
- Retention periods: Balance between late data handling and resource usage
- Grace periods: Allow late arrivals while managing state cleanup
Topology Optimization
- Join order: Join with smaller datasets first when chaining multiple joins
- GlobalTable usage: Use for frequently accessed reference data to avoid repartitioning
- Rekeying overhead: Minimize unnecessary rekeying operations
Conclusion
KSML's join operations enable powerful data enrichment patterns:
- Stream-stream joins correlate events within time windows
- Stream-table joins enrich events with current state
- Stream-GlobalTable joins provide foreign key lookups without co-partitioning
Choose the appropriate join type based on your data characteristics and business requirements.