Logging and Monitoring in KSML

Learn how to implement effective logging, monitoring, and error handling in your KSML stream processing pipelines using the built-in log object and peek operations.

Prerequisites

Before we begin:

  • Make sure a Docker Compose KSML environment is running, as described in the Quick Start.
  • We recommend completing the KSML Basics Tutorial first.
  • Add the following topics to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands:
# Logging and Monitoring Tutorial
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_input && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_logging_output && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_monitoring_output && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_error_handled_output && \

Basic Logging with Different Levels

KSML allows you to log messages at different levels using the log object in Python functions.

This producer generates log messages with various importance levels and components:

Producer definition for logging messages:
# Producer for logging and monitoring tutorial
# Generates simple messages with different importance levels for logging demonstration

functions:
  generate_log_message:
    type: generator
    globalCode: |
      import random
      import time
      log_counter = 0
    code: |
      global log_counter

      levels = ["INFO", "WARN", "ERROR", "DEBUG"]
      messages = [
        "System startup completed",
        "Database connection established", 
        "User authentication failed",
        "Cache refresh initiated",
        "Memory usage high",
        "Backup process completed"
      ]

      key = "log-" + str(log_counter)
      log_counter += 1

      value = {
        "level": random.choice(levels),
        "message": random.choice(messages),
        "timestamp": time.time(),
        "importance": random.randint(1, 10),
        "component": random.choice(["auth", "db", "cache", "backup"])
      }
    expression: (key, value)
    resultType: (string, json)

producers:
  log_message_producer:
    generator: generate_log_message
    interval: 2s
    to:
      topic: ksml_logging_input
      keyType: string
      valueType: json

KSML Features Demonstrated:

  • log object: Built-in logger available in all Python functions
  • Log levels: log.error(), log.warn(), log.info(), log.debug(), log.trace()
  • peek operation: Non-intrusive message inspection without modification
  • forEach function type: Process messages without transforming them

This processor demonstrates logging at different levels using the log object:

Processor definition with multi-level logging:
# Processor for logging and monitoring tutorial  
# Demonstrates basic logging levels using the log object

streams:
  log_input:
    topic: ksml_logging_input
    keyType: string
    valueType: json
  log_output:
    topic: ksml_logging_output
    keyType: string
    valueType: json

functions:
  log_message:
    type: forEach
    code: |
      # Test TRACE logging
      log.trace("TRACE: Processing message with key={}", key)

      level = value.get("level", "INFO")
      message = value.get("message", "")
      component = value.get("component", "unknown")

      # Test DEBUG logging
      log.debug("DEBUG: Message details - level={}, component={}", level, component)

      if level == "ERROR":
        log.error("[{}] {}", component, message)
      elif level == "WARN":  
        log.warn("[{}] {}", component, message)
      elif level == "DEBUG":
        log.debug("[{}] {}", component, message) 
      else:
        log.info("[{}] {}", component, message)

pipelines:
  logging_pipeline:
    from: log_input
    via:
      - type: peek
        forEach:
          code: log_message(key, value)
    to: log_output
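
The level dispatch in log_message can be checked outside KSML before deploying it. The following is a minimal sketch under stated assumptions: StubLog is a hypothetical stand-in that records calls and fills {} placeholders from left to right. It is not the KSML log object, and the log parameter is passed in explicitly only so the function can run in plain Python.

```python
# Hypothetical stand-in for the KSML `log` object, for local testing only.
class StubLog:
    def __init__(self):
        self.records = []  # (level, rendered message) tuples, in call order

    def _emit(self, level, template, *args):
        # Fill "{}" placeholders left to right; leftover placeholders stay as-is.
        parts = template.split("{}")
        message = parts[0]
        for i, part in enumerate(parts[1:]):
            message += (str(args[i]) if i < len(args) else "{}") + part
        self.records.append((level, message))

    def trace(self, template, *args):
        self._emit("TRACE", template, *args)

    def debug(self, template, *args):
        self._emit("DEBUG", template, *args)

    def info(self, template, *args):
        self._emit("INFO", template, *args)

    def warn(self, template, *args):
        self._emit("WARN", template, *args)

    def error(self, template, *args):
        self._emit("ERROR", template, *args)


def log_message(log, key, value):
    # Same branching as the forEach function above, with `log` passed in explicitly.
    log.trace("TRACE: Processing message with key={}", key)
    level = value.get("level", "INFO")
    message = value.get("message", "")
    component = value.get("component", "unknown")
    log.debug("DEBUG: Message details - level={}, component={}", level, component)
    if level == "ERROR":
        log.error("[{}] {}", component, message)
    elif level == "WARN":
        log.warn("[{}] {}", component, message)
    elif level == "DEBUG":
        log.debug("[{}] {}", component, message)
    else:
        log.info("[{}] {}", component, message)


# Usage: feed one sample message and print what would have been logged.
stub = StubLog()
log_message(stub, "log-0", {"level": "ERROR",
                            "message": "User authentication failed",
                            "component": "auth"})
for level, line in stub.records:
    print(level, line)
```

Each message produces one TRACE line, one DEBUG line, and one line at the level named in the message, so whether the first two appear in the KSML output depends on the configured log level.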

Monitoring with Peek Operations

The peek operation allows non-intrusive monitoring of message flow without modifying the data.

KSML Features Demonstrated:

  • peek operation: Inspect messages in the pipeline without modifying them
  • Global variables: Using globals() to maintain state across function calls
  • Conditional logging: Log only when specific conditions are met
  • Message counting: Track processed messages across invocations

This processor shows message counting and error detection using peek operations:

Processor definition for monitoring operations:
# Monitoring processor with message counting and conditional logging
# Demonstrates peek operations for monitoring without data modification

streams:
  monitor_input:
    topic: ksml_logging_input  
    keyType: string
    valueType: json
  monitor_output:
    topic: ksml_monitoring_output
    keyType: string
    valueType: json

functions:
  monitor_messages:
    type: forEach
    code: |
      # Count messages
      global message_count
      if 'message_count' not in globals():
        message_count = 0
      message_count += 1

      # Conditional logging - only log ERROR level messages
      if value.get("level") == "ERROR":
        log.error("Error detected: {}", value.get("message"))

      # Log high importance messages as warnings
      if value.get("importance", 0) > 8:
        log.warn("High importance message ({}): {}", 
                value.get("importance"), value.get("message"))

      # Log every 10th message  
      if message_count % 10 == 0:
        log.info("Processed {} messages", message_count)

pipelines:
  monitoring_pipeline:
    from: monitor_input
    via:
      - type: peek
        forEach:
          code: monitor_messages(key, value)
    to: monitor_output
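
The three monitoring rules above (ERROR level, importance above 8, every 10th message) are easier to reason about when separated from the counter state. As a rough sketch, the hypothetical helper monitoring_events below returns the log events those rules would produce for a given message and count; the helper name and the tuple format are illustrative assumptions, not part of KSML.

```python
def monitoring_events(value, message_count):
    """Return the (level, text) events the monitoring rules would emit."""
    events = []

    # Rule 1: report messages whose level is ERROR.
    if value.get("level") == "ERROR":
        events.append(("ERROR", "Error detected: {}".format(value.get("message"))))

    # Rule 2: importance strictly greater than 8 is reported as a warning.
    if value.get("importance", 0) > 8:
        events.append(("WARN", "High importance message ({}): {}".format(
            value.get("importance"), value.get("message"))))

    # Rule 3: progress line on every 10th message.
    if message_count % 10 == 0:
        events.append(("INFO", "Processed {} messages".format(message_count)))

    return events


# Usage: a message that triggers all three rules at once.
sample = {"level": "ERROR", "message": "Memory usage high", "importance": 9}
for event in monitoring_events(sample, 10):
    print(event)
```

Because the producer draws importance from 1 to 10, only values 9 and 10 trigger the warning rule.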

Error Handling with Logging

Use try-except blocks in Python functions to handle errors gracefully and log them at an appropriate level.

KSML Features Demonstrated:

  • transformValue operation: Transform message values with error handling
  • valueTransformer function type: Returns transformed values
  • Try-except blocks: Safe processing with error catching
  • Structured logging: Format logs with timestamps and component info
  • time.strftime(): Format timestamps for readable logs

This processor demonstrates basic error handling with logging:

Processor definition with error handling:
# Error handling processor with structured logging
# Shows try-except pattern and structured log messages

streams:
  error_input:
    topic: ksml_logging_input
    keyType: string  
    valueType: json
  error_output:
    topic: ksml_error_handled_output
    keyType: string
    valueType: json

functions:
  safe_process:
    type: valueTransformer
    code: |
      import time
      try:
        result = dict(value) if value else {}
        result["processed"] = True

        # Structured logging with timestamp and component info
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log.info("[{}] Component: {} | Message: {}", 
                timestamp, value.get("component"), value.get("message"))

      except Exception as e:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log.error("[{}] Failed to process: {}", timestamp, str(e))
        result = {"error": str(e), "processed": False}
    expression: result
    resultType: json

pipelines:
  error_handling_pipeline:
    from: error_input
    via:
      - type: transformValue
        mapper: safe_process
    to: error_output
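
To see both paths of the try-except pattern, the same logic can be run as a plain Python function. This is a sketch only: it assumes Python's standard logging module as a stand-in for the KSML log object (so it uses %s placeholders instead of {}), and the logger name is made up for illustration.

```python
import logging
import time

# Stand-in logger; inside KSML the built-in `log` object is used instead.
logger = logging.getLogger("safe_process_sketch")


def safe_process(value):
    """Mirror of the valueTransformer body above, returning `result`."""
    try:
        result = dict(value) if value else {}
        result["processed"] = True

        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        # With value=None this .get() call raises AttributeError,
        # which sends control to the except branch below.
        logger.info("[%s] Component: %s | Message: %s",
                    timestamp, value.get("component"), value.get("message"))
    except Exception as e:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        logger.error("[%s] Failed to process: %s", timestamp, str(e))
        result = {"error": str(e), "processed": False}
    return result


# Usage: one well-formed message and one missing value.
print(safe_process({"component": "db", "message": "Database connection established"}))
print(safe_process(None))
```

A missing value therefore does not stop the pipeline: it produces a record with processed set to False and an error description.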

Configuring Log Levels

KSML provides standard logging levels through the log object available in Python functions:

  • ERROR: Critical errors requiring attention
  • WARN: Potential issues or unusual conditions
  • INFO: Normal operational events
  • DEBUG: Detailed troubleshooting information
  • TRACE: Very detailed debugging output

By default, KSML shows INFO, WARN, and ERROR logs. You can enable DEBUG and TRACE logging without rebuilding the image.
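
Level thresholds are cumulative: a logger set to a given level emits that level and every more severe one. The sketch below illustrates this ordering with a hypothetical helper, visible_levels, which is not part of KSML or logback.

```python
# Levels ordered from least to most severe.
LEVELS = ["TRACE", "DEBUG", "INFO", "WARN", "ERROR"]


def visible_levels(threshold):
    """Return the levels that pass a logger configured at `threshold`."""
    return LEVELS[LEVELS.index(threshold):]


print(visible_levels("INFO"))   # the default described above
print(visible_levels("TRACE"))  # everything, including DEBUG and TRACE
```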

Enabling All Log Levels (Including DEBUG and TRACE)

  • Create a custom logback configuration file logback-trace.xml in your examples directory:
Custom logback configuration for TRACE logging:
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="false">
    <!-- Define appenders directly without includes -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %-36logger{36} %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.err</target>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>WARN</level>
        </filter>
        <encoder>
            <pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %-36logger{36} %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Root logger set to TRACE to allow all logs -->
    <root level="TRACE">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="STDERR"/>
    </root>

    <!-- Reduce noise from Kafka and other libraries -->
    <logger name="org.apache.kafka" level="WARN"/>
    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
    <logger name="org.apache.kafka.clients.producer.ProducerConfig" level="ERROR"/>
    <logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="ERROR"/>
    <logger name="org.apache.kafka.streams.StreamsConfig" level="ERROR"/>
    <logger name="io.confluent" level="WARN"/>

    <!-- Enable TRACE logging for KSML framework -->
    <logger name="io.axual.ksml" level="TRACE"/>

    <!-- Enable TRACE for all user functions in logging-processor.yaml.
         KSML automatically takes the first letter of your YAML filename as the namespace
         abbreviation to keep logger names short and manageable.
         For example: logging-processor.yaml means that the namespace is "l" -->
    <logger name="l" level="TRACE"/>
</configuration>
  • Update your docker-compose.yml to use the custom configuration:
ksml:
  environment:
    - LOGBACK_CONFIGURATION_FILE=/ksml/logback-trace.xml
  • Restart the containers to see all log levels including DEBUG and TRACE.
    • To test, add a call such as log.trace("TRACE: Processing message with key={}", key) to a function in your processing definition.

Best Practices

  1. Use appropriate log levels:
    • ERROR for failures, WARN for potential issues, INFO for normal operational events, DEBUG and TRACE for troubleshooting detail
  2. Include context:
    • Add message keys, component names, and relevant metadata
  3. Avoid excessive logging:
    • Use sampling or conditional logging for high-volume streams
  4. Structure messages:
    • Use consistent formats with key-value pairs
  5. Monitor performance:
    • Track throughput, processing time, and error rates
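
Practices 3 and 4 can be combined in a small pair of helpers. This is one possible sketch, not a KSML feature: should_log and kv are hypothetical names, and the default of sampling every 100th message is an arbitrary assumption to adjust per stream.

```python
def should_log(message_count, level, every_n=100):
    """Always log WARN and ERROR; sample other levels every `every_n` messages."""
    if level in ("WARN", "ERROR"):
        return True
    return message_count % every_n == 0


def kv(**fields):
    """Render fields as space-separated key=value pairs in a consistent order."""
    return " ".join("{}={}".format(name, val) for name, val in fields.items())


# Usage: decide whether to log, then build a structured line.
if should_log(200, "INFO"):
    print(kv(key="log-200", component="cache", level="INFO"))
```

Keeping WARN and ERROR outside the sampling rule means that rare but important events are not dropped on high-volume streams.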

Conclusion

Effective logging and monitoring enable you to track pipeline behavior, diagnose issues quickly, and maintain reliable KSML applications. Use the log object for different severity levels and peek operations for non-intrusive monitoring.

Next Steps