Editing Python Code

KSML allows you to write custom logic in Python within your pipeline definitions. This guide explains the different approaches for organizing and editing Python code, from inline to fully externalized modules.

Python Code Options

1. Inline Code

The simplest approach is to write Python code directly in your YAML file:

Inline code example:
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value is None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

Use when: Logic is short and specific to one function.

2. Global Code

Define reusable functions in a globalCode block that can be referenced by multiple functions:

Global code example:
# This example shows how to filter messages from a simple stream using a function defined in globalCode.
# Here we only let "blue sensors" pass and discard other messages after logging.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    globalCode: |
      def check_sensor_is_blue(value, key):
        if value is None:
          log.warn("No value in message with key={}", key)
          return False
        if value["color"] != "blue":
          log.warn("Unknown color: {}", value["color"])
          return False
        return True
    expression: check_sensor_is_blue(value, key)

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

Use when: Multiple functions share common logic or utilities.

A note on globalCode vs code

Code in a globalCode block is evaluated once, whereas code in a code block is executed for every message. Import statements and one-time initializations should therefore always go in globalCode; placing them in a code block adds significant runtime overhead.
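
The difference can be illustrated in plain Python, outside KSML. The sketch below is only an analogy: the module-level statements stand in for a globalCode block and the function body stands in for a code block. The names `evaluations`, `KNOWN_COLOR`, and `sensor_has_known_color` are made up for illustration and say nothing about how KSML compiles functions internally.

```python
import re

evaluations = {"global": 0, "per_message": 0}

# Stand-in for a globalCode block: these statements run once, at load time.
evaluations["global"] += 1
KNOWN_COLOR = re.compile(r"blue|red")  # one-time initialization


def sensor_has_known_color(value):
    # Stand-in for a code block: this body runs for every message.
    evaluations["per_message"] += 1
    if value is None:
        return False
    return KNOWN_COLOR.fullmatch(value["color"]) is not None


messages = [{"color": "blue"}, {"color": "green"}, None]
results = [sensor_has_known_color(m) for m in messages]
```

After three messages, the one-time section has been counted once and the per-message section three times. Anything expensive placed in the function body would be paid for on every message.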

3. Python Module Imports

For advanced scenarios, import Python modules using globalCode:

Python module import example:

YAML definition:

# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  module_import:
    globalCode: from test_filter_module import is_blue

  sensor_is_blue:
    type: predicate
    code: |
      return is_blue(value)
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

Python module (test_filter_module.py):

def is_blue(val):
    """Return True if the value's 'color' attribute is 'blue'."""
    if val is None:
        return False
    return val["color"] == "blue"

def is_red(val):
    """Return True if the value's 'color' attribute is 'red'."""
    if val is None:
        return False
    return val["color"] == "red"

Use when: You have a library of reusable functions or need to organize complex logic into modules.

Note: When using module imports, configure the pythonModulePath parameter in the runner configuration:

  # Python context configuration for module imports
  ksml:
    pythonContext:
      pythonModulePath: /ksml  # Directory where Python modules are located (mounted volume)

Passing KSML Runtime Objects to Modules

When using Python modules, you cannot directly access KSML runtime variables (log, metrics, stores) inside the module because they are injected into the function's local scope, not the module's global scope.

To use these objects in your module functions, pass them as arguments from the YAML function definition.
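
The scoping rule can be reproduced in plain Python. The following sketch assumes nothing about KSML itself: `my_module` is built in memory as a stand-in for a module file, `RecordingLogger` is a hypothetical substitute for the injected logger, and `ksml_function_body` imitates a function in which `log` exists only as a local name.

```python
import types

# Build a throwaway module in memory; a stand-in for a file such as my_module.py.
module_source = '''
def uses_global_log(value):
    log.info("value={}", value)   # 'log' is looked up in the module's globals

def uses_passed_log(value, log):
    log.info("value={}", value)   # 'log' is an ordinary parameter
'''
my_module = types.ModuleType("my_module")
exec(module_source, my_module.__dict__)


class RecordingLogger:
    """Hypothetical stand-in for the KSML logger; it only records calls."""

    def __init__(self):
        self.calls = []

    def info(self, template, *args):
        self.calls.append((template, args))


def ksml_function_body(value):
    log = RecordingLogger()  # exists only in this function's local scope
    try:
        my_module.uses_global_log(value)
        outcome = "ok"
    except NameError:
        outcome = "NameError"  # the module cannot see the caller's local 'log'
    my_module.uses_passed_log(value, log)  # passing it explicitly works
    return outcome, log.calls


outcome, calls = ksml_function_body({"color": "blue"})
```

The first call fails with a NameError because the module function searches its own globals, while the second call succeeds because the logger arrives as a parameter.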

Passing the Logger

In your YAML, pass the log object to your module function:

functions:
  module_imports:
    globalCode: from my_module import calculate_score

  calculate_fraud_score:
    type: valueTransformer
    # Pass the KSML-provided 'log' object as an argument
    expression: calculate_score(value, log)
    resultType: json

In your Python module, accept the logger as a parameter:

from ksml import PythonLogger

def calculate_score(value, log: PythonLogger):
    """Calculate score using the KSML logger."""
    risk_score = value.get("risk_score", 0)
    log.info("Calculated risk score: {}", risk_score)
    return {"score": risk_score}
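
One side benefit of passing the logger as a parameter is that the function can be exercised outside KSML. The sketch below is an assumption-laden illustration: `ListLogger` is a hypothetical test double, its placeholder handling only approximates SLF4J-style `{}` formatting, and `calculate_score` is repeated without the ksml type hint so the snippet runs on a plain Python interpreter.

```python
class ListLogger:
    """Hypothetical test double for the KSML logger; keeps rendered messages in a list."""

    def __init__(self):
        self.messages = []

    def info(self, template, *args):
        # Fill each "{}" placeholder in order, approximating SLF4J-style formatting.
        for arg in args:
            template = template.replace("{}", str(arg), 1)
        self.messages.append(template)


def calculate_score(value, log):
    """Same shape as the module function above, minus the ksml type hint."""
    risk_score = value.get("risk_score", 0)
    log.info("Calculated risk score: {}", risk_score)
    return {"score": risk_score}


log = ListLogger()
with_score = calculate_score({"risk_score": 7}, log)
without_score = calculate_score({}, log)
```

Here `log.messages` ends up holding one rendered line per call, which makes the logging behavior easy to assert in a unit test.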

Passing State Stores

For functions that use state stores, pass the store as an argument:

functions:
  check_location:
    type: valueTransformer
    resultType: json
    code: |
      result = check_unusual_location(value, card_location_history)
    expression: result
    stores:
      - card_location_history

In your Python module:

from ksml import KeyValueStore

def check_unusual_location(value, store: KeyValueStore):
    """Check location using the state store."""
    card_id = value.get("card_id")
    history = store.get(card_id)
    # ... process and update store (this elided step produces updated_history and result)
    store.put(card_id, updated_history)
    return result
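
Because the store is an ordinary argument, a dictionary-backed substitute is enough to try out such a function locally. The sketch below is hypothetical throughout: `InMemoryKeyValueStore` mimics only the get(), put(), and delete() operations, `record_location` is an invented function that follows the same read-update-write pattern, and the `location` field and result layout are assumptions made for illustration.

```python
class InMemoryKeyValueStore:
    """Hypothetical test double offering get(), put(), and delete() on a dict."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)  # None when the key is absent

    def put(self, key, value):
        self._data[key] = value

    def delete(self, key):
        return self._data.pop(key, None)


def record_location(value, store):
    """Illustrative module function: remember a card's locations and flag new ones."""
    card_id = value.get("card_id")
    history = store.get(card_id) or []   # no history yet for this card
    location = value.get("location")     # field name assumed for this sketch
    is_new = location not in history
    store.put(card_id, history + [location])
    return {"card_id": card_id, "new_location": is_new}


store = InMemoryKeyValueStore()
first = record_location({"card_id": "c1", "location": "NL"}, store)
second = record_location({"card_id": "c1", "location": "NL"}, store)
```

The first call reports a new location and the second does not, since the history written by the first call is read back from the store.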

Passing Metrics

Similarly, pass the metrics object when you need custom metrics:

functions:
  process_with_metrics:
    type: valueTransformer
    expression: process_value(value, metrics)
    resultType: json

In your Python module:

from ksml import MetricsBridge

def process_value(value, metrics: MetricsBridge):
    """Process value and record metrics."""
    counter = metrics.counter("processed_records")
    counter.increment()
    return value
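
The same approach works for metrics. In the sketch below, `FakeCounter` and `FakeMetrics` are hypothetical test doubles that imitate only counter(name) and increment(); they are not part of KSML. `process_value` is repeated without the ksml type hint so that the snippet runs standalone.

```python
class FakeCounter:
    """Hypothetical counter double that tracks a running total."""

    def __init__(self):
        self.count = 0

    def increment(self, delta=1):
        self.count += delta


class FakeMetrics:
    """Hypothetical metrics double: returns the same counter for the same name."""

    def __init__(self):
        self._counters = {}

    def counter(self, name):
        if name not in self._counters:
            self._counters[name] = FakeCounter()
        return self._counters[name]


def process_value(value, metrics):
    """Process value and record metrics (same shape as the function above)."""
    counter = metrics.counter("processed_records")
    counter.increment()
    return value


metrics = FakeMetrics()
outputs = [process_value(v, metrics) for v in ("a", "b", "c")]
processed = metrics.counter("processed_records").count
```

Each processed value passes through unchanged while the named counter accumulates one increment per call.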

Editor Support with the ksml Module

KSML provides a ksml.py module that enables code completion and type checking in your IDE while also providing the actual Java types at runtime.

Setting Up Code Completion

Place the ksml.py module in your Python module directory (the same directory configured as pythonModulePath). The module works in two modes:

  • At edit time: Provides Protocol classes for IDE code completion and type checking
  • At runtime: Imports the actual Java types via GraalVM
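
The two modes rely on the standard `typing.TYPE_CHECKING` constant, which is False when code actually runs and treated as True by type checkers and IDEs. The sketch below shows the mechanism in isolation. `Greeter` and `welcome` are invented names, and the runtime branch uses a plain class where ksml.py would load Java types.

```python
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    # Seen only by IDEs and type checkers: describes the expected interface.
    class Greeter(Protocol):
        def greet(self, name: str) -> str: ...
else:
    # Executed at runtime: this sketch substitutes a plain Python class.
    class Greeter:
        def greet(self, name):
            return "hello " + name


def welcome(greeter: "Greeter", name: str) -> str:
    # The annotation drives code completion; the runtime class does the work.
    return greeter.greet(name)


runtime_branch_taken = not TYPE_CHECKING
message = welcome(Greeter(), "ksml")
```

When executed, only the else branch is evaluated, so the Protocol definitions never affect runtime behavior.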

Simply import the types you need in your module:

from ksml import PythonLogger, KeyValueStore, MetricsBridge

def my_function(value, log: PythonLogger, store: KeyValueStore):
    log.info("Processing value: {}", value)
    # Your IDE will provide full code completion for log and store

Available Runtime Objects

The ksml.py module provides type definitions for:

Logger (PythonLogger)

  • log.trace(), log.debug(), log.info(), log.warn(), log.error()
  • Level checks: isTraceEnabled(), isDebugEnabled(), etc.

Metrics (MetricsBridge)

  • metrics.counter(name) - Create/get a counter metric
  • metrics.meter(name) - Create/get a meter metric
  • metrics.timer(name) - Create/get a timer metric

State Stores

Three store types are available:

  • KeyValueStore - Simple key-value operations: get(), put(), delete(), range(), all()
  • SessionStore - Session-windowed store: fetch(), findSessions(), put(), remove()
  • WindowStore - Time-windowed store: fetch(), fetchAll(), put(), all()

Complete ksml.py module:
from __future__ import annotations
from typing import Dict, Any, Optional, Union, List, Mapping, Protocol, Iterator, TYPE_CHECKING

if TYPE_CHECKING:
    class PythonLogger(Protocol):
        """Logger with SLF4J-like interface (maps to io.axual.ksml.proxy.log.LoggerBridge.PythonLogger)"""

        def getName(self) -> str: ...

        # Level checks
        def isTraceEnabled(self) -> bool: ...
        def isDebugEnabled(self) -> bool: ...
        def isInfoEnabled(self) -> bool: ...
        def isWarnEnabled(self) -> bool: ...
        def isErrorEnabled(self) -> bool: ...

        # Trace logging
        def trace(self, s: str, *args: Any) -> None: ...

        # Debug logging
        def debug(self, s: str, *args: Any) -> None: ...

        # Info logging
        def info(self, s: str, *args: Any) -> None: ...

        # Warn logging
        def warn(self, s: str, *args: Any) -> None: ...

        # Error logging
        def error(self, s: str, *args: Any) -> None: ...

    class CounterBridge(Protocol):
        """KSML Counter metric"""

        def increment(self) -> None: ...
        def increment(self, delta: int) -> None: ...

    class MeterBridge(Protocol):
        """KSML Meter metric"""

        def mark(self) -> None: ...
        def mark(self, nrOfEvents: int) -> None: ...

    class TimerBridge(Protocol):
        """KSML Timer metric"""

        def updateSeconds(self, valueSeconds: int) -> None: ...
        def updateMillis(self, valueMilliseconds: int) -> None: ...
        def updateNanos(self, valueNanoseconds: int) -> None: ...

    class MetricsBridge(Protocol):
        """Metrics bridge for creating custom metrics (maps to io.axual.ksml.proxy.metric.MetricsBridge)"""

        def counter(self, name: str) -> CounterBridge: ...
        def counter(self, name: str, tags: Mapping[str, str]) -> CounterBridge: ...

        def meter(self, name: str) -> MeterBridge: ...
        def meter(self, name: str, tags: Mapping[str, str]) -> MeterBridge: ...

        def timer(self, name: str) -> TimerBridge: ...
        def timer(self, name: str, tags: Mapping[str, str]) -> TimerBridge: ...

    class StateStore(Protocol):
        """Base interface for state stores"""

        def name(self) -> str: ...
        def persistent(self) -> bool: ...
        def isOpen(self) -> bool: ...

    class KeyValueStore(StateStore, Protocol):
        """Key-Value store interface"""

        def get(self, key: Any) -> Optional[Any]: ...
        def put(self, key: Any, value: Any) -> None: ...
        def putIfAbsent(self, key: Any, value: Any) -> Optional[Any]: ...
        def delete(self, key: Any) -> Optional[Any]: ...
        def range(self, from_key: Any, to_key: Any) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def approximateNumEntries(self) -> int: ...

    class SessionStore(StateStore, Protocol):
        """Session store interface"""

        def put(self, key: Any, value: Any) -> None: ...
        def remove(self, key: Any) -> None: ...
        def fetch(self, key: Any) -> Iterator[Any]: ...
        def fetchSession(self, key: Any, sessionStartTime: int, sessionEndTime: int) -> Any: ...
        def backwardFetch(self, key: Any) -> Iterator[Any]: ...
        def findSessions(self, key: Any, earliestSessionEndTime: int, latestSessionStartTime: int) -> Iterator[Any]: ...
        def backwardFindSessions(self, key: Any, earliestSessionEndTime: int, latestSessionStartTime: int) -> Iterator[Any]: ...

    class WindowStore(StateStore, Protocol):
        """Window store interface"""

        def put(self, key: Any, value: Any, windowStartTimestamp: int) -> None: ...
        def fetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetch(self, key: Any, timeFrom: Any, timeTo: Any) -> Iterator[Any]: ...
        def backwardFetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def backwardAll(self) -> Iterator[Any]: ...

    class TimestampedKeyValueStore(StateStore, Protocol):
        """Timestamped Key-Value store interface"""

        def get(self, key: Any) -> Optional[ValueAndTimestamp]: ...
        def put(self, key: Any, value: ValueAndTimestamp) -> None: ...
        def put(self, key: Any, value: Any, timestamp: int) -> None: ...
        def putIfAbsent(self, key: Any, value: ValueAndTimestamp) -> Optional[ValueAndTimestamp]: ...
        def putIfAbsent(self, key: Any, value: Any, timestamp: int) -> Optional[ValueAndTimestamp]: ...
        def delete(self, key: Any) -> Optional[ValueAndTimestamp]: ...
        def range(self, from_key: Any, to_key: Any) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def approximateNumEntries(self) -> int: ...

    class TimestampedWindowStore(StateStore, Protocol):
        """Timestamped Window store interface"""

        def put(self, key: Any, value: Any, windowStartTimestamp: int, timestamp: int) -> None: ...
        def fetch(self, key: Any, time: int) -> Optional[Dict[str, Any]]: ...
        def fetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def backwardAll(self) -> Iterator[Any]: ...

    class VersionedKeyValueStore(StateStore, Protocol):
        """Versioned Key-Value store interface"""

        def get(self, key: Any) -> Optional[Dict[str, Any]]: ...
        def get(self, key: Any, asOfTimestamp: int) -> Optional[Dict[str, Any]]: ...
        def put(self, key: Any, value: Any, timestamp: int) -> int: ...
        def delete(self, key: Any, timestamp: int) -> Optional[Dict[str, Any]]: ...

    # KSML runtime injected variables
    log: PythonLogger
    metrics: MetricsBridge
    stores: Dict[str, Union[StateStore, KeyValueStore, SessionStore, WindowStore, TimestampedKeyValueStore, TimestampedWindowStore, VersionedKeyValueStore, Any]]
else:
    # at runtime, make sure that the type hints do not cause any issues
    import java
    PythonLogger = java.type('io.axual.ksml.proxy.log.LoggerBridge$PythonLogger')
    MetricsBridge = java.type('io.axual.ksml.proxy.metric.MetricsBridge')
    KeyValueStore = java.type('io.axual.ksml.proxy.store.KeyValueStoreProxy')
    SessionStore = java.type('io.axual.ksml.proxy.store.SessionStoreProxy')
    WindowStore = java.type('io.axual.ksml.proxy.store.WindowStoreProxy')
    TimestampedKeyValueStore = java.type('io.axual.ksml.proxy.store.TimestampedKeyValueStoreProxy')
    TimestampedWindowStore = java.type('io.axual.ksml.proxy.store.TimestampedWindowStoreProxy')
    VersionedKeyValueStore = java.type('io.axual.ksml.proxy.store.VersionedKeyValueStoreProxy')

Complete Example: Fraud Detection

For a comprehensive example showing all these patterns together, see the Fraud Detection use case:

  • fraud-detection-python-module.yaml - YAML pipeline definition
  • fraud_detection_module.py - Python module with type-hinted functions
  • ksml.py - Type module for IDE support and runtime Java type imports

This example demonstrates:

  • Importing functions from a Python module
  • Passing the log object to module functions
  • Passing KeyValueStore state stores to module functions
  • Using type hints for full IDE support

Best Practices

  1. Start inline, move to modules as complexity grows
  2. Use globalCode for shared utilities and helper functions
  3. Import modules when you need a library of reusable functions
  4. Pass runtime objects as arguments to module functions (log, metrics, stores)
  5. Use the ksml.py module for better IDE support and fewer runtime errors
  6. Keep functions pure - avoid side effects outside of logging and metrics
  7. Handle None values explicitly to prevent runtime errors
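
As an illustration of point 7, the following sketch shows a predicate that tolerates both a missing value and a missing field. It assumes the message value behaves like a Python dict; the `is_blue` variant shown here is a hypothetical alternative, not the function from the example module.

```python
def is_blue(value):
    """Return True only for a present value whose 'color' field equals 'blue'."""
    if value is None:  # no payload at all
        return False
    # .get() returns None for a missing field instead of raising KeyError
    return value.get("color") == "blue"


samples = [{"color": "blue"}, {"color": "red"}, {}, None]
results = [is_blue(s) for s in samples]
```

Only the first sample passes. The empty dict and the None value are rejected without raising an exception.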

Example File Organization

my-project/
├── pipelines/
│   ├── my-pipeline.yaml           # Main pipeline definition
│   └── modules/
│       ├── data_validation.py     # Importable Python modules
│       └── ksml.py                # Type module for IDE support and runtime
└── config/
    └── ksml-runner.yaml           # Runner config with pythonModulePath

Summary

This guide covered three approaches to organizing Python code in KSML:

  1. Inline code - Quick and simple for short logic
  2. Global code - Share utilities across multiple functions
  3. Module imports - Full Python module system for libraries

Key points for module imports:

  • Pass log, metrics, and state stores as function arguments
  • Use the ksml.py module for IDE support and runtime type imports
  • Configure pythonModulePath in the runner configuration

See the Fraud Detection use case for a complete working example with all patterns demonstrated.