Working with Different Data Formats
Learn how to process, convert, and validate data using KSML's supported formats through practical, hands-on examples. This tutorial provides complete working examples for each data format.
For comprehensive syntax reference and format details, see the Data Types and Formats Reference.
Prerequisites
- Basic understanding of Kafka concepts (topics, messages)
- Familiarity with basic KSML concepts (streams, functions, pipelines)
Supported Data Formats
- String: Plain text
- JSON: Structured data without schema validation
- JsonSchema: JSON with schema validation through schema registry integration
- Avro: Binary format with schema registry integration
- CSV: Tabular data with optional schema
- XML: Hierarchical data with XSD schema support
- Binary: Raw bytes for custom protocols
- SOAP: Web service messaging format
Specifying Data Formats
When defining streams in KSML, you specify the data format using the keyType and valueType properties:
Specifying data formats (click to expand)
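A minimal sketch of two stream definitions (stream and topic names here are illustrative), showing how keyType and valueType select the format for each stream:

```yaml
streams:
  text_sensor_stream:                # hypothetical stream name
    topic: sensor_data_text          # hypothetical topic
    keyType: string                  # plain-text key
    valueType: json                  # schema-less JSON value
  avro_sensor_stream:                # hypothetical stream name
    topic: sensor_data_avro
    keyType: string
    valueType: avro:SensorData       # schema-based format: notation plus schema name
```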
Schema-based formats (Avro, XML, CSV) require a schema name: format:SchemaName (e.g., avro:SensorData).
Setup Requirements
- Create docker-compose.yml with schema registry and pre-created topics
- Note: This tutorial requires a different docker-compose.yml than other tutorials because the Avro format needs a schema registry to store and manage schema definitions
Docker Compose Configuration (click to expand)
networks:
ksml:
name: ksml_example
driver: bridge
services:
broker:
image: bitnami/kafka:3.8.0
hostname: broker
ports:
- "9092:9092"
networks:
- ksml
restart: always
environment:
KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
KAFKA_CFG_BROKER_ID: 0
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
KAFKA_CFG_LOG_CLEANUP_POLICY: delete
KAFKA_CFG_LOG_RETENTION_MINUTES: 10
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
KAFKA_CFG_NUM_PARTITIONS: 1
healthcheck:
# If the kafka topics can list data, the broker is healthy
test: kafka-topics.sh --bootstrap-server broker:9093 --list
interval: 5s
timeout: 10s
retries: 10
start_period: 5s
ksml:
image: registry.axual.io/opensource/images/axual/ksml:snapshot
networks:
- ksml
container_name: ksml
working_dir: /ksml
volumes:
- ./examples:/ksml
depends_on:
broker:
condition: service_healthy
kafka-setup:
condition: service_completed_successfully
schema_registry:
condition: service_healthy
schema_registry:
image: apicurio/apicurio-registry:3.0.2
hostname: schema-registry
depends_on:
broker:
condition: service_healthy
ports:
- "8081:8081"
networks:
- ksml
restart: always
environment:
QUARKUS_HTTP_PORT: 8081
QUARKUS_HTTP_CORS_ORIGINS: '*'
QUARKUS_PROFILE: "prod"
APICURIO_STORAGE_KIND: kafkasql
APICURIO_KAFKASQL_BOOTSTRAP_SERVERS: 'broker:9093'
APICURIO_KAFKASQL_TOPIC: '_apciurio-kafkasql-store'
healthcheck:
# If the api endpoint is available, the service is considered healthy
test: curl http://localhost:8081/apis
interval: 15s
timeout: 10s
retries: 10
start_period: 10s
kafka-ui:
image: quay.io/cloudhut/kowl:master
platform: linux/amd64
container_name: kowl
restart: always
ports:
- 8080:8080
volumes:
- ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml:ro
environment:
CONFIG_FILEPATH: "/config/kowl-ui-config.yaml"
depends_on:
broker:
condition: service_healthy
networks:
- ksml
# This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
kafka-setup:
image: bitnami/kafka:3.8.0
hostname: kafka-setup
networks:
- ksml
depends_on:
broker:
condition: service_healthy
restart: on-failure
command: "bash -c 'echo Creating topics for data formats tutorial... && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_avro && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json_raw && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_json_processed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_transformed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_converted_formats && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_xml && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_xml_processed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_csv && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_csv_processed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_binary && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_sensordata_binary_processed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_soap_requests && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic ksml_soap_responses && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic device_config && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_readings && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic combined_sensor_data'"
- Create kowl-ui-config.yaml for Kafka UI:
Kafka UI Configuration (click to expand)
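A minimal sketch of this file, assuming the broker and Apicurio registry hostnames and ports from the Docker Compose configuration above; it points kowl at the broker and at the registry's Confluent-compatible endpoint so schemas can be resolved:

```yaml
kafka:
  brokers:
    - broker:9093                    # internal listener from the Docker Compose setup
  schemaRegistry:
    enabled: true
    urls:
      - http://schema-registry:8081/apis/ccompat/v7
```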
- Create examples/ksml-runner.yaml with Avro configuration:
KSML Runner Configuration (click to expand)
ksml:
definitions:
producer: producer.yaml
processor: processor.yaml
schemaRegistries:
my_schema_registry:
config:
schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
notations:
avro:
type: confluent_avro # For Avro there are two implementations: apicurio_avro and confluent_avro
schemaRegistry: my_schema_registry
## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
config:
normalize.schemas: true
auto.register.schemas: true
kafka:
bootstrap.servers: broker:9093
application.id: io.ksml.example.producer
security.protocol: PLAINTEXT
acks: all
- For each example, create producer.yaml and processor.yaml files and reference them from ksml-runner.yaml
- Restart KSML: docker compose down && docker compose up -d && docker compose logs ksml -f (which is faster than docker compose restart ksml)
Working with Avro Data
Avro provides schema-based binary serialization with validation, evolution support, and compact encoding.
This producer generates JSON data that KSML automatically converts to Avro format using the schema registry:
Producer definition for Avro messages (click to expand)
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate random sensor measurement data
value = {
"name": key,
"timestamp": round(time.time()*1000), # long timestamp (not string)
"value": str(random.randrange(0, 100)),
"type": random.choice(["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]), # Valid enum values
"unit": random.choice(["C", "F", "%", "Pa", "m2", "m", "boolean"]),
"color": random.choice(["black", "blue", "red", "yellow", "white"]) if random.random() > 0.3 else None,
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"]) if random.random() > 0.3 else None,
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]) if random.random() > 0.3 else None
}
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value
producers:
# Produce an Avro SensorData message every 3 seconds
sensordata_avro_producer:
generator: generate_sensordata_message
interval: 3s
to:
topic: sensor_data_avro
keyType: string
valueType: avro:SensorData
Create the examples/SensorData.avsc schema file (JSON format, auto-loaded from the working directory):
Avro Schema for examples below (click to expand)
{
"namespace": "io.axual.ksml.example",
"doc": "Emulated sensor data with a few additional attributes",
"name": "SensorData",
"type": "record",
"fields": [
{
"doc": "The name of the sensor",
"name": "name",
"type": "string"
},
{
"doc": "The timestamp of the sensor reading",
"name": "timestamp",
"type": "long"
},
{
"doc": "The value of the sensor, represented as string",
"name": "value",
"type": "string"
},
{
"doc": "The type of the sensor",
"name": "type",
"type": {
"name": "SensorType",
"type": "enum",
"doc": "The type of a sensor",
"symbols": [
"AREA",
"HUMIDITY",
"LENGTH",
"STATE",
"TEMPERATURE"
]
}
},
{
"doc": "The unit of the sensor",
"name": "unit",
"type": "string"
},
{
"doc": "The color of the sensor",
"name": "color",
"type": [
"null",
"string"
],
"default": null
},
{
"doc": "The city of the sensor",
"name": "city",
"type": [
"null",
"string"
],
"default": null
},
{
"doc": "The owner of the sensor",
"name": "owner",
"type": [
"null",
"string"
],
"default": null
}
]
}
This processor converts Avro messages to JSON using the convertValue operation:
Avro to JSON conversion processor (click to expand)
streams:
avro_input:
topic: sensor_data_avro
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
json_output:
topic: sensor_data_json
keyType: string
valueType: json
pipelines:
avro_to_json_pipeline:
from: avro_input
via:
# Log the incoming Avro data
- type: peek
forEach:
code: |
if value is not None:
log.info("Original Avro: sensor={}, type={}, value={}{}, timestamp={}, owner={}",
value.get("name"), value.get("type"), value.get("value"), value.get("unit"),
value.get("timestamp"), value.get("owner"))
else:
log.info("Received null message with key={}", key)
# Explicitly convert from Avro to JSON
- type: convertValue
into: json
# Log the converted JSON data
- type: peek
forEach:
code: |
if value is not None:
log.info("Converted to JSON: sensor={}, type={}, city={}, color={}",
value.get("name"), value.get("type"), value.get("city"), value.get("color"))
else:
log.info("JSON conversion result: null")
to: json_output
This processor transforms Avro data (uppercases sensor names) while maintaining the Avro format:
Avro transformation processor (click to expand)
streams:
avro_input:
topic: sensor_data_avro
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
transformed_output:
topic: sensor_data_transformed
keyType: string
valueType: avro:SensorData
functions:
uppercase_sensor_name:
type: valueTransformer
code: |
# Simple transformation: uppercase the sensor name
result = dict(value) if value else None
if result and result.get("name"):
result["name"] = result["name"].upper()
expression: result
resultType: avro:SensorData
pipelines:
transformation_pipeline:
from: avro_input
via:
# Step 1: Apply transformation
- type: transformValue
mapper: uppercase_sensor_name
# Step 2: Log the transformation
- type: peek
forEach:
code: |
if value is not None:
log.info("Transformed sensor: name={}, type={}, timestamp={}, value={}{}",
value.get("name"), value.get("type"), value.get("timestamp"),
value.get("value"), value.get("unit"))
else:
log.info("Transformed sensor: null")
to: transformed_output
Working with JsonSchema Data
JsonSchema provides structured JSON data validation with schema registry support and strict type enforcement, enabling schema evolution and compatibility checks.
Setup Requirements for JsonSchema
JsonSchema requires a specialized setup. Here we show how to support both Avro and JsonSchema together, with manual schema registration. Supporting Avro is not required for JsonSchema; it is included here only as an example. Use this complete Docker Compose configuration:
Complete Docker Compose setup with JsonSchema support (click to expand)
networks:
ksml:
name: ksml_example
driver: bridge
services:
broker:
image: bitnami/kafka:3.8.0
hostname: broker
ports:
- "9092:9092"
networks:
- ksml
restart: always
environment:
KAFKA_CFG_PROCESS_ROLES: 'controller,broker'
KAFKA_CFG_BROKER_ID: 0
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '0@broker:9090'
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CFG_ADVERTISED_LISTENERS: 'INNER://broker:9093,OUTER://localhost:9092'
KAFKA_CFG_LISTENERS: 'INNER://broker:9093,OUTER://broker:9092,CONTROLLER://broker:9090'
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'INNER:PLAINTEXT,OUTER:PLAINTEXT,CONTROLLER:PLAINTEXT'
KAFKA_CFG_LOG_CLEANUP_POLICY: delete
KAFKA_CFG_LOG_RETENTION_MINUTES: 10
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INNER
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_CFG_MIN_INSYNC_REPLICAS: 1
KAFKA_CFG_NUM_PARTITIONS: 1
healthcheck:
# If the kafka topics can list data, the broker is healthy
test: kafka-topics.sh --bootstrap-server broker:9093 --list
interval: 5s
timeout: 10s
retries: 10
start_period: 5s
ksml:
image: registry.axual.io/opensource/images/axual/ksml:snapshot
networks:
- ksml
container_name: ksml
working_dir: /ksml
volumes:
- ./examples:/ksml
depends_on:
broker:
condition: service_healthy
kafka-setup:
condition: service_completed_successfully
schema_registry:
condition: service_healthy
schema-registration:
condition: service_completed_successfully
schema_registry:
image: apicurio/apicurio-registry:3.0.2
hostname: schema-registry
depends_on:
broker:
condition: service_healthy
ports:
- "8081:8081"
networks:
- ksml
restart: always
environment:
QUARKUS_HTTP_PORT: 8081
QUARKUS_HTTP_CORS_ORIGINS: '*'
QUARKUS_PROFILE: "prod"
APICURIO_STORAGE_KIND: kafkasql
APICURIO_KAFKASQL_BOOTSTRAP_SERVERS: 'broker:9093'
APICURIO_KAFKASQL_TOPIC: '_apciurio-kafkasql-store'
healthcheck:
# If the api endpoint is available, the service is considered healthy
test: curl http://localhost:8081/apis
interval: 15s
timeout: 10s
retries: 10
start_period: 10s
kafka-ui:
image: quay.io/cloudhut/kowl:master
platform: linux/amd64
container_name: kowl
restart: always
ports:
- 8080:8080
volumes:
- ./kowl-ui-config.yaml:/config/kowl-ui-config.yaml:ro
environment:
CONFIG_FILEPATH: "/config/kowl-ui-config.yaml"
depends_on:
broker:
condition: service_healthy
networks:
- ksml
# This "container" is a workaround to pre-create topics, because setting KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE to true results in Kafka Streams errors
kafka-setup:
image: bitnami/kafka:3.8.0
hostname: kafka-setup
networks:
- ksml
depends_on:
broker:
condition: service_healthy
restart: on-failure
command: "bash -c 'echo Creating topics for JsonSchema tutorial... && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_avro && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_transformed && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_jsonschema && \
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic sensor_data_jsonschema_processed'"
# Service to register JsonSchema schemas in Apicurio
schema-registration:
image: alpine:latest
hostname: schema-registration
networks:
- ksml
depends_on:
schema_registry:
condition: service_healthy
volumes:
- ./examples:/schemas:ro
command:
- /bin/sh
- -c
- |
apk add --no-cache curl jq
echo "Waiting for Apicurio Schema Registry to be ready..."
until curl -s http://schema-registry:8081/apis > /dev/null 2>&1; do
echo "Schema Registry not ready yet, waiting..."
sleep 5
done
echo "Schema Registry is ready. Registering schemas..."
SCHEMA_CONTENT=$$(cat /schemas/SensorData.json)
ESCAPED_SCHEMA=$$(echo "$$SCHEMA_CONTENT" | jq -Rs .)
PAYLOAD=$$(echo "{\"schema\": $$ESCAPED_SCHEMA, \"schemaType\": \"JSON\"}")
echo "Registering SensorData schema..."
curl -X POST -H "Content-Type: application/json" -d "$$PAYLOAD" \
"http://schema-registry:8081/apis/ccompat/v7/subjects/sensor_data_jsonschema-value/versions?normalize=true"
curl -X POST -H "Content-Type: application/json" -d "$$PAYLOAD" \
"http://schema-registry:8081/apis/ccompat/v7/subjects/sensor_data_jsonschema_processed-value/versions?normalize=true"
echo "Schema registration completed!"
Key differences from the basic Avro setup:
- Includes an automatic JsonSchema schema registration service (schema-registration)
- Creates topics for both Avro and JsonSchema examples
- Uses Apicurio Schema Registry with both the Confluent compatibility API and the native Apicurio API endpoints
Create the required Kafka UI configuration file for schema registry integration:
Kafka UI Configuration (kowl-ui-config.yaml) (click to expand)
Note: This configuration file is essential for the Kafka UI (Kowl) to connect to both Kafka brokers and the schema registry for viewing schemas and deserializing messages.
Configure KSML runner to work with both Avro and JsonSchema registries:
Complete KSML Runner configuration for JsonSchema (click to expand)
ksml:
definitions:
jsonschema-producer: jsonschema-producer.yaml
jsonschema-processor: jsonschema-processor.yaml
schemaRegistries:
my_confluent_registry:
config:
schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
my_apicurio_registry:
config:
apicurio.registry.url: http://schema-registry:8081/apis/registry/v2
notations:
avro:
type: confluent_avro # For Avro there are two implementations: apicurio_avro and confluent_avro
schemaRegistry: my_confluent_registry
config:
normalize.schemas: true
auto.register.schemas: true
jsonschema:
type: apicurio_jsonschema # For JSON Schema, only apicurio_jsonschema is valid
schemaRegistry: my_apicurio_registry
config: {}
kafka:
bootstrap.servers: broker:9093
application.id: io.ksml.example.producer
security.protocol: PLAINTEXT
acks: all
Important configuration details:
- Defines two schema registries: my_confluent_registry (for the Confluent Avro notation) and my_apicurio_registry (for JsonSchema)
- Shows how to configure both confluent_avro and apicurio_jsonschema notations in the same application
- JsonSchema schemas must be manually registered with Apicurio (auto-registration is not supported by Apicurio)
JsonSchema Examples
This producer generates JSON data that KSML validates against JsonSchema format using the schema registry:
Producer definition for JsonSchema messages (click to expand)
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate random sensor measurement data
value = {
"name": key,
"timestamp": str(round(time.time()*1000)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"value": str(random.randrange(0, 100)),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
}
log.info("Generating sensor data: {}", key)
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value
producers:
# Produce a JsonSchema SensorData message every 3 seconds
sensordata_jsonschema_producer:
generator: generate_sensordata_message
interval: 3s
to:
topic: sensor_data_jsonschema
keyType: string
valueType: jsonschema:SensorData
Create the examples/SensorData.json schema file (JSON Schema format, manually registered via the Docker service):
JsonSchema Schema for examples below (click to expand)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://ksml.io/sensordata.json",
"title": "SensorData",
"description": "Emulated sensor data with a few additional attributes",
"type": "object",
"properties": {
"name": {
"description": "The name of the sensor",
"type": "string"
},
"timestamp": {
"description": "The timestamp of the sensor reading",
"type": "number"
},
"value": {
"description": "The value of the sensor, represented as string",
"type": "string"
},
"type": {
"description": "The type of the sensor",
"type": "string",
"enum": [
"AREA",
"HUMIDITY",
"LENGTH",
"STATE",
"TEMPERATURE"
]
},
"unit": {
"description": "The unit of the sensor",
"type": "string"
},
"color": {
"description": "The color of the sensor",
"type": "string",
"default": null
},
"city": {
"description": "The city of the sensor",
"type": "string",
"default": null
},
"owner": {
"description": "The owner of the sensor",
"type": "string",
"default": null
}
}
}
This processor transforms JsonSchema data (adding a processing timestamp and an uppercased sensor ID) and converts the result to JSON format:
JsonSchema transformation processor (click to expand)
streams:
jsonschema_input:
topic: sensor_data_jsonschema
keyType: string
valueType: jsonschema:SensorData
offsetResetPolicy: earliest
jsonschema_processed:
topic: sensor_data_jsonschema_processed
keyType: string
valueType: json
functions:
enrich_sensor_data:
type: valueTransformer
code: |
# Simple transformation: add processing timestamp
import time
result = dict(value) if value else {}
result["processed_at"] = str(int(time.time() * 1000))
# Uppercase the sensor name
if result.get("name"):
result["sensor_id"] = result["name"].upper()
expression: result
resultType: json
pipelines:
# Main processing pipeline
jsonschema_processing:
from: jsonschema_input
via:
# Apply transformation to enrich data
- type: transformValue
mapper: enrich_sensor_data
# Log the processed data
- type: peek
forEach:
code: |
log.info("Processed JsonSchema sensor: name={}, sensor_id={}, processed_at={}, full_value={}",
value.get("name") if value else "null",
value.get("sensor_id") if value else "null",
value.get("processed_at") if value else "null",
str(value))
to: jsonschema_processed
Working with JSON Data
JSON provides flexible, human-readable structured data without schema validation requirements.
This producer generates JSON sensor data directly (no format conversion needed):
Producer definition for JSON messages (click to expand)
functions:
generate_json_sensordata:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate random sensor measurement data as JSON
value = {
"name": key,
"timestamp": str(round(time.time()*1000)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"value": str(random.randrange(0, 100)),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
}
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value
producers:
# Produce JSON sensor data messages every 2 seconds
json_sensor_producer:
generator: generate_json_sensordata
interval: 2s
to:
topic: sensor_data_json_raw
keyType: string
valueType: json
This processor demonstrates key-value transformation using keyValueTransformer to modify both message keys and values:
Processor definition for JSON messages (click to expand)
streams:
json_input:
topic: sensor_data_json_raw
keyType: string
valueType: json
offsetResetPolicy: latest
json_output:
topic: sensor_data_json_processed
keyType: string
valueType: json
functions:
add_processing_info:
type: keyValueTransformer
code: |
# Simple transformation: add processing info
import time
new_value = dict(value) if value else {}
new_value["processed"] = True
new_value["processed_at"] = str(int(time.time() * 1000))
new_key = f"processed_{key}"
expression: (new_key, new_value)
resultType: (string, json)
pipelines:
json_processing_pipeline:
from: json_input
via:
# Transform both key and value
- type: transformKeyValue
mapper: add_processing_info
# Log the transformed data
- type: peek
forEach:
code: |
log.info("Transformed: key={}, sensor={}, processed_at={}",
key, value.get("name"), value.get("processed_at"))
to: json_output
Working with CSV Data
CSV handles tabular data with schema-based column definitions and structured object access.
Create the examples/SensorData.csv schema file (defines column order):
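A KSML CSV schema is expected to list the column names in order; based on the column order used by the producer below, the file contains a single header line:

```csv
name,timestamp,value,type,unit,color,city,owner
```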
This producer generates JSON data that KSML converts to CSV format using the schema:
Producer definition for CSV messages (click to expand)
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate data matching SensorData.csv schema
# Schema: name,timestamp,value,type,unit,color,city,owner
value = {
"name": key,
"timestamp": str(round(time.time()*1000)),
"value": str(random.randrange(0, 100)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
}
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Generate as JSON, KSML will convert to CSV
producers:
# Produce CSV sensor data messages every 3 seconds
sensordata_csv_producer:
generator: generate_sensordata_message
interval: 3s
to:
topic: ksml_sensordata_csv
keyType: string
valueType: csv:SensorData # KSML will convert JSON to CSV format
This processor demonstrates CSV data manipulation (uppercases city names) while maintaining CSV format:
Processor definition for CSV messages (click to expand)
streams:
csv_input:
topic: ksml_sensordata_csv
keyType: string
valueType: csv:SensorData
offsetResetPolicy: latest
csv_output:
topic: ksml_sensordata_csv_processed
keyType: string
valueType: csv:SensorData
functions:
uppercase_city:
type: valueTransformer
code: |
# When using csv:SensorData, the data comes as a structured object (dict)
if value and isinstance(value, dict):
# Create a copy and uppercase the city
enriched = dict(value)
if "city" in enriched:
enriched["city"] = enriched["city"].upper()
result = enriched
else:
result = value
expression: result
resultType: csv:SensorData
pipelines:
process_csv:
from: csv_input
via:
# Log the original CSV data
- type: peek
forEach:
code: |
if value:
log.info("Original: sensor={}, city={}", value.get("name"), value.get("city"))
# Transform the CSV data - uppercase city
- type: transformValue
mapper: uppercase_city
# Log the transformed CSV data
- type: peek
forEach:
code: |
if value:
log.info("Transformed: sensor={}, city={}", value.get("name"), value.get("city"))
to: csv_output
Working with XML Data
XML (eXtensible Markup Language) is a structured format for representing hierarchical data with custom tags and attributes.
- XML data is represented as nested elements with opening and closing tags
- Elements can contain text content, attributes, and child elements
- XSD (XML Schema Definition) defines the structure, data types, and constraints for XML documents
Requirements for running KSML XML definitions
To run the KSML XML processing definitions below, follow these steps:
- Save this examples/SensorData.xsd as the XML schema:
XSD schema (click to expand)
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="SensorData">
<xs:complexType>
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="timestamp" type="xs:long"/>
<xs:element name="value" type="xs:string"/>
<xs:element name="type" type="xs:string"/>
<xs:element name="unit" type="xs:string"/>
<xs:element name="color" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="city" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="owner" type="xs:string" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
- Use this examples/producer.yaml that produces XML messages that our processing definition below can work with:
Producer definition for XML messages (click to expand)
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate data matching SensorData.xsd schema
value = {
"name": key,
"timestamp": round(time.time()*1000), # timestamp as long, not string
"value": str(random.randrange(0, 100)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"])
}
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Generate as JSON, KSML will convert to XML
producers:
# Produce XML sensor data messages every 3 seconds
sensordata_xml_producer:
generator: generate_sensordata_message
interval: 3s
to:
topic: ksml_sensordata_xml
keyType: string
valueType: xml:SensorData # KSML will convert JSON to XML format
This processor demonstrates XML data manipulation (uppercases city names) while maintaining XML format:
Processor definition for XML messages (click to expand)
streams:
xml_input:
topic: ksml_sensordata_xml
keyType: string
valueType: xml:SensorData
offsetResetPolicy: latest
xml_output:
topic: ksml_sensordata_xml_processed
keyType: string
valueType: xml:SensorData
functions:
uppercase_city:
type: valueTransformer
code: |
# When using xml:SensorData, the data comes as a structured object (dict)
if value and isinstance(value, dict):
# Create a copy and uppercase the city
enriched = dict(value)
if "city" in enriched:
enriched["city"] = enriched["city"].upper()
result = enriched
else:
result = value
expression: result
resultType: xml:SensorData
pipelines:
process_xml:
from: xml_input
via:
# Log the original XML data
- type: peek
forEach:
code: |
if value:
log.info("Original: sensor={}, city={}", value.get("name"), value.get("city"))
# Transform the XML data - uppercase city
- type: transformValue
mapper: uppercase_city
# Log the transformed XML data
- type: peek
forEach:
code: |
if value:
log.info("Transformed: sensor={}, city={}", value.get("name"), value.get("city"))
to: xml_output
- Please note that kowl cannot deserialize XML messages and will display the message values as blank.
- To read the XML messages, use kcat:
XML message example (click to expand)
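A sketch of reading a few messages with kcat, assuming kcat is installed on the host and the broker is reachable on localhost:9092, followed by an illustrative value (field contents are made up; exact serialization details may differ):

```bash
# Consume 5 messages from the XML topic, printing key and value
kcat -b localhost:9092 -t ksml_sensordata_xml -C -c 5 -f 'key=%k value=%s\n'
```

```xml
<?xml version="1.0" encoding="UTF-8"?>
<SensorData>
  <name>sensor3</name>
  <timestamp>1717000000000</timestamp>
  <value>42</value>
  <type>TEMPERATURE</type>
  <unit>C</unit>
  <color>blue</color>
  <city>Utrecht</city>
  <owner>Alice</owner>
</SensorData>
```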
Working with Binary Data
Binary data represents raw bytes as sequences of numeric values ranging from 0 to 255, ideal for handling non-text content like images, files, or custom protocols.
- Binary data is represented as arrays of integers where each value corresponds to a single byte
- Each byte can store values from 0-255, allowing for compact encoding of various data types
- Binary processing enables direct byte manipulation, bit-level operations, and efficient handling of structured binary formats
This producer creates simple binary messages as byte arrays (7-byte messages with counter, random bytes, and ASCII "KSML"):
Producer definition for Binary messages (click to expand)
functions:
generate_binary_message:
type: generator
globalCode: |
import random
counter = 0
code: |
global counter
key = "msg" + str(counter)
counter = (counter + 1) % 100
# Create simple binary message as list of bytes
value = [
counter % 256, # Counter byte
random.randrange(0, 256), # Random byte
ord('K'), # ASCII 'K'
ord('S'), # ASCII 'S'
ord('M'), # ASCII 'M'
ord('L'), # ASCII 'L'
random.randrange(0, 256) # Another random byte
]
log.info("Generated binary message: key={}, bytes={}", key, value)
expression: (key, value)
resultType: (string, bytes)
producers:
binary_producer:
generator: generate_binary_message
interval: 3s
to:
topic: ksml_sensordata_binary
keyType: string
valueType: bytes
This processor demonstrates binary data manipulation (increments first byte) while maintaining binary format:
Processor definition for Binary messages (click to expand)
streams:
binary_input:
topic: ksml_sensordata_binary
keyType: string
valueType: bytes
offsetResetPolicy: latest
binary_output:
topic: ksml_sensordata_binary_processed
keyType: string
valueType: bytes
functions:
increment_first_byte:
type: valueTransformer
code: |
# Simple binary manipulation: increment the first data byte
if isinstance(value, list) and len(value) > 0:
modified = list(value)
modified[0] = (modified[0] + 1) % 256 # Increment and wrap at 256
result = modified
else:
result = value
expression: result
resultType: bytes
pipelines:
process_binary:
from: binary_input
via:
# Log input binary
- type: peek
forEach:
code: |
log.info("Binary input: key={}, bytes={}", key, value)
# Modify binary data
- type: transformValue
mapper: increment_first_byte
# Log output binary
- type: peek
forEach:
code: |
log.info("Binary output: key={}, bytes={}", key, value)
to: binary_output
Working with SOAP Data
SOAP provides structured web service messaging with envelope/body format and no WSDL requirements.
This producer creates SOAP request messages with envelope/body structure (no WSDL files required):
Producer definition for SOAP messages (click to expand)
functions:
generate_soap_message:
type: generator
globalCode: |
counter = 0
code: |
global counter
key = "msg" + str(counter)
counter = (counter + 1) % 100
# Create simple SOAP message
value = {
"envelope": {
"body": {
"elements": [
{
"name": {
"localPart": "SensorRequest",
"namespaceURI": "http://example.com/ksml"
},
"value": {
"id": str(counter),
"type": "temperature"
}
}
]
}
}
}
log.info("Generated SOAP message: {}", key)
expression: (key, value)
resultType: (string, soap)
producers:
soap_producer:
generator: generate_soap_message
interval: 3s
to:
topic: ksml_soap_requests
keyType: string
valueType: soap
This processor transforms SOAP requests into SOAP responses (extracts request data and creates response with sensor values):
Processor definition for SOAP messages (click to expand)
streams:
soap_input:
topic: ksml_soap_requests
keyType: string
valueType: soap
offsetResetPolicy: latest
soap_output:
topic: ksml_soap_responses
keyType: string
valueType: soap
functions:
create_response:
type: valueTransformer
code: |
# Extract request data
request_elements = value.get("envelope", {}).get("body", {}).get("elements", [])
request_data = request_elements[0].get("value", {}) if request_elements else {}
# Create SOAP response
result = {
"envelope": {
"body": {
"elements": [
{
"name": {
"localPart": "SensorResponse",
"namespaceURI": "http://example.com/ksml"
},
"value": {
"id": request_data.get("id", "unknown"),
"temperature": "25"
}
}
]
}
}
}
expression: result
resultType: soap
pipelines:
process_soap:
from: soap_input
via:
# Log input
- type: peek
forEach:
code: |
log.info("SOAP request: key={}", key)
# Transform to response
- type: transformValue
mapper: create_response
# Log output
- type: peek
forEach:
code: |
log.info("SOAP response: key={}", key)
to: soap_output
Converting Between Data Formats
Use the convertValue operation to transform data between formats within a single pipeline.
This producer generates Avro messages for format conversion demonstrations:
Producer definition (click to expand)
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate data matching SensorData schema
value = {
"name": key,
"timestamp": str(round(time.time()*1000)),
"value": str(random.randrange(0, 100)),
"type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
"unit": random.choice(["C", "F", "%", "Pa"]),
"color": random.choice(["black", "blue", "red", "yellow", "white"]),
"owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
"city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
}
# Occasionally send null messages for testing
if random.randrange(20) == 0:
value = None
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Generate as JSON, KSML will convert to Avro
producers:
# Produce Avro sensor data messages every 3 seconds
sensordata_avro_producer:
generator: generate_sensordata_message
interval: 3s
to:
topic: sensor_data_avro
keyType: string
valueType: avro:SensorData
This processor demonstrates multiple format conversions (Avro → JSON → String → JSON) using convertValue:
Processing definition for converting between multiple formats (click to expand)
streams:
avro_input:
topic: sensor_data_avro
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
json_output:
topic: sensor_data_converted_formats
keyType: string
valueType: json
pipelines:
format_conversion:
from: avro_input
via:
# Log original Avro data
- type: peek
forEach:
code: |
if value is not None:
log.info("Original Avro: sensor={}, type={}, value={}{}",
value.get("name"), value.get("type"), value.get("value"), value.get("unit"))
else:
log.info("Received null Avro message with key={}", key)
# Convert from Avro to JSON
- type: convertValue
into: json
- type: peek
forEach:
code: |
if value is not None:
log.info("Converted to JSON: sensor={}, city={}", value.get("name"), value.get("city"))
else:
log.info("JSON conversion result: null")
# Convert from JSON to string to see serialized representation
- type: convertValue
into: string
- type: peek
forEach:
code: |
if value is not None:
log.info("As string (first 100 chars): {}", str(value)[:100])
else:
log.info("String conversion result: null")
# Convert back to JSON for output
- type: convertValue
into: json
- type: peek
forEach:
code: |
if value is not None:
log.info("Final JSON output: sensor={}, owner={}", value.get("name"), value.get("owner"))
else:
log.info("Final conversion result: null")
to: json_output
Format Conversion and Multiple Formats
For comprehensive information on format conversion requirements, chaining conversions, and working with multiple formats in a single pipeline, see the Data Types and Formats Reference - Type Conversion section.
Conclusion
You've learned to work with KSML's data formats through practical examples: JSON, Avro, JsonSchema, CSV, XML, Binary, and SOAP. Key concepts covered include format specification, schema usage, conversion operations, and multi-format pipelines.
For complete syntax reference, type definitions, and advanced format features, refer to the Data Types and Formats Reference.
Next Steps
- Logging and Monitoring for adding effective logging to pipelines
- Intermediate Tutorials for advanced KSML features
- KSML Examples for more data format examples