Working with Different Data Formats

Learn how to process, convert, and validate data using KSML's supported formats through practical, hands-on examples. This tutorial provides complete working examples for each data format.

For comprehensive syntax reference and format details, see the Data Types and Formats Reference.

Prerequisites

  • Basic understanding of Kafka concepts (topics, messages)
  • Familiarity with basic KSML concepts (streams, functions, pipelines)

Supported Data Formats

  • String: Plain text
  • JSON: Structured data without schema validation
  • Avro: Binary format with schema registry integration
  • JsonSchema: JSON validated against a schema in the schema registry
  • CSV: Tabular data with optional schema
  • XML: Hierarchical data with XSD schema support
  • Binary: Raw bytes for custom protocols
  • SOAP: Web service messaging format

Specifying Data Formats

When defining streams in KSML, you specify the data format using the keyType and valueType properties:

Specifying data formats (click to expand)
streams:
  json_stream:
    topic: example_json_topic
    keyType: string
    valueType: json

  avro_stream:
    topic: example_avro_topic
    keyType: string
    valueType: avro:SensorData

Schema-based formats (Avro, XML, CSV) require a schema name: format:SchemaName (e.g., avro:SensorData).
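For comparison, here is a minimal sketch of stream declarations for the other schema-based formats (the topic names are hypothetical; the schema name after the colon must match a schema file such as SensorData.csv or SensorData.xsd in the working directory):

streams:
  csv_stream:
    topic: example_csv_topic
    keyType: string
    valueType: csv:SensorData    # expects a SensorData.csv schema file

  xml_stream:
    topic: example_xml_topic
    keyType: string
    valueType: xml:SensorData    # expects a SensorData.xsd schema file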

Setup Requirements

  • Create docker-compose.yml with schema registry and pre-created topics
  • Note: This tutorial requires a different docker-compose.yml than other tutorials, because the Avro format needs a schema registry to store and manage schema definitions
Docker Compose Configuration (click to expand)
networks:
  ksml:
    name: ksml_example
    driver: bridge

services:
  broker:
    image: bitnami/kafka:3.8.0
    hostname: broker
    ports:
      - "9092:9092"
    networks:
      - ksml
    restart: always
    environment:
      KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
      KAFKA_CFG_BROKER_ID: 0
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

      KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
      KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CFG_LOG_CLEANUP_POLICY: delete
      KAFKA_CFG_LOG_RETENTION_MINUTES: 10
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
      KAFKA_CFG_NUM_PARTITIONS: 1
    healthcheck:
      # If the kafka-topics command can list topics, the broker is healthy
      test: kafka-topics.sh --bootstrap-server broker:9093 --list
      interval: 5s
      timeout: 10s
      retries: 10
      start_period: 5s

  ksml:
    image: registry.axual.io/opensource/images/axual/ksml:snapshot
    networks:
      - ksml
    container_name: ksml
    working_dir: /ksml
    volumes:
      - ./examples:/ksml
    depends_on:
      broker:
        condition: service_healthy
      kafka-setup:
        condition: service_completed_successfully
      schema_registry:
        condition: service_healthy


  schema_registry:
    image: apicurio/apicurio-registry:3.0.2
    hostname: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    networks:
      - ksml
    restart: always
    environment:
      QUARKUS_HTTP_PORT: 8081
      QUARKUS_HTTP_CORS_ORIGINS: '*'
      QUARKUS_PROFILE: "prod"
      APICURIO_STORAGE_KIND: kafkasql
      APICURIO_KAFKASQL_BOOTSTRAP_SERVERS: 'broker:9093'
      APICURIO_KAFKASQL_TOPIC: '_apicurio-kafkasql-store'
    healthcheck:
      # If the api endpoint is available, the service is considered healthy
      test: curl http://localhost:8081/apis
      interval: 15s
      timeout: 10s
      retries: 10
      start_period: 10s

  kafka-ui:
    image: quay.io/cloudhut/kowl:master
    platform: linux/amd64
    container_name: kowl
    restart: always
    ports:
      - 8080:8080
    volumes:
      - ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml:ro
    environment:
      CONFIG_FILEPATH: "/config/kowl-ui-config.yaml"
    depends_on:
      broker:
        condition: service_healthy
    networks:
      - ksml

  # This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
  kafka-setup:
    image: bitnami/kafka:3.8.0
    hostname: kafka-setup
    networks:
      - ksml
    depends_on:
      broker:
        condition: service_healthy
    restart: on-failure
    command: "bash -c 'echo Creating topics for data formats tutorial... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_avro && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json_raw && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json_processed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_transformed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_converted_formats && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_xml && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_xml_processed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_csv && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_csv_processed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_binary && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_binary_processed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_soap_requests && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_soap_responses && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_config && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_readings && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic combined_sensor_data'"
  • Create kowl-ui-config.yaml for Kafka UI:
Kafka UI Configuration (click to expand)
server:
  listenPort: 8080
  listenAddress: 0.0.0.0

kafka:
  brokers:
    - broker:9093
  schemaRegistry:
    enabled: true
    urls:
      - http://schema-registry:8081/apis/ccompat/v7
  • Create examples/ksml-runner.yaml with Avro configuration:
KSML Runner Configuration (click to expand)
ksml:
  definitions:
    producer: producer.yaml
    processor: processor.yaml
  schemaRegistries:
    my_schema_registry:
      config:
        schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
  notations:
    avro:  
      type: confluent_avro         # For Avro there are two implementations: apicurio_avro and confluent_avro
      schemaRegistry: my_schema_registry
      ## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
      config:
        normalize.schemas: true
        auto.register.schemas: true
kafka:
  bootstrap.servers: broker:9093
  application.id: io.ksml.example.producer
  security.protocol: PLAINTEXT
  acks: all
  • For each example, create producer.yaml and processor.yaml files and reference them from ksml-runner.yaml
  • Restart KSML: docker compose down && docker compose up -d && docker compose logs ksml -f (faster than docker compose restart ksml)
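With these files in place, the project directory should look roughly like this (producer.yaml and processor.yaml vary per example, and the schema files are only needed by the examples that use them):

.
├── docker-compose.yml
├── kowl-ui-config.yaml
└── examples/
    ├── ksml-runner.yaml
    ├── producer.yaml
    ├── processor.yaml
    ├── SensorData.avsc      # Avro examples
    ├── SensorData.csv       # CSV examples
    └── SensorData.xsd       # XML examples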

Working with Avro Data

Avro provides schema-based binary serialization with validation, evolution support, and compact encoding.

This producer generates JSON data that KSML automatically converts to Avro format using the schema registry:

Producer definition for Avro messages (click to expand)
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate random sensor measurement data
      value = {
        "name": key,
        "timestamp": round(time.time()*1000),     # long timestamp (not string)
        "value": str(random.randrange(0, 100)),
        "type": random.choice(["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]),  # Valid enum values
        "unit": random.choice(["C", "F", "%", "Pa", "m2", "m", "boolean"]),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]) if random.random() > 0.3 else None,
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"]) if random.random() > 0.3 else None,
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]) if random.random() > 0.3 else None
      }
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Indicate the type of key and value

producers:
  # Produce an Avro SensorData message every 3 seconds
  sensordata_avro_producer:
    generator: generate_sensordata_message
    interval: 3s
    to:
      topic: sensor_data_avro
      keyType: string
      valueType: avro:SensorData

Create examples/SensorData.avsc schema file (JSON format, auto-loaded from working directory):

Avro Schema for examples below (click to expand)
{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": [
          "AREA",
          "HUMIDITY",
          "LENGTH",
          "STATE",
          "TEMPERATURE"
        ]
      }
    },
    {
      "doc": "The unit of the sensor",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

This processor converts Avro messages to JSON using the convertValue operation:

Avro to JSON conversion processor (click to expand)
streams:
  avro_input:
    topic: sensor_data_avro
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest

  json_output:
    topic: sensor_data_json
    keyType: string
    valueType: json

pipelines:
  avro_to_json_pipeline:
    from: avro_input
    via:
      # Log the incoming Avro data
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Original Avro: sensor={}, type={}, value={}{}, timestamp={}, owner={}",
                      value.get("name"), value.get("type"), value.get("value"), value.get("unit"), 
                      value.get("timestamp"), value.get("owner"))
            else:
              log.info("Received null message with key={}", key)

      # Explicitly convert from Avro to JSON
      - type: convertValue
        into: json

      # Log the converted JSON data
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Converted to JSON: sensor={}, type={}, city={}, color={}", 
                      value.get("name"), value.get("type"), value.get("city"), value.get("color"))
            else:
              log.info("JSON conversion result: null")

    to: json_output
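To verify the conversion, you can optionally read the JSON output topic with kcat (assuming kcat is installed locally and the compose stack above is running):

kcat -b localhost:9092 -t sensor_data_json -C -o -1 -c 1 -f '%k: %s\n'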

This processor transforms Avro data (uppercases sensor names) while maintaining the Avro format:

Avro transformation processor (click to expand)
streams:
  avro_input:
    topic: sensor_data_avro
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest

  transformed_output:
    topic: sensor_data_transformed
    keyType: string
    valueType: avro:SensorData

functions:
  uppercase_sensor_name:
    type: valueTransformer
    code: |
      # Simple transformation: uppercase the sensor name
      result = dict(value) if value else None
      if result and result.get("name"):
        result["name"] = result["name"].upper()
    expression: result
    resultType: avro:SensorData

pipelines:
  transformation_pipeline:
    from: avro_input
    via:
      # Step 1: Apply transformation
      - type: transformValue
        mapper: uppercase_sensor_name

      # Step 2: Log the transformation
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Transformed sensor: name={}, type={}, timestamp={}, value={}{}", 
                      value.get("name"), value.get("type"), value.get("timestamp"), 
                      value.get("value"), value.get("unit"))
            else:
              log.info("Transformed sensor: null")

    to: transformed_output 

Working with JsonSchema Data

JsonSchema provides structured JSON data validation with schema registry support and strict type enforcement, enabling schema evolution and compatibility checks.

Setup Requirements for JsonSchema

JsonSchema requires a slightly different setup. The configuration below supports both Avro and JsonSchema side by side, with manual schema registration. Supporting Avro is not a prerequisite for JsonSchema; it is included here only to make the example more complete. Use this complete Docker Compose configuration:

Complete Docker Compose setup with JsonSchema support (click to expand)
networks:
  ksml:
    name: ksml_example
    driver: bridge

services:
  broker:
    image: bitnami/kafka:3.8.0
    hostname: broker
    ports:
      - "9092:9092"
    networks:
      - ksml
    restart: always
    environment:
      KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
      KAFKA_CFG_BROKER_ID: 0
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

      KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
      KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CFG_LOG_CLEANUP_POLICY: delete
      KAFKA_CFG_LOG_RETENTION_MINUTES: 10
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
      KAFKA_CFG_NUM_PARTITIONS: 1
    healthcheck:
      # If the kafka-topics command can list topics, the broker is healthy
      test: kafka-topics.sh --bootstrap-server broker:9093 --list
      interval: 5s
      timeout: 10s
      retries: 10
      start_period: 5s

  ksml:
    image: registry.axual.io/opensource/images/axual/ksml:snapshot
    networks:
      - ksml
    container_name: ksml
    working_dir: /ksml
    volumes:
      - ./examples:/ksml
    depends_on:
      broker:
        condition: service_healthy
      kafka-setup:
        condition: service_completed_successfully
      schema_registry:
        condition: service_healthy
      schema-registration:
        condition: service_completed_successfully


  schema_registry:
    image: apicurio/apicurio-registry:3.0.2
    hostname: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    networks:
      - ksml
    restart: always
    environment:
      QUARKUS_HTTP_PORT: 8081
      QUARKUS_HTTP_CORS_ORIGINS: '*'
      QUARKUS_PROFILE: "prod"
      APICURIO_STORAGE_KIND: kafkasql
      APICURIO_KAFKASQL_BOOTSTRAP_SERVERS: 'broker:9093'
      APICURIO_KAFKASQL_TOPIC: '_apicurio-kafkasql-store'
    healthcheck:
      # If the api endpoint is available, the service is considered healthy
      test: curl http://localhost:8081/apis
      interval: 15s
      timeout: 10s
      retries: 10
      start_period: 10s

  kafka-ui:
    image: quay.io/cloudhut/kowl:master
    platform: linux/amd64
    container_name: kowl
    restart: always
    ports:
      - 8080:8080
    volumes:
      - ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml:ro
    environment:
      CONFIG_FILEPATH: "/config/kowl-ui-config.yaml"
    depends_on:
      broker:
        condition: service_healthy
    networks:
      - ksml

  # This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
  kafka-setup:
    image: bitnami/kafka:3.8.0
    hostname: kafka-setup
    networks:
      - ksml
    depends_on:
      broker:
        condition: service_healthy
    restart: on-failure
    command: "bash -c 'echo Creating topics for JsonSchema tutorial... && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_avro && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_transformed && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_jsonschema && \
                       kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_jsonschema_processed'"

  # Service to register JsonSchema schemas in Apicurio
  schema-registration:
    image: alpine:latest
    hostname: schema-registration
    networks:
      - ksml
    depends_on:
      schema_registry:
        condition: service_healthy
    volumes:
      - ./examples:/schemas:ro
    command:
      - /bin/sh
      - -c
      - |
        apk add --no-cache curl jq
        echo "Waiting for Apicurio Schema Registry to be ready..."
        until curl -s http://schema-registry:8081/apis > /dev/null 2>&1; do
          echo "Schema Registry not ready yet, waiting..."
          sleep 5
        done
        echo "Schema Registry is ready. Registering schemas..."
        SCHEMA_CONTENT=$$(cat /schemas/SensorData.json)
        ESCAPED_SCHEMA=$$(echo "$$SCHEMA_CONTENT" | jq -Rs .)
        PAYLOAD=$$(echo "{\"schema\": $$ESCAPED_SCHEMA, \"schemaType\": \"JSON\"}")
        echo "Registering SensorData schema..."
        curl -X POST -H "Content-Type: application/json" -d "$$PAYLOAD" \
          "http://schema-registry:8081/apis/ccompat/v7/subjects/sensor_data_jsonschema-value/versions?normalize=true"
        curl -X POST -H "Content-Type: application/json" -d "$$PAYLOAD" \
          "http://schema-registry:8081/apis/ccompat/v7/subjects/sensor_data_jsonschema_processed-value/versions?normalize=true"
        echo "Schema registration completed!"

Key differences from basic Avro setup:

  • Includes automatic JsonSchema schema registration service (schema-registration)
  • Creates topics for both Avro and JsonSchema examples
  • Uses Apicurio Schema Registry with both Confluent compatibility API and native Apicurio API endpoints

Create the required Kafka UI configuration file for schema registry integration:

Kafka UI Configuration (kowl-ui-config.yaml) (click to expand)
server:
  listenPort: 8080
  listenAddress: 0.0.0.0

kafka:
  brokers:
    - broker:9093
  schemaRegistry:
    enabled: true
    urls:
      - http://schema-registry:8081/apis/ccompat/v7

Note: This configuration file is essential for the Kafka UI (Kowl) to connect to both Kafka brokers and the schema registry for viewing schemas and deserializing messages.

Configure KSML runner to work with both Avro and JsonSchema registries:

Complete KSML Runner configuration for JsonSchema (click to expand)
ksml:
  definitions:
    jsonschema-producer: jsonschema-producer.yaml
    jsonschema-processor: jsonschema-processor.yaml
  schemaRegistries:
    my_confluent_registry:
      config:
        schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
    my_apicurio_registry:
      config:
        apicurio.registry.url: http://schema-registry:8081/apis/registry/v2
  notations:
    avro:
      type: confluent_avro         # For Avro there are two implementations: apicurio_avro and confluent_avro
      schemaRegistry: my_confluent_registry
      config:
        normalize.schemas: true
        auto.register.schemas: true
    jsonschema:
      type: apicurio_jsonschema     # For JSON Schema, only apicurio_jsonschema is valid
      schemaRegistry: my_apicurio_registry
      config: {}
kafka:
  bootstrap.servers: broker:9093
  application.id: io.ksml.example.producer
  security.protocol: PLAINTEXT
  acks: all

Important configuration details:

  • Defines two schema registries: my_confluent_registry (for the Confluent Avro notation) and my_apicurio_registry (for JsonSchema)
  • Shows how to configure both confluent_avro and apicurio_jsonschema notations in the same application
  • JsonSchema schemas must be manually registered with Apicurio (auto-registration not supported by Apicurio)

JsonSchema Examples

This producer generates JSON data that KSML serializes and validates as JsonSchema using the schema registry:

Producer definition for JsonSchema messages (click to expand)
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate random sensor measurement data
      value = {
        "name": key,
        "timestamp": str(round(time.time()*1000)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "value": str(random.randrange(0, 100)),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
      }

      log.info("Generating sensor data: {}", key)
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Indicate the type of key and value

producers:
  # Produce a JsonSchema SensorData message every 3 seconds
  sensordata_jsonschema_producer:
    generator: generate_sensordata_message
    interval: 3s
    to:
      topic: sensor_data_jsonschema
      keyType: string
      valueType: jsonschema:SensorData

Create examples/SensorData.json schema file (JSON Schema format, manually registered via Docker service):

JsonSchema Schema for examples below (click to expand)
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://ksml.io/sensordata.json",
  "title": "SensorData",
  "description": "Emulated sensor data with a few additional attributes",
  "type": "object",
  "properties": {
    "name": {
      "description": "The name of the sensor",
      "type": "string"
    },
    "timestamp": {
      "description": "The timestamp of the sensor reading",
      "type": "number"
    },
    "value": {
      "description": "The value of the sensor, represented as string",
      "type": "string"
    },
    "type": {
      "description": "The type of the sensor",
      "type": "string",
      "enum": [
        "AREA",
        "HUMIDITY",
        "LENGTH",
        "STATE",
        "TEMPERATURE"
      ]
    },
    "unit": {
      "description": "The unit of the sensor",
      "type": "string"
    },
    "color": {
      "description": "The color of the sensor",
      "type": "string",
      "default": null
    },
    "city": {
      "description": "The city of the sensor",
      "type": "string",
      "default": null
    },
    "owner": {
      "description": "The owner of the sensor",
      "type": "string",
      "default": null
    }
  }
}

This processor transforms JsonSchema data (adds a processing timestamp and an uppercased sensor ID) and then converts it to JSON format:

JsonSchema transformation processor (click to expand)
streams:
  jsonschema_input:
    topic: sensor_data_jsonschema
    keyType: string
    valueType: jsonschema:SensorData
    offsetResetPolicy: earliest

  jsonschema_processed:
    topic: sensor_data_jsonschema_processed
    keyType: string
    valueType: json

functions:
  enrich_sensor_data:
    type: valueTransformer
    code: |
      # Simple transformation: add processing timestamp
      import time
      result = dict(value) if value else {}
      result["processed_at"] = str(int(time.time() * 1000))
      # Uppercase the sensor name
      if result.get("name"):
        result["sensor_id"] = result["name"].upper()
    expression: result
    resultType: json

pipelines:
  # Main processing pipeline
  jsonschema_processing:
    from: jsonschema_input
    via:      
      # Apply transformation to enrich data
      - type: transformValue
        mapper: enrich_sensor_data

      # Log the processed data
      - type: peek
        forEach:
          code: |
            log.info("Processed JsonSchema sensor: name={}, sensor_id={}, processed_at={}, full_value={}", 
                     value.get("name") if value else "null",
                     value.get("sensor_id") if value else "null",
                     value.get("processed_at") if value else "null",
                     str(value))

    to: jsonschema_processed

Working with JSON Data

JSON provides flexible, human-readable structured data without schema validation requirements.

This producer generates JSON sensor data directly (no format conversion needed):

Producer definition for JSON messages (click to expand)
functions:
  generate_json_sensordata:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate random sensor measurement data as JSON
      value = {
        "name": key,
        "timestamp": str(round(time.time()*1000)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "value": str(random.randrange(0, 100)),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
      }
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Indicate the type of key and value

producers:
  # Produce JSON sensor data messages every 2 seconds
  json_sensor_producer:
    generator: generate_json_sensordata
    interval: 2s
    to:
      topic: sensor_data_json_raw
      keyType: string
      valueType: json

This processor demonstrates key-value transformation using keyValueTransformer to modify both message keys and values:

Processor definition for JSON messages (click to expand)
streams:
  json_input:
    topic: sensor_data_json_raw
    keyType: string
    valueType: json
    offsetResetPolicy: latest

  json_output:
    topic: sensor_data_json_processed
    keyType: string
    valueType: json

functions:
  add_processing_info:
    type: keyValueTransformer
    code: |
      # Simple transformation: add processing info
      import time
      new_value = dict(value) if value else {}
      new_value["processed"] = True
      new_value["processed_at"] = str(int(time.time() * 1000))
      new_key = f"processed_{key}"
    expression: (new_key, new_value)
    resultType: (string, json)

pipelines:
  json_processing_pipeline:
    from: json_input
    via:
      # Transform both key and value
      - type: transformKeyValue
        mapper: add_processing_info

      # Log the transformed data
      - type: peek
        forEach:
          code: |
            log.info("Transformed: key={}, sensor={}, processed_at={}",
                    key, value.get("name"), value.get("processed_at"))

    to: json_output

Working with CSV Data

CSV handles tabular data with schema-based column definitions and structured object access.

Create examples/SensorData.csv schema file (defines column order):

CSV schema (click to expand)
name,timestamp,value,type,unit,color,city,owner
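
With this schema, each record on the CSV topic is serialized as a single comma-separated line in the column order above. An illustrative value (hypothetical data, in the shape the generator below produces) would look like:

sensor3,1754376106863,42,TEMPERATURE,C,blue,Amsterdam,Alice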

This producer generates JSON data that KSML converts to CSV format using the schema:

Producer definition for CSV messages (click to expand)
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate data matching SensorData.csv schema
      # Schema: name,timestamp,value,type,unit,color,city,owner
      value = {
        "name": key,
        "timestamp": str(round(time.time()*1000)),
        "value": str(random.randrange(0, 100)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
      }
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Generate as JSON, KSML will convert to CSV

producers:
  # Produce CSV sensor data messages every 3 seconds
  sensordata_csv_producer:
    generator: generate_sensordata_message
    interval: 3s
    to:
      topic: ksml_sensordata_csv
      keyType: string
      valueType: csv:SensorData                  # KSML will convert JSON to CSV format

This processor demonstrates CSV data manipulation (uppercases city names) while maintaining CSV format:

Processor definition for CSV messages (click to expand)
streams:
  csv_input:
    topic: ksml_sensordata_csv
    keyType: string
    valueType: csv:SensorData
    offsetResetPolicy: latest

  csv_output:
    topic: ksml_sensordata_csv_processed
    keyType: string
    valueType: csv:SensorData

functions:
  uppercase_city:
    type: valueTransformer
    code: |
      # When using csv:SensorData, the data comes as a structured object (dict)
      if value and isinstance(value, dict):
        # Create a copy and uppercase the city
        enriched = dict(value)
        if "city" in enriched:
          enriched["city"] = enriched["city"].upper()
        result = enriched
      else:
        result = value
    expression: result
    resultType: csv:SensorData

pipelines:
  process_csv:
    from: csv_input
    via:
      # Log the original CSV data
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Original: sensor={}, city={}", value.get("name"), value.get("city"))

      # Transform the CSV data - uppercase city
      - type: transformValue
        mapper: uppercase_city

      # Log the transformed CSV data
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Transformed: sensor={}, city={}", value.get("name"), value.get("city"))

    to: csv_output

Working with XML Data

XML (eXtensible Markup Language) is a structured format for representing hierarchical data with custom tags and attributes.

  • XML data is represented as nested elements with opening and closing tags
  • Elements can contain text content, attributes, and child elements
  • XSD (XML Schema Definition) defines the structure, data types, and constraints for XML documents

Requirements for running KSML XML definitions

To run the KSML XML processing definitions below, follow these steps:

  • Save this examples/SensorData.xsd as XML schema:
XSD schema (click to expand)
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="SensorData">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="name" type="xs:string"/>
                <xs:element name="timestamp" type="xs:long"/>
                <xs:element name="value" type="xs:string"/>
                <xs:element name="type" type="xs:string"/>
                <xs:element name="unit" type="xs:string"/>
                <xs:element name="color" type="xs:string" minOccurs="0" maxOccurs="1"/>
                <xs:element name="city" type="xs:string" minOccurs="0" maxOccurs="1"/>
                <xs:element name="owner" type="xs:string" minOccurs="0" maxOccurs="1"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>
  • Use this examples/producer.yaml that produces XML messages that our processing definition below can work with:
Producer definition for XML messages (click to expand)
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate data matching SensorData.xsd schema
      value = {
        "name": key,
        "timestamp": round(time.time()*1000),     # timestamp as long, not string
        "value": str(random.randrange(0, 100)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"])
      }
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Generate as JSON, KSML will convert to XML

producers:
  # Produce XML sensor data messages every 3 seconds
  sensordata_xml_producer:
    generator: generate_sensordata_message
    interval: 3s
    to:
      topic: ksml_sensordata_xml
      keyType: string
      valueType: xml:SensorData                   # KSML will convert JSON to XML format

This processor demonstrates XML data manipulation (uppercases city names) while maintaining XML format:

Processor definition for XML messages (click to expand)
streams:
  xml_input:
    topic: ksml_sensordata_xml
    keyType: string
    valueType: xml:SensorData
    offsetResetPolicy: latest

  xml_output:
    topic: ksml_sensordata_xml_processed
    keyType: string
    valueType: xml:SensorData

functions:
  uppercase_city:
    type: valueTransformer
    code: |
      # When using xml:SensorData, the data comes as a structured object (dict)
      if value and isinstance(value, dict):
        # Create a copy and uppercase the city
        enriched = dict(value)
        if "city" in enriched:
          enriched["city"] = enriched["city"].upper()
        result = enriched
      else:
        result = value
    expression: result
    resultType: xml:SensorData

pipelines:
  process_xml:
    from: xml_input
    via:
      # Log the original XML data
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Original: sensor={}, city={}", value.get("name"), value.get("city"))

      # Transform the XML data - uppercase city
      - type: transformValue
        mapper: uppercase_city

      # Log the transformed XML data
      - type: peek
        forEach:
          code: |
            if value:
              log.info("Transformed: sensor={}, city={}", value.get("name"), value.get("city"))

    to: xml_output
  • Note that Kowl cannot deserialize XML messages and will display their values as blank.
  • To read the XML messages, use kcat:
kcat -b localhost:9092 -t ksml_sensordata_xml -C -o -1 -c 1 -f '%s\n' | xmllint --format -
XML message example (click to expand)
<?xml version="1.1" encoding="UTF-8"?>
<SensorData>
  <city>Rotterdam</city>
  <color>black</color>
  <name>sensor6</name>
  <owner>Alice</owner>
  <timestamp>1754376106863</timestamp>
  <type>TEMPERATURE</type>
  <unit>%</unit>
  <value>12</value>
</SensorData>

Working with Binary Data

Binary data represents raw bytes as sequences of numeric values ranging from 0 to 255, ideal for handling non-text content like images, files, or custom protocols.

  • Binary data is represented as arrays of integers where each value corresponds to a single byte
  • Each byte can store values from 0-255, allowing for compact encoding of various data types
  • Binary processing enables direct byte manipulation, bit-level operations, and efficient handling of structured binary formats

This producer creates simple binary messages as byte arrays (7-byte messages with counter, random bytes, and ASCII "KSML"):

Producer definition for Binary messages (click to expand)
functions:
  generate_binary_message:
    type: generator
    globalCode: |
      import random
      counter = 0
    code: |
      global counter

      key = "msg" + str(counter)
      counter = (counter + 1) % 100

      # Create simple binary message as list of bytes
      value = [
        counter % 256,                    # Counter byte
        random.randrange(0, 256),         # Random byte
        ord('K'),                         # ASCII 'K'
        ord('S'),                         # ASCII 'S'
        ord('M'),                         # ASCII 'M'
        ord('L'),                         # ASCII 'L'
        random.randrange(0, 256)          # Another random byte
      ]

      log.info("Generated binary message: key={}, bytes={}", key, value)
    expression: (key, value)
    resultType: (string, bytes)

producers:
  binary_producer:
    generator: generate_binary_message
    interval: 3s
    to:
      topic: ksml_sensordata_binary
      keyType: string
      valueType: bytes

This processor demonstrates binary data manipulation (increments first byte) while maintaining binary format:

Processor definition for Binary messages (click to expand)
streams:
  binary_input:
    topic: ksml_sensordata_binary
    keyType: string
    valueType: bytes
    offsetResetPolicy: latest

  binary_output:
    topic: ksml_sensordata_binary_processed
    keyType: string
    valueType: bytes

functions:
  increment_first_byte:
    type: valueTransformer
    code: |
      # Simple binary manipulation: increment the first data byte
      if isinstance(value, list) and len(value) > 0:
        modified = list(value)
        modified[0] = (modified[0] + 1) % 256  # Increment and wrap at 256
        result = modified
      else:
        result = value
    expression: result
    resultType: bytes

pipelines:
  process_binary:
    from: binary_input
    via:
      # Log input binary
      - type: peek
        forEach:
          code: |
            log.info("Binary input: key={}, bytes={}", key, value)

      # Modify binary data
      - type: transformValue
        mapper: increment_first_byte

      # Log output binary
      - type: peek
        forEach:
          code: |
            log.info("Binary output: key={}, bytes={}", key, value)

    to: binary_output
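
Raw binary values are not easy to read in the Kafka UI. One optional way to inspect the processed messages is to dump the latest record as hex with kcat (assuming kcat and xxd are available locally):

kcat -b localhost:9092 -t ksml_sensordata_binary_processed -C -o -1 -c 1 -f '%s' | xxd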

Working with SOAP Data

SOAP provides structured web service messaging with envelope/body format and no WSDL requirements.

This producer creates SOAP request messages with envelope/body structure (no WSDL files required):

Producer definition for SOAP messages (click to expand)
functions:
  generate_soap_message:
    type: generator
    globalCode: |
      counter = 0
    code: |
      global counter

      key = "msg" + str(counter)
      counter = (counter + 1) % 100

      # Create simple SOAP message
      value = {
        "envelope": {
          "body": {
            "elements": [
              {
                "name": {
                  "localPart": "SensorRequest",
                  "namespaceURI": "http://example.com/ksml"
                },
                "value": {
                  "id": str(counter),
                  "type": "temperature"
                }
              }
            ]
          }
        }
      }

      log.info("Generated SOAP message: {}", key)
    expression: (key, value)
    resultType: (string, soap)

producers:
  soap_producer:
    generator: generate_soap_message
    interval: 3s
    to:
      topic: ksml_soap_requests
      keyType: string
      valueType: soap

This processor transforms SOAP requests into SOAP responses (extracts request data and creates response with sensor values):

Processor definition for SOAP messages (click to expand)
streams:
  soap_input:
    topic: ksml_soap_requests
    keyType: string
    valueType: soap
    offsetResetPolicy: latest

  soap_output:
    topic: ksml_soap_responses
    keyType: string
    valueType: soap

functions:
  create_response:
    type: valueTransformer
    code: |
      # Extract request data
      request_elements = value.get("envelope", {}).get("body", {}).get("elements", [])
      request_data = request_elements[0].get("value", {}) if request_elements else {}

      # Create SOAP response
      result = {
        "envelope": {
          "body": {
            "elements": [
              {
                "name": {
                  "localPart": "SensorResponse",
                  "namespaceURI": "http://example.com/ksml"
                },
                "value": {
                  "id": request_data.get("id", "unknown"),
                  "temperature": "25"
                }
              }
            ]
          }
        }
      }
    expression: result
    resultType: soap

pipelines:
  process_soap:
    from: soap_input
    via:
      # Log input
      - type: peek
        forEach:
          code: |
            log.info("SOAP request: key={}", key)

      # Transform to response
      - type: transformValue
        mapper: create_response

      # Log output
      - type: peek
        forEach:
          code: |
            log.info("SOAP response: key={}", key)

    to: soap_output
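
Since SOAP messages are XML-based on the wire, the same kcat and xmllint approach shown for XML above should also work for inspecting the generated responses (an optional check, not part of the example itself):

kcat -b localhost:9092 -t ksml_soap_responses -C -o -1 -c 1 -f '%s\n' | xmllint --format -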

Converting Between Data Formats

Use the convertValue operation to transform data between formats within a single pipeline.

This producer generates Avro messages for format conversion demonstrations:

Producer definition (click to expand)
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate data matching SensorData schema
      value = {
        "name": key,
        "timestamp": str(round(time.time()*1000)),
        "value": str(random.randrange(0, 100)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
      }

      # Occasionally send null messages for testing
      if random.randrange(20) == 0:
        value = None
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Generate as JSON, KSML will convert to Avro

producers:
  # Produce Avro sensor data messages every 3 seconds
  sensordata_avro_producer:
    generator: generate_sensordata_message
    interval: 3s
    to:
      topic: sensor_data_avro
      keyType: string
      valueType: avro:SensorData

This processor demonstrates multiple format conversions (Avro → JSON → String → JSON) using convertValue:

Processing definition for converting between multiple formats (click to expand)
streams:
  avro_input:
    topic: sensor_data_avro
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest

  json_output:
    topic: sensor_data_converted_formats
    keyType: string
    valueType: json

pipelines:
  format_conversion:
    from: avro_input
    via:
      # Log original Avro data
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Original Avro: sensor={}, type={}, value={}{}",
                      value.get("name"), value.get("type"), value.get("value"), value.get("unit"))
            else:
              log.info("Received null Avro message with key={}", key)

      # Convert from Avro to JSON
      - type: convertValue
        into: json
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Converted to JSON: sensor={}, city={}", value.get("name"), value.get("city"))
            else:
              log.info("JSON conversion result: null")

      # Convert from JSON to string to see serialized representation
      - type: convertValue
        into: string
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("As string (first 100 chars): {}", str(value)[:100])
            else:
              log.info("String conversion result: null")

      # Convert back to JSON for output
      - type: convertValue
        into: json
      - type: peek
        forEach:
          code: |
            if value is not None:
              log.info("Final JSON output: sensor={}, owner={}", value.get("name"), value.get("owner"))
            else:
              log.info("Final conversion result: null")

    to: json_output

Format Conversion and Multiple Formats

For comprehensive information on format conversion requirements, chaining conversions, and working with multiple formats in a single pipeline, see the Data Types and Formats Reference - Type Conversion section.

Conclusion

You've learned to work with KSML's data formats through practical examples: Avro, JsonSchema, JSON, CSV, XML, Binary, and SOAP. Key concepts covered include format specification, schema usage, conversion operations, and multi-format pipelines.

For complete syntax reference, type definitions, and advanced format features, refer to the Data Types and Formats Reference.

Next Steps