Skip to content

Testing Your Pipeline

KSML includes a test runner that lets you verify your pipeline logic without a running Kafka broker. You write a YAML test definition that describes what data to send and what to assert, and the test runner handles the rest using Kafka's TopologyTestDriver.

How It Works

The test runner:

  1. Parses your KSML pipeline definition and builds a Kafka Streams topology
  2. Sends test messages into the topology's input topics
  3. Runs Python assertions against output topics and/or state stores
  4. Reports pass/fail results

Since the test runner is using Kafka's topology test driver, there is no infrastructure needed: no Kafka broker, no Schema Registry. Tests can be run on any machine.

A test definition file is a suite — it describes one pipeline plus one or more named tests that share the suite's configuration. Each test runs against a fresh TopologyTestDriver, so tests in the same suite are hermetic and do not share state.

Test Definition Format

A test definition is a flat YAML document referencing one KSML definition, the streams it touches, and one or more named tests:

name: "Filtering pipeline tests"          # optional; falls back to filename without extension
definition: path/to/pipeline.yaml         # path to the KSML pipeline definition YAML
schemaDirectory: path/to/schemas          # optional, for schemas
moduleDirectory: path/to/modules          # optional, for externalized Python modules

streams:
  sensor_source:                          # logical stream name (referenced by to: / on:)
    topic: input-topic-name
    keyType: string                       # optional, defaults to "string"
    valueType: "avro:SensorData"          # optional, defaults to "string"
  sensor_filtered:
    topic: output-topic-name
    keyType: string
    valueType: "avro:SensorData"

tests:
  blue_sensors_pass:                      # test identifier (referenced in reports)
    description: "Blue sensors pass through"   # optional, falls back to the test key
    produce:
      - to: sensor_source                 # references streams: key, not a topic name
        messages:
          - key: "my-key"
            value: { field: "value" }
            timestamp: 1709200000000      # optional, epoch millis

    assert:
      - on: sensor_filtered               # references streams: key, not a topic name
        code: |
          assert len(records) == 1
          assert records[0]["key"] == "my-key"

Suite-level fields

Field Required Description
name no Human-readable suite name shown in reports. Falls back to filename without extension.
definition yes Path to the KSML pipeline definition YAML (relative to the test file, on the classpath, or absolute).
schemaDirectory no Path to schema files. Required when any stream uses a schema-bearing notation like avro:Foo.
moduleDirectory no Path to externalized Python modules accessible to the pipeline.
streams no Map of named topic+type bindings, referenced by to: and on:. See below.
tests yes Map of named tests. Must contain at least one entry. See below.

Streams

The streams: map declares every Kafka topic the test suite produces to or asserts on. Each entry is keyed by a logical stream identifier (matching ^[a-zA-Z][a-zA-Z0-9_]*$) and binds it to a topic plus key/value types.

Field Required Default Description
topic yes Kafka topic name. Each topic may appear in at most one stream entry.
keyType no string Key serialization type (e.g. string, avro:MyKey).
valueType no string Value serialization type (e.g. string, avro:SensorData).

Type strings follow KSML's standard type grammar — the same one StreamDefinitionParser uses in pipeline yamls:

  • Schema-less notations (string, long, int, boolean, bytes, json, binary) may appear unqualified.
  • Schema-bearing notations (avro, confluent_avro, apicurio_avro) must be qualified with a schema name (e.g., avro:SensorData). The named schema is loaded from schemaDirectory and registered in an in-memory mock registry under <topic>-key / <topic>-value.
  • Bare schema-bearing notations (e.g., confluent_avro without :Schema) are rejected — the test runner has no real registry to resolve them against.

Note: The test runner supports: string, int, long, boolean, bytes, json, binary, avro, confluent_avro, apicurio_avro.

apicurio_avro is backed by a Confluent-compatible mock registry. It works for logic-level tests, but uses Confluent wire format rather than Apicurio's.

The following notations are not yet supported and will produce a runtime error if used: xml, csv, protobuf, json_schema.

Tests

The tests: map carries one or more test entries. Each key is a stable test identifier matching the same regex as stream keys; the value contains that test's produce data and assertions. Tests run in YAML-defined order; failure in one test does not stop later tests.

Field Required Description
description no Human-readable label. Falls back to the test key.
produce yes List of produce blocks.
assert yes List of assertion blocks.

Produce Blocks

Each produce block targets one stream by reference. You can define multiple produce blocks to feed data into different streams (e.g. for join tests).

Field Required Default Description
to yes Stream key (must exist in streams:).
messages * List of messages with key, value, and optional timestamp (epoch millis).
generator * Generator function (KSML generator syntax). Mutually exclusive with messages.
count no 1 Number of times to invoke the generator.

* Exactly one of messages or generator must be present.

Inline topic, keyType, or valueType fields on a produce block are not permitted — declare the stream under streams: and reference it via to:.

Assert Blocks

Each assert block runs Python code with injected variables. At least one of on or stores must be specified.

Field Required Description
on no* Stream key (must exist in streams:). The runner reads all records from the underlying topic and injects them as a records list variable, deserialized using the stream's serdes.
stores no* List of state store names (the names declared in the pipeline's stores: block). Each store is injected as a Python variable.
code yes Python assertion code using assert statements.

When on: is set, records is a list of dicts with key, value, and timestamp fields. When stores: is set, each store is available as a Python variable with the same API as in pipeline functions (e.g. store.get(key), store.put(key, value)).

Note: Avro output streams must declare the correct valueType" records[i]["value"] is a dict only when the stream's valueType in streams: matches what the pipeline actually writes. If the pipeline writes Avro but the stream is declared as valueType: string (or the field is omitted), the runner falls back to StringDeserializer. The raw Confluent wire bytes — magic byte + 4-byte schema ID + Avro payload — are returned as a string, and indexing into records[i]["value"] with a field name fails with TypeError: string indices must be integers.

Always declare `valueType: "avro:SchemaName"` (or `confluent_avro:SchemaName`) in `streams:` for any output topic that your pipeline serializes as Avro, and provide the matching `.avsc` file via `schemaDirectory`.

Reporting

Each test result is labeled <suite> › <test> where <suite> is the file's name: (or the filename without extension) and <test> is the test entry's description: (or the test key if absent). Suite-level parse failures (missing definition:, malformed YAML, undefined stream references, etc.) are reported as a single ERROR result for the whole suite.

Example: Testing a Filter Pipeline

Let's walk through testing a pipeline that filters sensor data, keeping only sensors with color "blue".

The Pipeline

Pipeline definition: test-filter.yaml (click to expand)
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value == None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

This pipeline reads from ksml_sensordata_avro, filters messages where the sensor color is "blue", and writes the matching messages to ksml_sensordata_filtered.

The Test

Test definition: sample-filter-test.yaml (click to expand)
name: "Filter pipeline passes blue sensors"
definition: pipelines/test-filter.yaml
schemaDirectory: schemas

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: "avro:SensorData"
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: "avro:SensorData"

tests:
  blue_sensors_pass:
    description: "Blue sensors pass through, red ones are filtered out"
    produce:
      - to: sensor_source
        messages:
          - key: "sensor-1"
            value:
              name: "sensor-1"
              timestamp: 1000
              value: "25.0"
              type: "TEMPERATURE"
              unit: "celsius"
              color: "blue"
          - key: "sensor-2"
            value:
              name: "sensor-2"
              timestamp: 2000
              value: "60.0"
              type: "HUMIDITY"
              unit: "percent"
              color: "red"
          - key: "sensor-3"
            value:
              name: "sensor-3"
              timestamp: 3000
              value: "10.0"
              type: "LENGTH"
              unit: "meters"
              color: "blue"

    assert:
      - on: sensor_filtered
        code: |
          assert len(records) == 2, f"Expected 2 filtered records, got {len(records)}"
          assert records[0]["key"] == "sensor-1", f"Expected key 'sensor-1', got {records[0]['key']}"
          assert records[1]["key"] == "sensor-3", f"Expected key 'sensor-3', got {records[1]['key']}"

The suite declares two logical streams (sensor_source and sensor_filtered) bound to the two Kafka topics, then defines one test that produces three sensor messages (two blue, one red) into sensor_source and asserts that only the two blue sensors appear on sensor_filtered.

Example: Testing a Pipeline Using confluent_avro

If your pipeline uses confluent_avro (or apicurio_avro) — notations that would normally fetch schemas from a real registry at runtime — the suite's streams: block tells the test runner which schemas to register in the mock registry. Each stream entry's valueType: "avro:SensorData" causes SensorData.avsc to be loaded from schemaDirectory and registered under the <topic>-key / <topic>-value subjects. The KSML definition's confluent_avro types resolve from the mock registry, and the assertion can deserialize the Avro output records correctly.

The Pipeline

Pipeline definition: test-filter-confluent-avro.yaml (click to expand)
# Filter pipeline using confluent_avro (schema inferred from registry at runtime).
# This pipeline is identical to test-filter.yaml but uses registry-inferred schema types.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: "avro:SensorData"

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value == None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
    to:
      topic: ksml_sensordata_filtered
      keyType: string
      valueType: confluent_avro

The Test

Test definition: sample-filter-test-confluent-avro.yaml (click to expand)
name: "Filter pipeline with confluent_avro and registry block"
definition: pipelines/test-filter-confluent-avro.yaml
schemaDirectory: schemas

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: "avro:SensorData"
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: "avro:SensorData"

tests:
  blue_sensors_pass:
    produce:
      - to: sensor_source
        messages:
          - key: "sensor-1"
            value:
              name: "sensor-1"
              timestamp: 1000
              value: "25.0"
              type: "TEMPERATURE"
              unit: "celsius"
              color: "blue"
          - key: "sensor-2"
            value:
              name: "sensor-2"
              timestamp: 2000
              value: "60.0"
              type: "HUMIDITY"
              unit: "percent"
              color: "red"
          - key: "sensor-3"
            value:
              name: "sensor-3"
              timestamp: 3000
              value: "10.0"
              type: "LENGTH"
              unit: "meters"
              color: "blue"

    assert:
      - on: sensor_filtered
        code: |
          assert len(records) == 2, f"Expected 2 filtered records, got {len(records)}"
          assert records[0]["key"] == "sensor-1", f"Expected key 'sensor-1', got {records[0]['key']}"
          assert records[1]["key"] == "sensor-3", f"Expected key 'sensor-3', got {records[1]['key']}"

The same suite shape works for both avro:SensorData (where the schema name is part of the type) and confluent_avro:SensorData (where it would normally come from a real registry) — the test runner handles them uniformly through the streams: block.

Running Tests with Docker

The KSML Docker image includes the test runner at /opt/ksml/ksml-test.jar. Mount your test files and override the entrypoint:

docker run --rm \
  -v ./my-tests:/tests \
  --entrypoint java \
  axual/ksml:latest \
  -Djava.security.manager=allow -jar /opt/ksml/ksml-test.jar \
  /tests/my-test.yaml

You can pass multiple test files or directories:

docker run --rm \
  -v ./my-tests:/tests \
  --entrypoint java \
  axual/ksml:latest \
  -Djava.security.manager=allow -jar /opt/ksml/ksml-test.jar \
  /tests/

Example Output

Output for a single multi-test suite that exercises five cases against the same pipeline:

=== KSML Test Results ===

  PASS  Filtering & transforming pipeline › Valid sensor data is filtered through and transformed
  PASS  Filtering & transforming pipeline › Out-of-range temperature is filtered out
  PASS  Filtering & transforming pipeline › Sensor data without temperature is filtered out
  PASS  Filtering & transforming pipeline › Sensor data without humidity is filtered out
  PASS  Filtering & transforming pipeline › Malformed sensor data is routed to alerts_stream when transformation fails

5 passed, 0 failed, 0 errors

When a test fails or errors, the offending result also includes the assertion message or exception detail on the line below:

=== KSML Test Results ===

  PASS  Filtering & transforming pipeline › Valid sensor data is filtered through and transformed
  FAIL  Filtering & transforming pipeline › Out-of-range temperature is filtered out
        AssertionError: Expected no records on filtered_data, got 1
  PASS  Filtering & transforming pipeline › Sensor data without temperature is filtered out
  PASS  Filtering & transforming pipeline › Sensor data without humidity is filtered out
  PASS  Filtering & transforming pipeline › Malformed sensor data is routed to alerts_stream when transformation fails

4 passed, 1 failed, 0 errors

A failure in one test does not stop later tests in the same suite from running. The exit code of the runner is 0 when every result is PASS and 1 otherwise. This makes it easy to integrate into CI/CD pipelines.

Writing Assertions

Assertions use Python's assert statement. Some common patterns:

Check record count

assert len(records) == 3, f"Expected 3 records, got {len(records)}"

Check specific record values

assert records[0]["key"] == "sensor-1"
assert records[0]["value"]["color"] == "blue"

Check timestamps

assert records[0]["timestamp"] == 1709200000000

Check state store contents

# With stores: [my_store] in the assert block
value = my_store.get("sensor-1")
assert value is not None, "Expected sensor-1 in store"
assert value["temperature"] == "25.0"

Combine output records and state stores in one block

assert:
  - on: enriched_output
    stores:
      - last_seen_store
    code: |
      assert len(records) == 1
      assert last_seen_store.get(records[0]["key"]) is not None

Schema Validation for Test Files

A JSON Schema is available for test definition files at docs/ksml-test-spec.json. See the Schema Validation page for instructions on setting up editor auto-completion and validation. The schema enforces the identifier regex on stream and test keys via patternProperties, so editors will flag invalid keys as you type.

Logging

The test runner ships with a default Logback configuration that keeps output quiet: WARN for everything, INFO for the test runner itself so you still see the Running suite: ... progress lines and the final results table.

To get verbose output for one run, point Logback at a custom logback configuration file at invocation time:

docker run --rm \
  -v ./my-tests:/tests \
  --entrypoint java \
  axual/ksml:latest \
  -Dlogback.configurationFile=/tests/logback-debug.xml \
  -jar /opt/ksml/ksml-test.jar /tests/my-test.yaml