Functions

Introduction

Functions can be specified in the functions section of a KSML definition file. The layout typically looks like this:

functions:
  my_first_predicate:
    type: predicate
    expression: key=='Some string'

  compare_params:
    type: generic
    parameters:
      - name: firstParam
        type: string
      - name: secondParam
        type: int
    globalCode: |
      from somepackage import something
      globalVar = 3
    code: |
      print('Hello there!')
    expression: firstParam == str(secondParam)

Functions are defined by the following tags:

| Parameter | Value Type | Default | Description |
|---|---|---|---|
| type | string | generic | The type of the function defined. |
| parameters | List of parameter definitions | empty list | A list of parameters, each of which contains the mandatory fields name and type. See the example above. |
| globalCode | string | empty | Snippet of Python code that is executed once upon creation of the Kafka Streams topology. This section can contain statements like import to import function libraries used in the code and expression sections. |
| code | string | empty | Python source code, which will be included in the called function. |
| expression | string | empty | Python expression whose value is returned as the function result. Typically the code "return expression" is generated for the Python interpreter. For example, expression: key would generate the Python code return key for the function. |

See below for the list of supported function types.
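
To make the relationship between globalCode, code and expression more concrete, the compare_params definition above can be pictured as roughly the following Python. This is only a sketch under assumptions: the exact code that KSML generates is not shown in this document, and the placeholder import is left out because somepackage is not a real module.

```python
# globalCode: executed once, when the topology is created.
globalVar = 3


# Assumed shape of the function generated for compare_params.
# The parameter names come from the "parameters" list in the definition.
def compare_params(firstParam, secondParam):
    # code: included in the body of the called function.
    print('Hello there!')
    # expression: becomes the returned function result.
    return firstParam == str(secondParam)


# Calling the sketch directly, outside of KSML.
result = compare_params("3", 3)
print(result)
```

Because secondParam is converted with str() before the comparison, the call above compares the string "3" with the string "3".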

Data types in Python

Internally, KSML uses an abstraction to deal with all kinds of data types. See types for more information on data types.

Data type mapping

Data types are automatically converted to/from Python in the following manner:

| Data type | Python type | Example |
|---|---|---|
| boolean | bool | True, False |
| bytes | bytearray | |
| double | float | 3.145 |
| float | float | 1.23456 |
| byte | int | between -128 and 127 |
| short | int | between -32,768 and 32,767 |
| int | int | between -2,147,483,648 and 2,147,483,647 |
| long | int | between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 |
| string | str | "text" |
| enum | str | enum string literal, e.g. "BLUE", "EUROPE" |
| list | array | [ "key1", "key2" ] |
| struct | dict | { "key1": "value1", "key2": "value2" } |
| struct with schema | dict | { "key1": "value1", "key2": "value2", "@type": "SensorData", "@schema": "…" } |
| tuple | tuple | (1, "text", 3.14, { "key": "value" }) |
| union | | Real value is translated as specified in this table |
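
As an illustration of the mapping above, the sketch below shows how Python code might handle a struct value that arrives as a dict. The helper name split_struct is hypothetical, and treating every key that starts with "@" as metadata is an assumption made for this example; only the "@type" and "@schema" keys come from the table.

```python
def split_struct(value):
    """Separate the data fields of a struct from its "@"-prefixed keys.

    Assumption: keys starting with "@" (such as "@type" and "@schema")
    carry schema information rather than message data.
    """
    if not isinstance(value, dict):
        raise TypeError("expected a struct, which is passed to Python as a dict")
    metadata = {k: v for k, v in value.items() if k.startswith("@")}
    fields = {k: v for k, v in value.items() if not k.startswith("@")}
    return fields, metadata


# Sample value modelled on the "struct with schema" row of the table.
sensor_value = {
    "key1": "value1",
    "key2": "value2",
    "@type": "SensorData",
    "@schema": "...",
}
fields, metadata = split_struct(sensor_value)
print(fields)
print(metadata["@type"])
```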

Automatic conversion

KSML is able to automatically convert between types.

Function Types

| Type | Returns | Parameter | Value Type | Description |
|---|---|---|---|---|
| aggregator | any | key | any | The key of the message |
| | | value | any | The value of the message |
| | | aggregatedValue | any | The aggregated value thus far |
| forEach | none | key | any | The key of the message |
| | | value | any | The value of the message |
| foreignKeyExtractor | any | value | any | The value to extract the foreign key from |
| initializer | any | none | | |
| keyTransformer | any | key | any | The key of the message |
| | | value | any | The value of the message |
| keyValuePrinter | string | key | any | The key of the message |
| | | value | any | The value of the message |
| keyValueToKeyValueListTransformer | [ (any, any) ] | key | any | The key of the message |
| | | value | any | The value of the message |
| keyValueToValueListTransformer | [ any ] | key | any | The key of the message |
| | | value | any | The value of the message |
| keyValueTransformer | (any, any) | key | any | The key of the message |
| | | value | any | The value of the message |
| merger | any | key | any | The key of the message |
| | | value1 | any | The first value to be merged |
| | | value2 | any | The second value to be merged |
| predicate | boolean | key | any | The key of the message |
| | | value | any | The value of the message |
| reducer | any | value1 | any | The first value to be reduced |
| | | value2 | any | The second value to be reduced |
| streamPartitioner | int | topic | string | The topic of the message |
| | | key | any | The key of the message |
| | | value | any | The value of the message |
| | | numPartitions | int | The number of partitions on the topic |
| topicNameExtractor | string | key | any | The key of the message |
| | | value | any | The value of the message |
| valueJoiner | any | key | any | The key of both messages |
| | | value1 | any | The first value to join |
| | | value2 | any | The second value to join |
| valueTransformer | any | key | any | The key of the message |
| | | value | any | The value of the message |
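
The parameter and return shapes in the table can be sketched as plain Python functions. In a KSML definition only the code and expression parts are written, so the def lines below are an assumption about how the parameters are passed in. All function names, the temperature field and the partitioning scheme are hypothetical and chosen only for illustration.

```python
# predicate: receives key and value, returns a boolean.
def is_hot(key, value):
    return value is not None and value.get("temperature", 0) > 25


# keyValueTransformer: returns a single (key, value) tuple.
def uppercase_key(key, value):
    return (key.upper(), value)


# keyValueToKeyValueListTransformer: returns a list of (key, value) tuples.
def split_readings(key, value):
    return [(key, reading) for reading in value]


# reducer: receives two values and returns the combined value.
def keep_largest(value1, value2):
    return max(value1, value2)


# streamPartitioner: returns a partition number below numPartitions.
# Summing the UTF-8 bytes of the key is a simplistic, deterministic choice.
def partition_by_key(topic, key, value, numPartitions):
    return sum(key.encode("utf-8")) % numPartitions


print(is_hot("sensor1", {"temperature": 30}))
print(uppercase_key("sensor1", {"temperature": 30}))
print(split_readings("sensor1", [20, 21]))
print(keep_largest(3, 7))
print(partition_by_key("some_topic", "abc", None, 4))
```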

Function parameters

Besides the parameters mentioned above, all Python functions in KSML get special parameters passed in:

Logger

Every function can access the log variable, which is mapped to a plain Java Logger object. It can be used to send output to the KSML log by calling its methods. It supports the following operations:

| Method | Description |
|---|---|
| error(message: str, value_params...) | logs an error message |
| warn(message: str, value_params...) | logs a warning message |
| info(message: str, value_params...) | logs an informational message |
| debug(message: str, value_params...) | logs a debug message |
| trace(message: str, value_params...) | logs a trace message |

The message may contain pairs of curly brackets {} as placeholders. Each placeholder is substituted, in order, by the corresponding value parameter. Examples are:

log.error("Something went completely bad here!")
log.info("Received message from topic: key={}, value={}", key, value)
log.debug("I'm printing five variables here: {}, {}, {}, {}, {}. Lovely isn't it?", 1, 2, 3, "text", {"json":"is cool"})

Output of the above statements looks like:

[LOG TIMESTAMP] ERROR function.name   Something went completely bad here!
[LOG TIMESTAMP] INFO  function.name   Received message from topic: key=123, value={"key":"value"}
[LOG TIMESTAMP] DEBUG function.name   I'm printing five variables here: 1, 2, 3, text, {"json":"is cool"}. Lovely isn't it?
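
The placeholder substitution can be imitated in plain Python to try out message strings outside of KSML. The function format_log below is a hypothetical stand-in, not the Java Logger itself. It converts parameters with str(), so non-string values such as dicts may be rendered differently than in the real log, and leaving unmatched placeholders untouched is an assumption of this sketch.

```python
def format_log(message, *value_params):
    """Replace each {} in message with the next value parameter, in order.

    Placeholders without a matching parameter are left as {}.
    """
    parts = message.split("{}")
    result = [parts[0]]
    for index, part in enumerate(parts[1:]):
        if index < len(value_params):
            result.append(str(value_params[index]))
        else:
            result.append("{}")
        result.append(part)
    return "".join(result)


print(format_log("Something went completely bad here!"))
print(format_log("Received message from topic: key={}, value={}", 123, "abc"))
```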

State stores

Some functions are allowed to access local state stores. These functions specify the stores attribute in their definitions. The state stores they reference are accessible as variables with the same name as the state store.

Examples:

streams:
  sensor_source_avro:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData

stores:
  last_sensor_data_store:
    type: keyValue
    keyType: string
    valueType: json
    persistent: false
    historyRetention: 1h
    caching: false
    logging: false

functions:
  process_message:
    type: forEach
    code: |
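      # Look up the value previously stored for this key, if any
      last_value = last_sensor_data_store.get(key)
      if last_value is not None:
        log.info("Found last value: {} = {}", key, last_value)
      # Remember the current value for the next message with the same key
      last_sensor_data_store.put(key, value)
      if value is not None:
        log.info("Stored new value: {} = {}", key, value)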
    stores:
      - last_sensor_data_store

pipelines:
  process_message:
    from: sensor_source_avro
    forEach: process_message

In this example, the function process_message uses the state store last_sensor_data_store directly as a variable. This is allowed because the function declares the store under the stores attribute of its definition.

State stores have common methods like get and put, which you can call directly from Python code.
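
To experiment with the logic of process_message outside of KSML, the state store can be replaced by a small in-memory stand-in. The class InMemoryKeyValueStore and the messages list are hypothetical helpers for this sketch. They imitate only the get and put methods mentioned above and say nothing about how the real store or logger is implemented.

```python
class InMemoryKeyValueStore:
    """Hypothetical dict-backed stand-in offering only get and put."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        # Returns None when the key has not been stored yet.
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value


# Same variable name as the store declared in the definition above.
last_sensor_data_store = InMemoryKeyValueStore()
messages = []  # collects what the example would send to the log


def process_message(key, value):
    last_value = last_sensor_data_store.get(key)
    if last_value is not None:
        messages.append(f"Found last value: {key} = {last_value}")
    last_sensor_data_store.put(key, value)
    if value is not None:
        messages.append(f"Stored new value: {key} = {value}")


process_message("sensor1", {"temperature": 20})
process_message("sensor1", {"temperature": 21})
for line in messages:
    print(line)
```

The first call finds nothing in the store and only records the new value. The second call finds the value stored by the first call before overwriting it.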