Editing Python Code
KSML allows you to write custom logic in Python within your pipeline definitions. This guide explains the different approaches for organizing and editing Python code, from inline snippets to fully externalized modules.
Python Code Options
1. Inline Code
The simplest approach is to write Python code directly in your YAML file:
Inline code example:
```yaml
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value is None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered
```
Use when: Logic is short and specific to one function.
2. Global Code
Define reusable functions in a globalCode block that can be referenced by multiple functions:
Global code example:
```yaml
# This example shows how to filter messages from a simple stream using a function defined in globalCode.
# Here we only let "blue sensors" pass and discard other messages after logging.
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    globalCode: |
      def check_sensor_is_blue(value, key):
        if value is None:
          log.warn("No value in message with key={}", key)
          return False
        if value["color"] != "blue":
          log.warn("Unknown color: {}", value["color"])
          return False
        return True
    expression: check_sensor_is_blue(value, key)

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered
```
Use when: Multiple functions share common logic or utilities.
A note on globalCode vs code
Be aware that code in a globalCode block is evaluated once, whereas code in a code block is executed for every message.
This means, for example, that import statements should not be placed in a code block: they would then run for every message and add significant runtime overhead.
One-time initializations and import statements should always go in globalCode.
3. Python Module Imports
For advanced scenarios, import Python modules using globalCode:
Python module import example:
YAML definition:
```yaml
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  module_import:
    globalCode: from test_filter_module import is_blue
  sensor_is_blue:
    type: predicate
    code: |
      return is_blue(value)
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered
```
Python module (test_filter_module.py):
```python
def is_blue(val):
    """Filter values in the stream with attribute 'color' having value 'blue'"""
    if val is None:
        return False
    if val["color"] != "blue":
        return False
    return True


def is_red(val):
    """Filter values in the stream with attribute 'color' having value 'red'"""
    if val is None:
        return False
    if val["color"] != "red":
        return False
    return True
```
Use when: You have a library of reusable functions or need to organize complex logic into modules.
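Because test_filter_module.py is ordinary Python, you can check its logic outside KSML by passing plain dicts in place of SensorData records. The sketch below is one possible refactoring and is not the original module. has_color is a hypothetical shared helper. It uses val.get("color"), so a record without a color field is rejected instead of raising a KeyError.

```python
def has_color(val, color):
    """Hypothetical helper: True if val is a record whose 'color' equals color."""
    if val is None:
        return False
    return val.get("color") == color


def is_blue(val):
    """Filter values in the stream with attribute 'color' having value 'blue'."""
    return has_color(val, "blue")


def is_red(val):
    """Filter values in the stream with attribute 'color' having value 'red'."""
    return has_color(val, "red")


if __name__ == "__main__":
    # Plain dicts stand in for the SensorData records KSML would pass in.
    samples = [{"color": "blue"}, {"color": "red"}, {}, None]
    print([is_blue(s) for s in samples])  # [True, False, False, False]
    print([is_red(s) for s in samples])   # [False, True, False, False]
```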
Note: When using module imports, configure the pythonModulePath parameter in the runner configuration:
```yaml
# Python context configuration for module imports
ksml:
  pythonContext:
    pythonModulePath: /ksml  # Directory where Python modules are located (mounted volume)
```
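You may also want to import the same modules outside the runner, for example in unit tests. One way to do this is to add the module directory to Python's import path yourself. This sketch assumes that pythonModulePath simply makes the directory importable, which is what the `from test_filter_module import is_blue` example suggests. To keep the example self-contained, it writes a throwaway copy of a module to a temporary directory. import_from_directory is a hypothetical helper.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MODULE_SOURCE = '''
def is_blue(val):
    """Filter values in the stream with attribute 'color' having value 'blue'."""
    if val is None:
        return False
    return val.get("color") == "blue"
'''


def import_from_directory(directory, module_name):
    """Import module_name from directory, similar to a pythonModulePath entry."""
    directory = str(directory)
    if directory not in sys.path:
        sys.path.insert(0, directory)
    # Make sure the import system notices files created after startup.
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as module_dir:
        # Stand-in for the mounted /ksml directory from the runner config.
        Path(module_dir, "test_filter_module.py").write_text(MODULE_SOURCE)
        module = import_from_directory(module_dir, "test_filter_module")
        print(module.is_blue({"color": "blue"}))  # True
```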
Passing KSML Runtime Objects to Modules
When using Python modules, you cannot directly access KSML runtime variables (log, metrics, stores) inside the module because they are injected into the function's local scope, not the module's global scope.
To use these objects in your module functions, pass them as arguments from the YAML function definition.
Passing the Logger
In your YAML, pass the log object to your module function:
```yaml
functions:
  module_imports:
    globalCode: from my_module import calculate_score
  calculate_fraud_score:
    type: valueTransformer
    # Pass the KSML-provided 'log' object as an argument
    expression: calculate_score(value, log)
    resultType: json
```
In your Python module, accept the logger as a parameter:
```python
from ksml import PythonLogger

def calculate_score(value, log: PythonLogger):
    """Calculate score using the KSML logger."""
    risk_score = value.get("risk_score", 0)
    log.info("Calculated risk score: {}", risk_score)
    return {"score": risk_score}
```
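The logger arrives as an ordinary argument, so you can test a module function like calculate_score outside KSML by passing a stand-in object. The sketch below uses a hypothetical StubLogger. It implements only the methods the function might call and imitates the SLF4J-style {} placeholders. The sketch leaves out `from ksml import PythonLogger` on purpose: at runtime, ksml.py imports the GraalVM java module, and that import would typically fail under plain CPython.

```python
class StubLogger:
    """Hypothetical stand-in for KSML's PythonLogger, for local tests only."""

    def __init__(self):
        self.records = []  # (level, formatted message) pairs

    def _log(self, level, s, *args):
        # Replace each "{}" placeholder with the next argument, SLF4J style.
        message = s
        for arg in args:
            message = message.replace("{}", str(arg), 1)
        self.records.append((level, message))

    def info(self, s, *args):
        self._log("INFO", s, *args)

    def warn(self, s, *args):
        self._log("WARN", s, *args)


def calculate_score(value, log):
    """Calculate score using the KSML logger (same body as the module above)."""
    risk_score = value.get("risk_score", 0)
    log.info("Calculated risk score: {}", risk_score)
    return {"score": risk_score}


if __name__ == "__main__":
    log = StubLogger()
    print(calculate_score({"risk_score": 42}, log))  # {'score': 42}
    print(log.records)  # [('INFO', 'Calculated risk score: 42')]
```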
Passing State Stores
For functions that use state stores, pass the store as an argument:
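```yaml
functions:
  check_location:
    type: valueTransformer
    resultType: json
    # check_unusual_location must be imported first, e.g. in a globalCode block as in the logger example
    code: |
      result = check_unusual_location(value, card_location_history)
    expression: result
    stores:
      - card_location_history
```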
In your Python module:
```python
from ksml import KeyValueStore

def check_unusual_location(value, store: KeyValueStore):
    """Check location using the state store."""
    card_id = value.get("card_id")
    history = store.get(card_id)
    # ... process and update store
    store.put(card_id, updated_history)
    return result
```
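You can replace state stores with stand-ins in the same way. The sketch below defines a hypothetical DictStore backed by a Python dict. It implements only get, put, and delete from the KeyValueStore interface. count_card_transactions is a hypothetical module function, not part of the Fraud Detection example, that keeps a per-card counter in the store.

```python
class DictStore:
    """Hypothetical dict-backed stand-in for a KSML KeyValueStore (get/put/delete only)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value

    def delete(self, key):
        # Remove the key and return the old value, or None if it was absent.
        return self._data.pop(key, None)


def count_card_transactions(value, store):
    """Increment and return the number of transactions seen for value['card_id']."""
    card_id = value.get("card_id")
    count = (store.get(card_id) or 0) + 1
    store.put(card_id, count)
    return {"card_id": card_id, "transaction_count": count}


if __name__ == "__main__":
    store = DictStore()
    count_card_transactions({"card_id": "c1"}, store)
    print(count_card_transactions({"card_id": "c1"}, store))  # {'card_id': 'c1', 'transaction_count': 2}
    print(store.delete("c1"))  # 2
```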
Passing Metrics
Similarly, pass the metrics object when you need custom metrics:
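```yaml
functions:
  process_with_metrics:
    type: valueTransformer
    # process_value must be imported first, e.g. in a globalCode block as in the logger example
    expression: process_value(value, metrics)
    resultType: json
```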
In your Python module:

```python
from ksml import MetricsBridge

def process_value(value, metrics: MetricsBridge):
    """Process value and record metrics."""
    counter = metrics.counter("processed_records")
    counter.increment()
    return value
```
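For local tests, you can stub the metrics object in the same way. StubMetrics and StubCounter below are hypothetical stand-ins. They implement only counter(name) and increment() from the MetricsBridge interface, while the real bridge registers metrics with the KSML runner. counter(name) is documented as create-or-get, so the stub returns the same counter for repeated calls with the same name.

```python
class StubCounter:
    """Hypothetical stand-in for a KSML counter metric."""

    def __init__(self):
        self.count = 0

    def increment(self, delta=1):
        self.count += delta


class StubMetrics:
    """Hypothetical stand-in for MetricsBridge: counter(name) creates or returns a counter."""

    def __init__(self):
        self.counters = {}

    def counter(self, name):
        return self.counters.setdefault(name, StubCounter())


def process_value(value, metrics):
    """Process value and record metrics (same body as the module above)."""
    counter = metrics.counter("processed_records")
    counter.increment()
    return value


if __name__ == "__main__":
    metrics = StubMetrics()
    for record in [{"id": 1}, {"id": 2}, {"id": 3}]:
        process_value(record, metrics)
    print(metrics.counters["processed_records"].count)  # 3
```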
Editor Support with the ksml Module
KSML provides a ksml.py module that enables code completion and type checking in your IDE while also providing the actual Java types at runtime.
Setting Up Code Completion
Place the ksml.py module in your Python module directory (the same directory configured as pythonModulePath). The module works in two modes:
- At edit time: Provides Protocol classes for IDE code completion and type checking
- At runtime: Imports the actual Java types via GraalVM
Simply import the types you need in your module:
```python
from ksml import PythonLogger, KeyValueStore, MetricsBridge

def my_function(value, log: PythonLogger, store: KeyValueStore):
    log.info("Processing value: {}", value)
    # Your IDE will provide full code completion for log and store
```
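The import above runs ksml.py at runtime, which is what you want inside KSML. You may also need to import the same module under plain CPython, for example in unit tests, where the GraalVM java module is not available. One common option is to import the types for the type checker only. This is a standard typing pattern, not a documented KSML recommendation. Your IDE still sees the annotations, but the real Java types are not imported at runtime.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers and IDEs and skipped at runtime,
    # so ksml.py (and its GraalVM "java" import) is never executed here.
    from ksml import KeyValueStore, PythonLogger


def my_function(value, log: "PythonLogger", store: "KeyValueStore"):
    log.info("Processing value: {}", value)
    # The quoted hints still give the IDE completion for log and store
    return value
```

Inside KSML, log and store are the real injected objects. In a unit test, you can pass any objects that provide the methods the function calls.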
Available Runtime Objects
The ksml.py module provides type definitions for:
Logger (PythonLogger)
- log.trace(), log.debug(), log.info(), log.warn(), log.error()
- Level checks: isTraceEnabled(), isDebugEnabled(), etc.
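Level checks help when building a log argument is expensive. The {} placeholders defer string formatting, but the arguments themselves are still evaluated before the call. The following is a minimal sketch, assuming a module function that receives log as an argument. summarize is a hypothetical expensive helper, and the stub logger exists only so the example runs outside KSML:

```python
def summarize(value):
    """Hypothetical expensive helper, e.g. serializing a large record."""
    return ", ".join(f"{k}={value[k]}" for k in sorted(value))


def process(value, log):
    # Only build the detailed message when DEBUG logging is actually enabled.
    if log.isDebugEnabled():
        log.debug("Full record: {}", summarize(value))
    return value


class StubLogger:
    """Hypothetical stand-in for PythonLogger with a configurable debug flag."""

    def __init__(self, debug_enabled):
        self.debug_enabled = debug_enabled
        self.messages = []

    def isDebugEnabled(self):
        return self.debug_enabled

    def debug(self, s, *args):
        self.messages.append((s, args))


if __name__ == "__main__":
    quiet, verbose = StubLogger(False), StubLogger(True)
    process({"b": 2, "a": 1}, quiet)
    process({"b": 2, "a": 1}, verbose)
    print(quiet.messages)    # []
    print(verbose.messages)  # [('Full record: {}', ('a=1, b=2',))]
```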
Metrics (MetricsBridge)
- metrics.counter(name) - Create/get a counter metric
- metrics.meter(name) - Create/get a meter metric
- metrics.timer(name) - Create/get a timer metric
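A timer can record how long a piece of work took. The sketch below measures elapsed time with time.perf_counter_ns() and reports it through updateNanos, one of the TimerBridge methods listed in ksml.py. The timer name "transform_duration", the function, and the stub classes are hypothetical. They exist only so the example runs outside KSML.

```python
import time


def timed_transform(value, metrics):
    """Hypothetical module function that records its own duration in a timer."""
    timer = metrics.timer("transform_duration")
    start = time.perf_counter_ns()
    result = {k: str(v).upper() for k, v in value.items()}  # stand-in for real work
    timer.updateNanos(time.perf_counter_ns() - start)
    return result


class StubTimer:
    """Hypothetical stand-in for TimerBridge; keeps the recorded durations."""

    def __init__(self):
        self.durations_ns = []

    def updateNanos(self, value_nanoseconds):
        self.durations_ns.append(value_nanoseconds)


class StubMetrics:
    """Hypothetical stand-in for MetricsBridge supporting timer(name) only."""

    def __init__(self):
        self.timers = {}

    def timer(self, name):
        return self.timers.setdefault(name, StubTimer())


if __name__ == "__main__":
    metrics = StubMetrics()
    print(timed_transform({"color": "blue"}, metrics))  # {'color': 'BLUE'}
    print(len(metrics.timers["transform_duration"].durations_ns))  # 1
```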
State Stores
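The three most commonly used store types are:
- KeyValueStore - Simple key-value operations: get(), put(), delete(), range(), all()
- SessionStore - Session-windowed store: fetch(), findSessions(), put(), remove()
- WindowStore - Time-windowed store: fetch(), fetchAll(), put(), all()

The module also defines TimestampedKeyValueStore, TimestampedWindowStore, and VersionedKeyValueStore.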
Complete ksml.py module:
```python
from __future__ import annotations

from typing import Dict, Any, Optional, Union, List, Mapping, Protocol, Iterator, TYPE_CHECKING

if TYPE_CHECKING:
    class PythonLogger(Protocol):
        """Logger with SLF4J-like interface (maps to io.axual.ksml.proxy.log.LoggerBridge.PythonLogger)"""
        def getName(self) -> str: ...

        # Level checks
        def isTraceEnabled(self) -> bool: ...
        def isDebugEnabled(self) -> bool: ...
        def isInfoEnabled(self) -> bool: ...
        def isWarnEnabled(self) -> bool: ...
        def isErrorEnabled(self) -> bool: ...

        # Trace logging
        def trace(self, s: str, *args: Any) -> None: ...

        # Debug logging
        def debug(self, s: str, *args: Any) -> None: ...

        # Info logging
        def info(self, s: str, *args: Any) -> None: ...

        # Warn logging
        def warn(self, s: str, *args: Any) -> None: ...

        # Error logging
        def error(self, s: str, *args: Any) -> None: ...

    class CounterBridge(Protocol):
        """KSML Counter metric"""
        def increment(self) -> None: ...
        def increment(self, delta: int) -> None: ...

    class MeterBridge(Protocol):
        """KSML Meter metric"""
        def mark(self) -> None: ...
        def mark(self, nrOfEvents: int) -> None: ...

    class TimerBridge(Protocol):
        """KSML Timer metric"""
        def updateSeconds(self, valueSeconds: int) -> None: ...
        def updateMillis(self, valueMilliseconds: int) -> None: ...
        def updateNanos(self, valueNanoseconds: int) -> None: ...

    class MetricsBridge(Protocol):
        """Metrics bridge for creating custom metrics (maps to io.axual.ksml.proxy.metric.MetricsBridge)"""
        def counter(self, name: str) -> CounterBridge: ...
        def counter(self, name: str, tags: Mapping[str, str]) -> CounterBridge: ...
        def meter(self, name: str) -> MeterBridge: ...
        def meter(self, name: str, tags: Mapping[str, str]) -> MeterBridge: ...
        def timer(self, name: str) -> TimerBridge: ...
        def timer(self, name: str, tags: Mapping[str, str]) -> TimerBridge: ...

    class StateStore(Protocol):
        """Base interface for state stores"""
        def name(self) -> str: ...
        def persistent(self) -> bool: ...
        def isOpen(self) -> bool: ...

    class KeyValueStore(StateStore, Protocol):
        """Key-Value store interface"""
        def get(self, key: Any) -> Optional[Any]: ...
        def put(self, key: Any, value: Any) -> None: ...
        def putIfAbsent(self, key: Any, value: Any) -> Optional[Any]: ...
        def delete(self, key: Any) -> Optional[Any]: ...
        def range(self, from_key: Any, to_key: Any) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def approximateNumEntries(self) -> int: ...

    class SessionStore(StateStore, Protocol):
        """Session store interface"""
        def put(self, key: Any, value: Any) -> None: ...
        def remove(self, key: Any) -> None: ...
        def fetch(self, key: Any) -> Iterator[Any]: ...
        def fetchSession(self, key: Any, sessionStartTime: int, sessionEndTime: int) -> Any: ...
        def backwardFetch(self, key: Any) -> Iterator[Any]: ...
        def findSessions(self, key: Any, earliestSessionEndTime: int, latestSessionStartTime: int) -> Iterator[Any]: ...
        def backwardFindSessions(self, key: Any, earliestSessionEndTime: int, latestSessionStartTime: int) -> Iterator[Any]: ...

    class WindowStore(StateStore, Protocol):
        """Window store interface"""
        def put(self, key: Any, value: Any, windowStartTimestamp: int) -> None: ...
        def fetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetch(self, key: Any, timeFrom: Any, timeTo: Any) -> Iterator[Any]: ...
        def backwardFetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def backwardAll(self) -> Iterator[Any]: ...

    class TimestampedKeyValueStore(StateStore, Protocol):
        """Timestamped Key-Value store interface"""
        def get(self, key: Any) -> Optional[ValueAndTimestamp]: ...
        def put(self, key: Any, value: ValueAndTimestamp) -> None: ...
        def put(self, key: Any, value: Any, timestamp: int) -> None: ...
        def putIfAbsent(self, key: Any, value: ValueAndTimestamp) -> Optional[ValueAndTimestamp]: ...
        def putIfAbsent(self, key: Any, value: Any, timestamp: int) -> Optional[ValueAndTimestamp]: ...
        def delete(self, key: Any) -> Optional[ValueAndTimestamp]: ...
        def range(self, from_key: Any, to_key: Any) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def approximateNumEntries(self) -> int: ...

    class TimestampedWindowStore(StateStore, Protocol):
        """Timestamped Window store interface"""
        def put(self, key: Any, value: Any, windowStartTimestamp: int, timestamp: int) -> None: ...
        def fetch(self, key: Any, time: int) -> Optional[Dict[str, Any]]: ...
        def fetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetch(self, key: Any, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def fetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def backwardFetchAll(self, timeFrom: int, timeTo: int) -> Iterator[Any]: ...
        def all(self) -> Iterator[Any]: ...
        def backwardAll(self) -> Iterator[Any]: ...

    class VersionedKeyValueStore(StateStore, Protocol):
        """Versioned Key-Value store interface"""
        def get(self, key: Any) -> Optional[Dict[str, Any]]: ...
        def get(self, key: Any, asOfTimestamp: int) -> Optional[Dict[str, Any]]: ...
        def put(self, key: Any, value: Any, timestamp: int) -> int: ...
        def delete(self, key: Any, timestamp: int) -> Optional[Dict[str, Any]]: ...

    # KSML runtime injected variables
    log: PythonLogger
    metrics: MetricsBridge
    stores: Dict[str, Union[StateStore, KeyValueStore, SessionStore, WindowStore, TimestampedKeyValueStore, TimestampedWindowStore, VersionedKeyValueStore, Any]]
else:
    # at runtime, make sure that the type hints do not cause any issues
    import java
    PythonLogger = java.type('io.axual.ksml.proxy.log.LoggerBridge$PythonLogger')
    MetricsBridge = java.type('io.axual.ksml.proxy.metric.MetricsBridge')
    KeyValueStore = java.type('io.axual.ksml.proxy.store.KeyValueStoreProxy')
    SessionStore = java.type('io.axual.ksml.proxy.store.SessionStoreProxy')
    WindowStore = java.type('io.axual.ksml.proxy.store.WindowStoreProxy')
    TimestampedKeyValueStore = java.type('io.axual.ksml.proxy.store.TimestampedKeyValueStoreProxy')
    TimestampedWindowStore = java.type('io.axual.ksml.proxy.store.TimestampedWindowStoreProxy')
    VersionedKeyValueStore = java.type('io.axual.ksml.proxy.store.VersionedKeyValueStoreProxy')
```
Complete Example: Fraud Detection
For a comprehensive example showing all these patterns together, see the Fraud Detection use case:
- fraud-detection-python-module.yaml - YAML pipeline definition
- fraud_detection_module.py - Python module with type-hinted functions
- ksml.py - Type module for IDE support and runtime Java type imports
This example demonstrates:
- Importing functions from a Python module
- Passing the log object to module functions
- Passing KeyValueStore state stores to module functions
- Using type hints for full IDE support
Best Practices
- Start inline, move to modules as complexity grows
- Use globalCode for shared utilities and helper functions
- Import modules when you need a library of reusable functions
- Pass runtime objects (log, metrics, stores) as arguments to module functions
- Use the ksml.py module for better IDE support and fewer runtime errors
- Keep functions pure - avoid side effects other than logging, metrics, and updates to the state stores you declared
- Handle None values explicitly to prevent runtime errors
Example File Organization
my-project/
├── pipelines/
│ ├── my-pipeline.yaml # Main pipeline definition
│ └── modules/
│ ├── data_validation.py # Importable Python modules
│ └── ksml.py # Type module for IDE support and runtime
└── config/
└── ksml-runner.yaml # Runner config with pythonModulePath
Summary
This guide covered approaches to organizing Python code in KSML:
- Inline code - Quick and simple for short logic
- Global code - Share utilities across multiple functions
- Module imports - Full Python module system for libraries
Key points for module imports:
- Pass log, metrics, and state stores as function arguments
- Use the ksml.py module for IDE support and runtime type imports
- Configure pythonModulePath in the runner configuration
See the Fraud Detection use case for a complete working example with all patterns demonstrated.