Fraud Detection with KSML
This guide demonstrates how to build a real-time fraud detection system using KSML. You'll learn how to analyze transaction streams, identify suspicious patterns, and generate alerts for potential fraud.
Introduction
Fraud detection is a critical application of stream processing, allowing organizations to identify and respond to fraudulent activities in real time. Key benefits include:
- Minimizing financial losses by detecting fraud as it happens
- Reducing false positives through multi-factor analysis
- Adapting to evolving fraud patterns with flexible detection rules
- Providing immediate alerts to security teams and customers
KSML lets you build fraud detection systems that process high volumes of transactions and apply complex detection rules in real time.
Prerequisites
Before starting this guide, you should:
- Understand basic KSML concepts (streams, functions, pipelines)
- Have completed the KSML Basics Tutorial
- Be familiar with Aggregations
- Have a basic understanding of Windowed Operations
- Be familiar with State Stores
The Use Case
Imagine you're building a fraud detection system for a financial institution that processes millions of credit card transactions daily. You want to:
- Identify suspicious transactions based on multiple risk factors
- Track unusual patterns in customer behavior
- Generate real-time alerts for high-risk activities
- Maintain a low rate of false positives
Define the topics for the use case
In earlier tutorials, you created a Docker Compose file with all the necessary containers. This use case guide needs some additional topics. To create them, open the docker-compose.yml in the examples directory and find the definition of the kafka-setup container, which creates the topics.
Change the definition so that the startup command for the setup container (the command section) looks like the following:
command: "bash -c 'echo Creating topics... && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic credit_card_transactions && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic high_value_transactions && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic unusual_location_alerts && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transaction_velocity_alerts && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic transaction_pattern_alerts && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic fraud_alerts && \
  kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic fraud_notifications'"
Defining the Data Model
Our transaction data will have the following structure:
{
  "transaction_id": "tx-123456",
  "timestamp": 1625097600000,
  "card_id": "card-789",
  "customer_id": "cust-456",
  "merchant": {
    "id": "merch-123",
    "name": "Online Electronics Store",
    "category": "electronics",
    "location": {
      "country": "US",
      "state": "CA",
      "city": "San Francisco"
    }
  },
  "amount": 299.99,
  "currency": "USD",
  "transaction_type": "online",
  "ip_address": "203.0.113.45"
}
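When experimenting locally, it can help to build records of this shape programmatically. The following is a minimal Python sketch that runs outside KSML; the helper name `make_transaction` and its default values are illustrative assumptions, not part of KSML or of the examples directory.

```python
import json
import time
import uuid


def make_transaction(card_id, amount, country="US", state="CA",
                     city="San Francisco", category="electronics",
                     timestamp_ms=None):
    """Build a transaction dict shaped like the data model above.

    Hypothetical helper for local testing; all field values are placeholders.
    """
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # epoch milliseconds
    return {
        "transaction_id": f"tx-{uuid.uuid4().hex[:6]}",
        "timestamp": timestamp_ms,
        "card_id": card_id,
        "customer_id": "cust-456",
        "merchant": {
            "id": "merch-123",
            "name": "Online Electronics Store",
            "category": category,
            "location": {"country": country, "state": state, "city": city},
        },
        "amount": amount,
        "currency": "USD",
        "transaction_type": "online",
        "ip_address": "203.0.113.45",
    }


tx = make_transaction("card-789", 299.99, timestamp_ms=1625097600000)
print(json.dumps(tx, indent=2))
```

Passing an explicit `timestamp_ms` keeps test data reproducible, which matters for the time-based rules later in this guide.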
Creating the KSML Definition
Now, let's create our KSML definition file. It defines operations that flag transactions that come from unusual locations, arrive at an unusual rate, or are for a high amount:
Basic fraud detection pipeline:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
streams:
  transactions:
    topic: credit_card_transactions
    keyType: string  # transaction_id
    valueType: json  # transaction data
  high_value_transactions:
    topic: high_value_transactions
    keyType: string  # transaction_id
    valueType: json  # transaction data
  unusual_location_alerts:
    topic: unusual_location_alerts
    keyType: string  # transaction_id (the pipeline does not re-key; card_id is in the value)
    valueType: json  # alert data
  velocity_alerts:
    topic: transaction_velocity_alerts
    keyType: string  # transaction_id (the pipeline does not re-key; card_id is in the value)
    valueType: json  # alert data
  fraud_alerts:
    topic: fraud_alerts
    keyType: string  # transaction_id
    valueType: json  # consolidated alert data

stores:
  customer_transaction_history:
    type: keyValue
    persistent: true
    keyType: string
    valueType: json
  card_location_history:
    type: keyValue
    persistent: true
    keyType: string
    valueType: json
functions:
  check_high_value:
    type: predicate
    code: |
      # Define thresholds for different merchant categories
      thresholds = {
        "electronics": 1000,
        "jewelry": 2000,
        "travel": 3000,
        "default": 500
      }
      category = value.get("merchant", {}).get("category", "default")
      threshold = thresholds.get(category, thresholds["default"])
      return value.get("amount", 0) > threshold
  create_high_value_alert:
    type: valueTransformer
    code: |
      amount = value.get("amount", 0)  # default avoids a TypeError when amount is missing
      result = {
        "transaction_id": value.get("transaction_id"),
        "timestamp": value.get("timestamp"),
        "customer_id": value.get("customer_id"),
        "card_id": value.get("card_id"),
        "merchant": value.get("merchant"),
        "amount": amount,
        # Carried through so calculate_fraud_score can apply its online adjustment
        "transaction_type": value.get("transaction_type"),
        "alert_type": "high_value_transaction",
        "risk_score": min(100, amount / 10)  # Simple scoring based on amount
      }
    expression: result
    resultType: struct
  check_unusual_location:
    type: valueTransformer
    code: |
      card_id = value.get("card_id")
      current_country = value.get("merchant", {}).get("location", {}).get("country")
      current_state = value.get("merchant", {}).get("location", {}).get("state")

      # Get location history for this card
      location_history = card_location_history.get(card_id)
      unusual_location = False
      new_country = False

      if location_history is None:
        # First transaction for this card, initialize history
        location_history = {
          "last_countries": [current_country],
          "last_states": [current_state],
          "last_transaction_time": value.get("timestamp")
        }
        card_location_history.put(card_id, location_history)
      else:
        # Snapshot the history before updating it, so the alert reports what was known earlier
        previous_countries = list(location_history.get("last_countries", []))
        previous_states = list(location_history.get("last_states", []))
        time_since_last = value.get("timestamp") - location_history.get("last_transaction_time", 0)

        # If transaction is in a different country than any in history
        if current_country not in previous_countries:
          unusual_location = True
          new_country = True
        # If transaction is in a different state and happened within 2 hours of last transaction
        elif current_state not in previous_states and time_since_last < 7200000:  # 2 hours in milliseconds
          unusual_location = True

        # Update location history (keep last 3)
        last_countries = list(previous_countries)
        if current_country not in last_countries:
          last_countries.append(current_country)
        last_countries = last_countries[-3:]

        last_states = list(previous_states)
        if current_state not in last_states:
          last_states.append(current_state)
        last_states = last_states[-3:]

        card_location_history.put(card_id, {
          "last_countries": last_countries,
          "last_states": last_states,
          "last_transaction_time": value.get("timestamp")
        })

      if unusual_location:
        result = {
          "transaction_id": value.get("transaction_id"),
          "timestamp": value.get("timestamp"),
          "customer_id": value.get("customer_id"),
          "card_id": card_id,
          # Carried through so scoring and notifications can use them
          "merchant": value.get("merchant"),
          "amount": value.get("amount"),
          "transaction_type": value.get("transaction_type"),
          "current_location": {
            "country": current_country,
            "state": current_state
          },
          "previous_locations": {
            "countries": previous_countries,
            "states": previous_states
          },
          "time_since_last_transaction": time_since_last,
          "alert_type": "unusual_location",
          "risk_score": 70 if new_country else 40
        }
      else:
        result = None
    resultType: json
    expression: result
    stores:
      - card_location_history
  check_transaction_velocity:
    type: valueTransformer
    code: |
      card_id = value.get("card_id")
      current_time = value.get("timestamp")
      result = None

      # Get transaction history for this card
      history = customer_transaction_history.get(card_id)

      if history is None:
        # First transaction for this card, initialize history
        history = {
          "transaction_times": [current_time],
          "transaction_count_1h": 1,
          "transaction_count_24h": 1,
          "total_amount_24h": value.get("amount", 0)
        }
        customer_transaction_history.put(card_id, history)
        # No alert for the first transaction; result stays None
      else:
        # Update transaction history
        transaction_times = history.get("transaction_times", [])
        transaction_times.append(current_time)

        # Keep only transactions from the last 24 hours
        one_day_ago = current_time - 86400000  # 24 hours in milliseconds
        transaction_times = [t for t in transaction_times if t > one_day_ago]

        # Count transactions in the last hour
        one_hour_ago = current_time - 3600000  # 1 hour in milliseconds
        transaction_count_1h = sum(1 for t in transaction_times if t > one_hour_ago)

        # Calculate total amount in the last 24 hours
        total_amount_24h = history.get("total_amount_24h", 0) + value.get("amount", 0)
        if len(transaction_times) < len(history.get("transaction_times", [])):
          # Some transactions dropped out of the 24h window, so the running total is stale.
          # Simplified for the example: approximate the total as the current amount
          # times the number of remaining transactions. In a real system, you would
          # store individual transaction amounts.
          total_amount_24h = value.get("amount", 0) * len(transaction_times)

        # Update history
        history = {
          "transaction_times": transaction_times,
          "transaction_count_1h": transaction_count_1h,
          "transaction_count_24h": len(transaction_times),
          "total_amount_24h": total_amount_24h
        }
        customer_transaction_history.put(card_id, history)

        # More than 5 transactions in an hour
        if transaction_count_1h > 5:
          result = {
            "transaction_id": value.get("transaction_id"),
            "timestamp": value.get("timestamp"),
            "customer_id": value.get("customer_id"),
            "card_id": card_id,
            # Carried through so scoring and notifications can use them
            "merchant": value.get("merchant"),
            "amount": value.get("amount"),
            "transaction_type": value.get("transaction_type"),
            "transactions_last_hour": transaction_count_1h,
            "transactions_last_day": len(transaction_times),
            "total_amount_24h": total_amount_24h,
            "alert_type": "high_transaction_velocity",
            "risk_score": min(100, transaction_count_1h * 10)
          }
    resultType: json
    expression: result
    stores:
      - customer_transaction_history
  calculate_fraud_score:
    type: valueTransformer
    code: |
      # Base risk score from the alert
      risk_score = value.get("risk_score", 0)

      # Additional factors
      alert_type = value.get("alert_type", "")

      # Adjust score based on transaction type
      if value.get("transaction_type") == "online":
        risk_score += 10

      # Adjust score based on merchant category
      high_risk_categories = ["electronics", "jewelry", "digital_goods"]
      if value.get("merchant", {}).get("category") in high_risk_categories:
        risk_score += 15

      # Cap the score at 100
      risk_score = min(100, risk_score)

      # Add the calculated score to the alert
      value["final_risk_score"] = risk_score
      value["is_likely_fraud"] = risk_score > 70
    expression: value
    resultType: json
pipelines:
  # Pipeline for high-value transaction detection
  high_value_detection:
    from: transactions
    via:
      - type: filter
        if: check_high_value
      - type: transformValue
        mapper: create_high_value_alert
    to: high_value_transactions

  # Pipeline for unusual location detection
  unusual_location_detection:
    from: transactions
    via:
      - type: transformValue
        mapper: check_unusual_location
      - type: filter
        if:
          expression: value is not None
    to: unusual_location_alerts

  # Pipeline for transaction velocity monitoring
  velocity_monitoring:
    from: transactions
    via:
      - type: transformValue
        mapper: check_transaction_velocity
      - type: filter
        if:
          expression: value is not None
    to: velocity_alerts

  # Pipelines for consolidating alerts and calculating the final fraud score
  fraud_scoring_high_value:
    from: high_value_transactions
    via:
      - type: transformValue
        mapper: calculate_fraud_score
    to: fraud_alerts

  fraud_scoring_location:
    from: unusual_location_alerts
    via:
      - type: transformValue
        mapper: calculate_fraud_score
    to: fraud_alerts

  fraud_scoring_velocity:
    from: velocity_alerts
    via:
      - type: transformValue
        mapper: calculate_fraud_score
    to: fraud_alerts
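The velocity function above notes that a real system would store individual transaction amounts instead of approximating the 24-hour total. The following standalone Python sketch shows one way that bookkeeping could look. It is not KSML code: the plain dict `store` stands in for a key/value state store, and the function name `update_velocity` is a hypothetical choice for illustration.

```python
ONE_HOUR_MS = 3_600_000
ONE_DAY_MS = 86_400_000


def update_velocity(store, card_id, timestamp_ms, amount):
    """Record one transaction and return rolling counts and totals.

    `store` is a plain dict standing in for a key/value state store.
    """
    entries = store.get(card_id, [])
    entries.append({"time": timestamp_ms, "amount": amount})

    # Drop entries older than 24 hours relative to the current transaction
    cutoff_day = timestamp_ms - ONE_DAY_MS
    entries = [e for e in entries if e["time"] > cutoff_day]
    store[card_id] = entries

    cutoff_hour = timestamp_ms - ONE_HOUR_MS
    return {
        "transaction_count_1h": sum(1 for e in entries if e["time"] > cutoff_hour),
        "transaction_count_24h": len(entries),
        # Exact total, because each entry keeps its own amount
        "total_amount_24h": sum(e["amount"] for e in entries),
    }


store = {}
base = 1625097600000
update_velocity(store, "card-789", base, 10.0)
update_velocity(store, "card-789", base + 30 * 60 * 1000, 20.0)
# 25 hours after the first transaction: the first two entries fall out of the window
stats = update_velocity(store, "card-789", base + 25 * ONE_HOUR_MS, 5.0)
print(stats)
```

Keeping (time, amount) pairs costs more state per card than a single running total, but the total stays correct when old transactions expire.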
Advanced Fraud Detection Techniques
Pattern Recognition
Some fraud only shows up across several transactions. The example below looks for a small "test" charge that is followed shortly afterwards by a large one:
Pattern recognition processor:
functions:
  detect_fraud_pattern:
    type: valueTransformer
    code: |
      # Pattern: Small test transaction followed by large transaction
      # Note: check_transaction_velocity replaces the whole store entry,
      # so use a separate store if both functions run in the same application
      card_id = value.get("card_id")
      current_amount = value.get("amount", 0)

      # Get transaction history
      history = customer_transaction_history.get(card_id)

      if history is None or "recent_transactions" not in history:
        # Initialize history
        history = {"recent_transactions": [{"amount": current_amount, "time": value.get("timestamp")}]}
        customer_transaction_history.put(card_id, history)
        return None

      # Add current transaction to history
      recent_transactions = history.get("recent_transactions", [])
      recent_transactions.append({"amount": current_amount, "time": value.get("timestamp")})

      # Keep only recent transactions (last 24 hours)
      one_day_ago = value.get("timestamp") - 86400000
      recent_transactions = [t for t in recent_transactions if t.get("time", 0) > one_day_ago]

      # Sort by time
      recent_transactions.sort(key=lambda x: x.get("time", 0))

      # Look for pattern: small transaction (< $5) followed by large transaction within 30 minutes
      pattern_found = False
      for i in range(len(recent_transactions) - 1):
        if (recent_transactions[i].get("amount", 0) < 5 and
            recent_transactions[i+1].get("amount", 0) > 100 and
            recent_transactions[i+1].get("time", 0) - recent_transactions[i].get("time", 0) < 1800000):  # 30 minutes
          pattern_found = True
          break

      # Update history
      history["recent_transactions"] = recent_transactions
      customer_transaction_history.put(card_id, history)

      if pattern_found:
        return {
          "transaction_id": value.get("transaction_id"),
          "timestamp": value.get("timestamp"),
          "customer_id": value.get("customer_id"),
          "card_id": card_id,
          "alert_type": "fraud_pattern_detected",
          "pattern_type": "test_then_charge",
          "risk_score": 85
        }
      return None
    resultType: struct
    stores:
      - customer_transaction_history
  calculate_fraud_score:
    type: valueTransformer
    code: |
      # Base risk score from the alert
      risk_score = value.get("risk_score", 0)

      # Additional factors
      alert_type = value.get("alert_type", "")

      # Adjust score based on transaction type
      if value.get("transaction_type") == "online":
        risk_score += 10

      # Adjust score based on merchant category
      high_risk_categories = ["electronics", "jewelry", "digital_goods"]
      if value.get("merchant", {}).get("category") in high_risk_categories:
        risk_score += 15

      # Cap the score at 100
      risk_score = min(100, risk_score)

      # Add the calculated score to the alert
      value["final_risk_score"] = risk_score
      value["is_likely_fraud"] = risk_score > 70
    expression: value
    resultType: json
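The pattern check itself can be exercised outside KSML before it is wired into a pipeline. The sketch below isolates the "small charge, then large charge within 30 minutes" rule as a plain Python function. The name `has_test_then_charge` and its parameters are assumptions made for illustration; the thresholds mirror the ones used above.

```python
THIRTY_MIN_MS = 1_800_000


def has_test_then_charge(transactions, small=5, large=100, window_ms=THIRTY_MIN_MS):
    """Return True if a small charge is directly followed by a large one within the window.

    `transactions` is a list of dicts with "amount" and "time" (epoch milliseconds).
    Only consecutive pairs (after sorting by time) are compared.
    """
    ordered = sorted(transactions, key=lambda t: t["time"])
    for earlier, later in zip(ordered, ordered[1:]):
        if (earlier["amount"] < small
                and later["amount"] > large
                and later["time"] - earlier["time"] < window_ms):
            return True
    return False


suspicious = [{"amount": 1.00, "time": 0}, {"amount": 500.00, "time": 600_000}]
too_slow = [{"amount": 1.00, "time": 0}, {"amount": 500.00, "time": 2_000_000}]
print(has_test_then_charge(suspicious))
print(has_test_then_charge(too_slow))
```

Because only adjacent pairs are compared, an unrelated purchase between the test charge and the large charge hides the pattern. Whether that is acceptable depends on the fraud scenarios you want to cover.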
Machine Learning Integration
For more advanced fraud detection, you can integrate machine learning models:
Machine learning fraud detector:
functions:
  ml_fraud_prediction:
    type: valueTransformer
    parameters:
      - name: value
        type: object
    code: |
      # In a real implementation, you would call an external ML service
      # This is a simplified example

      # Extract features
      features = {
        "amount": value.get("amount", 0),
        "is_international": value.get("merchant", {}).get("location", {}).get("country") != "US",
        "is_online": value.get("transaction_type") == "online",
        "high_risk_merchant": value.get("merchant", {}).get("category") in ["electronics", "jewelry", "digital_goods"],
        "transaction_hour": (value.get("timestamp", 0) / 3600000) % 24,  # Hour of day in UTC, fractional
        "transaction_count_1h": value.get("transactions_last_hour", 1),
        "transaction_count_24h": value.get("transactions_last_day", 1)
      }

      # Simple rule-based model (in reality, this would be a trained ML model)
      score = 0
      if features["amount"] > 1000: score += 20
      if features["is_international"]: score += 15
      if features["is_online"]: score += 10
      if features["high_risk_merchant"]: score += 15
      if features["transaction_hour"] > 22 or features["transaction_hour"] < 6: score += 10
      if features["transaction_count_1h"] > 3: score += 15
      if features["transaction_count_24h"] > 10: score += 15

      # Normalize score to 0-100
      score = min(100, score)

      # Add prediction to the transaction
      value["ml_fraud_score"] = score
      value["ml_fraud_probability"] = score / 100.0
      value["ml_is_fraud"] = score > 60
      return value
    resultType: json
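The `transaction_hour` feature above is derived from an epoch-millisecond timestamp, which yields an hour in UTC rather than in the cardholder's local time. As a hedged illustration, the standalone Python sketch below computes the same feature with the standard library. The function names `hour_of_day_utc` and `is_night` are hypothetical.

```python
from datetime import datetime, timezone


def hour_of_day_utc(timestamp_ms):
    """Return the hour (0-23) in UTC for an epoch-millisecond timestamp."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).hour


def is_night(timestamp_ms, start=22, end=6):
    """True when the UTC hour falls in the overnight range [start, 24) or [0, end)."""
    hour = hour_of_day_utc(timestamp_ms)
    return hour >= start or hour < end


midnight = 1625097600000            # 2021-07-01 00:00:00 UTC
noon = midnight + 12 * 3_600_000    # twelve hours later
print(hour_of_day_utc(midnight), is_night(midnight))
print(hour_of_day_utc(noon), is_night(noon))
```

With whole hours the comparison uses `>= 22`, whereas the fractional hour in the KSML function is compared with `> 22`. For a feature meant to capture "late at night for the customer", you would need a time zone per card or merchant, which the data model in this guide does not carry.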
Real-time Alerting
To make your fraud detection system actionable, you need to generate alerts:
Real-time alerting processor:
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/release/1.0.x/docs/ksml-language-spec.json
streams:
  fraud_alerts:
    topic: fraud_alerts
    keyType: string  # transaction_id
    valueType: json  # consolidated alert data
  fraud_notifications:
    topic: fraud_notifications
    keyType: string  # transaction_id
    valueType: json  # notifications to be sent out

functions:
  generate_notification:
    type: valueTransformer
    code: |
      risk_score = value.get("final_risk_score", 0)

      # Determine alert level
      alert_level = "low"
      if risk_score > 70:
        alert_level = "high"
      elif risk_score > 40:
        alert_level = "medium"

      # Create alert message
      alert = {
        "transaction_id": value.get("transaction_id"),
        "timestamp": value.get("timestamp"),
        "customer_id": value.get("customer_id"),
        "card_id": value.get("card_id"),
        "merchant": value.get("merchant"),
        "amount": value.get("amount"),
        "alert_type": value.get("alert_type"),
        "risk_score": risk_score,
        "alert_level": alert_level,
        "alert_message": f"Potential fraud detected: {value.get('alert_type')} with risk score {risk_score}",
        "recommended_action": "block" if alert_level == "high" else "review"
      }
      return alert
    resultType: json

pipelines:
  alert_generation:
    from: fraud_alerts
    via:
      - type: filter
        if:
          expression: value.get("final_risk_score", 0) > 30  # Scores of 30 or below are dropped; 31-40 pass with alert_level low
      - type: transformValue
        mapper: generate_notification
    to: fraud_notifications
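The filter threshold and the alert levels interact: the filter passes scores above 30, while "medium" starts above 40. The standalone Python sketch below mirrors both rules so the boundaries can be checked in isolation. The function names are assumptions for illustration, and nothing here calls KSML.

```python
def alert_level(risk_score):
    """Map a final risk score to the alert level used in generate_notification."""
    if risk_score > 70:
        return "high"
    if risk_score > 40:
        return "medium"
    return "low"


def passes_notification_filter(alert, threshold=30):
    """Mirror of the pipeline filter: keep alerts scoring above the threshold."""
    return alert.get("final_risk_score", 0) > threshold


for score in (25, 35, 55, 85):
    alert = {"final_risk_score": score}
    print(score, passes_notification_filter(alert), alert_level(score))
```

A score of 35 passes the filter but is labeled "low" and gets the "review" action. If only medium and high alerts should produce notifications, raising the filter threshold to 40 would align the two rules.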
Testing and Validation
To test your fraud detection system:
- Generate sample transaction data with known fraud patterns
- Deploy your KSML application using the proper configuration
- Monitor the alert topics to verify detection accuracy
- Adjust thresholds and rules to balance detection rate and false positives
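For the first step, sample data with known fraud patterns can be generated by a small script. The Python sketch below builds two scenarios that match the rules in this guide: a burst of purchases on one card, and a small charge followed by a large one. All function names and field values are illustrative assumptions. The script only prints JSON lines; how you produce them to the credit_card_transactions topic depends on your setup.

```python
import json


def _record(card_id, tx_id, timestamp_ms, amount):
    """Minimal transaction following the guide's data model (placeholder values)."""
    return {
        "transaction_id": tx_id,
        "timestamp": timestamp_ms,
        "card_id": card_id,
        "customer_id": "cust-456",
        "merchant": {
            "id": "merch-123",
            "name": "Online Electronics Store",
            "category": "electronics",
            "location": {"country": "US", "state": "CA", "city": "San Francisco"},
        },
        "amount": amount,
        "currency": "USD",
        "transaction_type": "online",
        "ip_address": "203.0.113.45",
    }


def velocity_burst(card_id, start_ms, count=7, spacing_ms=60_000):
    """`count` purchases on one card, one minute apart by default."""
    return [
        _record(card_id, f"tx-burst-{i}", start_ms + i * spacing_ms, 20.0)
        for i in range(count)
    ]


def probe_then_charge(card_id, start_ms):
    """A charge under $5 followed ten minutes later by one over $100."""
    return [
        _record(card_id, "tx-probe", start_ms, 1.00),
        _record(card_id, "tx-charge", start_ms + 10 * 60_000, 450.00),
    ]


start = 1625097600000
scenario = velocity_burst("card-001", start) + probe_then_charge("card-002", start)
for tx in scenario:
    print(json.dumps(tx))  # one JSON document per line
```

With the velocity rule above (more than 5 transactions in an hour), the first velocity alert for card-001 would be expected on the sixth transaction of the burst.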
Production Considerations
When deploying fraud detection systems to production:
- Performance: Ensure your system can handle peak transaction volumes
- Latency: Minimize processing time to detect fraud quickly
- False Positives: Continuously tune your system to reduce false alarms
- Adaptability: Implement mechanisms to update rules and models as fraud patterns evolve
- Compliance: Ensure your system meets regulatory requirements for financial monitoring
- Security: Protect sensitive transaction data with appropriate encryption and access controls
Conclusion
KSML provides a platform for building fraud detection systems that process high volumes of transactions in real time. By combining multiple detection techniques, including pattern recognition, anomaly detection, and machine learning, you can build a system that identifies fraudulent activity while keeping false positives low.
For more advanced fraud detection scenarios, explore:
- Complex Event Processing for detecting multi-stage fraud patterns
- External Service Integration for incorporating third-party risk scores
- KSML Definition Reference for a full explanation of the KSML definition syntax