# Logging and Monitoring in KSML

Learn how to implement effective logging, monitoring, and error handling in your KSML stream processing pipelines using the built-in `log` object and `peek` operations.
## Prerequisites

Before you begin:

- Make sure you have a running Docker Compose KSML environment as described in the Quick Start.
- We recommend completing the KSML Basics Tutorial first.
- Add the following topics to your `kafka-setup` service in `docker-compose.yml` to run the examples:
**Topic creation commands:**

```bash
# Logging and Monitoring Tutorial
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_output && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_monitoring_output && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_error_handled_output
```
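For reference, a sketch of how these commands slot into the `kafka-setup` service; the service itself (image, `depends_on`, and so on) is assumed to exist as in the Quick Start compose file:

```yaml
# Sketch only: the kafka-setup service layout is an assumption based on the
# Quick Start compose file; keep your existing image and depends_on settings.
kafka-setup:
  # ... existing image / depends_on settings unchanged ...
  command: >
    bash -c "
    kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_input &&
    kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_output &&
    kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_monitoring_output &&
    kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_error_handled_output"
```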
## Basic Logging with Different Levels

KSML allows you to log messages at different levels using the `log` object in Python functions.

This producer generates log messages with various importance levels and components:
**Producer definition for logging messages:**

```yaml
# Producer for logging and monitoring tutorial
# Generates simple messages with different importance levels for logging demonstration
functions:
  generate_log_message:
    type: generator
    globalCode: |
      import random
      import time
      log_counter = 0
    code: |
      global log_counter
      levels = ["INFO", "WARN", "ERROR", "DEBUG"]
      messages = [
        "System startup completed",
        "Database connection established",
        "User authentication failed",
        "Cache refresh initiated",
        "Memory usage high",
        "Backup process completed"
      ]
      key = "log-" + str(log_counter)
      log_counter += 1
      value = {
        "level": random.choice(levels),
        "message": random.choice(messages),
        "timestamp": time.time(),
        "importance": random.randint(1, 10),
        "component": random.choice(["auth", "db", "cache", "backup"])
      }
    expression: (key, value)
    resultType: (string, json)

producers:
  log_message_producer:
    generator: generate_log_message
    interval: 2s
    to:
      topic: ksml_logging_input
      keyType: string
      valueType: json
```
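Each generated record is a JSON value shaped like the following illustrative example (fields are chosen randomly, so actual values will differ):

```json
{
  "level": "WARN",
  "message": "Memory usage high",
  "timestamp": 1717000000.123,
  "importance": 7,
  "component": "db"
}
```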
**KSML Features Demonstrated:**

- `log` object: built-in logger available in all Python functions
- Log levels: `log.error()`, `log.warn()`, `log.info()`, `log.debug()`
- `peek` operation: non-intrusive message inspection without modification
- `forEach` function type: process messages without transforming them
This processor demonstrates logging at different levels using the `log` object:
**Processor definition with multi-level logging:**

```yaml
# Processor for logging and monitoring tutorial
# Demonstrates basic logging levels using the log object
streams:
  log_input:
    topic: ksml_logging_input
    keyType: string
    valueType: json
  log_output:
    topic: ksml_logging_output
    keyType: string
    valueType: json

functions:
  log_message:
    type: forEach
    code: |
      # Test TRACE logging
      log.trace("TRACE: Processing message with key={}", key)

      level = value.get("level", "INFO")
      message = value.get("message", "")
      component = value.get("component", "unknown")

      # Test DEBUG logging
      log.debug("DEBUG: Message details - level={}, component={}", level, component)

      if level == "ERROR":
        log.error("[{}] {}", component, message)
      elif level == "WARN":
        log.warn("[{}] {}", component, message)
      elif level == "DEBUG":
        log.debug("[{}] {}", component, message)
      else:
        log.info("[{}] {}", component, message)

pipelines:
  logging_pipeline:
    from: log_input
    via:
      - type: peek
        forEach:
          code: log_message(key, value)
    to: log_output
```
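The pipeline above calls a named function, but `peek` also accepts inline code. A minimal sketch of the same idea, reusing the `log_input` and `log_output` streams defined above:

```yaml
pipelines:
  inline_logging_pipeline:
    from: log_input
    via:
      - type: peek
        forEach:
          code: |
            # Log directly from the pipeline, without a named function
            log.info("Peeked: key={}, level={}", key, value.get("level", "INFO"))
    to: log_output
```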
## Monitoring with Peek Operations

The `peek` operation allows non-intrusive monitoring of message flow without modifying the data.
**KSML Features Demonstrated:**

- `peek` operation: inspect messages in the pipeline without modifying them
- Global variables: using `globals()` to maintain state across function calls
- Conditional logging: log only when specific conditions are met
- Message counting: track processed messages across invocations

This processor shows message counting and error detection using peek operations:
**Processor definition for monitoring operations:**

```yaml
# Monitoring processor with message counting and conditional logging
# Demonstrates peek operations for monitoring without data modification
streams:
  monitor_input:
    topic: ksml_logging_input
    keyType: string
    valueType: json
  monitor_output:
    topic: ksml_monitoring_output
    keyType: string
    valueType: json

functions:
  monitor_messages:
    type: forEach
    code: |
      # Count messages
      global message_count
      if 'message_count' not in globals():
        message_count = 0
      message_count += 1

      # Conditional logging - only log ERROR level messages
      if value.get("level") == "ERROR":
        log.error("Error detected: {}", value.get("message"))

      # Log high importance messages as warnings
      if value.get("importance", 0) > 8:
        log.warn("High importance message ({}): {}",
                 value.get("importance"), value.get("message"))

      # Log every 10th message
      if message_count % 10 == 0:
        log.info("Processed {} messages", message_count)

pipelines:
  monitoring_pipeline:
    from: monitor_input
    via:
      - type: peek
        forEach:
          code: monitor_messages(key, value)
    to: monitor_output
```
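The `globals()` check above initializes the counter lazily on the first message. The producer at the top of this tutorial shows an alternative that can read more cleanly: initialize shared state once in `globalCode` (assuming, as with the generator, that `globalCode` is supported for this function type). A sketch of the counter written that way:

```yaml
functions:
  monitor_messages:
    type: forEach
    globalCode: |
      # Runs once at startup, so no per-message globals() check is needed
      message_count = 0
    code: |
      global message_count
      message_count += 1
      if message_count % 10 == 0:
        log.info("Processed {} messages", message_count)
```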
## Error Handling with Logging

Use try-except blocks in Python functions to handle errors gracefully and log them appropriately.

**KSML Features Demonstrated:**

- `transformValue` operation: transform message values with error handling
- `valueTransformer` function type: returns transformed values
- Try-except blocks: safe processing with error catching
- Structured logging: format logs with timestamps and component info
- `time.strftime()`: format timestamps for readable logs

This processor demonstrates basic error handling with logging:
**Processor definition with error handling:**

```yaml
# Error handling processor with structured logging
# Shows try-except pattern and structured log messages
streams:
  error_input:
    topic: ksml_logging_input
    keyType: string
    valueType: json
  error_output:
    topic: ksml_error_handled_output
    keyType: string
    valueType: json

functions:
  safe_process:
    type: valueTransformer
    code: |
      import time

      try:
        result = dict(value) if value else {}
        result["processed"] = True

        # Structured logging with timestamp and component info
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log.info("[{}] Component: {} | Message: {}",
                 timestamp, value.get("component"), value.get("message"))
      except Exception as e:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log.error("[{}] Failed to process: {}", timestamp, str(e))
        result = {"error": str(e), "processed": False}
    expression: result
    resultType: json

pipelines:
  error_handling_pipeline:
    from: error_input
    via:
      - type: transformValue
        mapper: safe_process
    to: error_output
```
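Depending on whether the `try` block succeeds, `ksml_error_handled_output` receives one of two shapes. An illustrative success record (actual field values will vary):

```json
{"level": "INFO", "message": "Backup process completed", "timestamp": 1717000000.5, "importance": 3, "component": "backup", "processed": true}
```

On failure, for instance if a null value reaches the function, the record is replaced by an error result:

```json
{"error": "'NoneType' object has no attribute 'get'", "processed": false}
```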
## Configuring Log Levels

KSML provides standard logging levels through the `log` object available in Python functions:

- ERROR: Critical errors requiring attention
- WARN: Potential issues or unusual conditions
- INFO: Normal operational events
- DEBUG: Detailed troubleshooting information
- TRACE: Very detailed debugging output

By default, KSML shows INFO, WARN, and ERROR logs. You can enable DEBUG and TRACE logging without rebuilding the image.
### Enabling All Log Levels (Including DEBUG and TRACE)

- Create a custom logback configuration file `logback-trace.xml` in your `examples` directory:

**Custom logback configuration for TRACE logging:**
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="false">
    <!-- Define appenders directly without includes -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %-36logger{36} %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.err</target>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>WARN</level>
        </filter>
        <encoder>
            <pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %-36logger{36} %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Root logger set to TRACE to allow all logs -->
    <root level="TRACE">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="STDERR"/>
    </root>

    <!-- Reduce noise from Kafka and other libraries -->
    <logger name="org.apache.kafka" level="WARN"/>
    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
    <logger name="org.apache.kafka.clients.producer.ProducerConfig" level="ERROR"/>
    <logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="ERROR"/>
    <logger name="org.apache.kafka.streams.StreamsConfig" level="ERROR"/>
    <logger name="io.confluent" level="WARN"/>

    <!-- Enable TRACE logging for KSML framework -->
    <logger name="io.axual.ksml" level="TRACE"/>

    <!-- Enable TRACE for all user functions in logging-processor.yaml.
         KSML automatically takes the first letter of your YAML filename as the
         namespace abbreviation to keep logger names short and manageable.
         For example: logging-producer.yaml means that the namespace is "l". -->
    <logger name="l" level="TRACE"/>
</configuration>
```
- Update your `docker-compose.yml` to use the custom configuration, as shown in the sketch below:
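A sketch of the compose change, assuming the `ksml` service from the Quick Start mounts your `examples` directory at `/ksml`; the service name and mount path may differ in your setup. `logback.configurationFile` is logback's standard system property, and `JDK_JAVA_OPTIONS` is read by the `java` launcher at startup (JDK 9+):

```yaml
# Sketch only: service name, volumes, and mount path are assumptions based on
# the Quick Start compose file; adjust them to match your own setup.
ksml:
  # ... existing image and other settings unchanged ...
  volumes:
    - ./examples:/ksml
  environment:
    # Point logback at the custom configuration mounted into the container
    JDK_JAVA_OPTIONS: "-Dlogback.configurationFile=/ksml/logback-trace.xml"
```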
- Restart the containers to see all log levels, including DEBUG and TRACE.
- To test, add for example `log.trace("TRACE: Processing message with key={}", key)` to your processing definition.
## Best Practices

- **Use appropriate log levels**: ERROR for failures, WARN for issues, INFO for events, DEBUG for details.
- **Include context**: add message keys, component names, and relevant metadata.
- **Avoid excessive logging**: use sampling or conditional logging for high-volume streams; see the sketch after this list.
- **Structure messages**: use consistent formats with key-value pairs.
- **Monitor performance**: track throughput, processing time, and error rates.
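To illustrate the sampling advice, a minimal sketch of a `peek` function that always logs errors but logs routine traffic only occasionally, following the same patterns used earlier in this tutorial (the 1% sample rate is an arbitrary choice):

```yaml
functions:
  sampled_logging:
    type: forEach
    globalCode: |
      import random
    code: |
      # Always log errors; sample roughly 1% of everything else
      if value.get("level") == "ERROR":
        log.error("Error detected: key={}, message={}", key, value.get("message"))
      elif random.random() < 0.01:
        log.info("Sampled message: key={}, component={}", key, value.get("component"))
```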
## Conclusion

Effective logging and monitoring enable you to track pipeline behavior, diagnose issues quickly, and maintain reliable KSML applications. Use the `log` object for different severity levels and `peek` operations for non-intrusive monitoring.
## Next Steps

- Error Handling and Recovery for advanced error handling techniques
- Performance Optimization for optimizing pipeline performance
- Intermediate Tutorials for more advanced KSML features