
Producer Tutorial

KSML producers let you quickly send test messages to Kafka in formats such as JSON, Avro, Protobuf, XML, CSV, and Binary. You write simple Python code to generate the data and KSML handles the rest, with no build or compile step.

In this tutorial, you'll learn all the properties and features of KSML producers.

Prerequisites

Before starting this tutorial:

Topic creation commands
# Producer Tutorial Topics
kafka-topics.sh --create --if-not-exists --bootstrap-server broker:9093 --partitions 1 --replication-factor 1 --topic my_topic

Basic Producer Structure

A KSML producer definition consists of two main parts:

  1. Functions: Define the generator function that creates messages
  2. Producers: Configure how and when messages are produced

Here's the simplest possible producer:

Simplest Producer
functions:
  simple_generator:
    type: generator
    code: |
      key = "key1"
      value = {"message": "Hello, Kafka!"}
    expression: (key, value)
    resultType: (string, json)

producers:
  my_producer:
    generator: simple_generator
    to:
      topic: my_topic
      keyType: string
      valueType: json

This producer will generate one JSON message and stop, because no interval, count, or until condition is specified.

Producer Properties Reference

Let's explore all available properties for producer definitions:

Required Properties

generator (required)

The function that generates messages. Must be a function of type generator.

producers:
  my_producer:
    generator: my_generator_function  # References a function defined in the functions section
    to: my_topic

The generator function must return:

  • A tuple (key, value) for a single message, or
  • A list of tuples [(key1, value1), (key2, value2), ...] for multiple messages
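The two accepted return shapes can be treated uniformly by normalizing them into a list of (key, value) pairs. The following is a minimal plain-Python sketch of that idea, not KSML's own implementation; the helper name normalize_generator_result is hypothetical and used only for illustration.

```python
from typing import Any, List, Tuple, Union

Message = Tuple[Any, Any]


def normalize_generator_result(result: Union[Message, List[Message]]) -> List[Message]:
    """Turn either accepted return shape into a list of (key, value) pairs.

    Hypothetical helper for illustration; KSML may handle this differently.
    """
    if isinstance(result, tuple):
        # Single message: must be exactly (key, value).
        if len(result) != 2:
            raise ValueError("a single message must be a (key, value) tuple")
        return [result]
    if isinstance(result, list):
        # Multiple messages: every entry must be a (key, value) tuple.
        for item in result:
            if not (isinstance(item, tuple) and len(item) == 2):
                raise ValueError("each list entry must be a (key, value) tuple")
        return list(result)
    raise TypeError("expected a tuple or a list of tuples")
```

For example, normalize_generator_result(("key1", {"message": "Hello"})) yields a one-element list, while a list of tuples is returned unchanged as a new list.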

For detailed information about generator functions, see the Generator Function Reference.

to (required)

The target topic where messages will be produced. Can be specified as a simple topic name or with detailed configuration.

Simple format:

producers:
  my_producer:
    generator: my_generator
    to: my_topic 

Detailed format:

producers:
  my_producer:
    generator: my_generator
    to:
      topic: sensor_data
      keyType: string
      valueType: json

Optional Properties

interval

The time to wait between generator calls. Supports various duration formats based on KSML's duration specification:

Duration Format: <number><unit> where unit is optional

  • 100 - 100 milliseconds (default unit)
  • 500ms - 500 milliseconds
  • 3s - 3 seconds
  • 5m - 5 minutes
  • 2h - 2 hours
  • 1d - 1 day
  • 4w - 4 weeks
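To make the duration format concrete, here is a small plain-Python sketch that converts the values listed above into milliseconds. It is an illustration only: the function name duration_to_ms is hypothetical, and the exact parsing rules KSML applies (for example, whether whitespace or uppercase units are accepted) are not covered by this tutorial, so the sketch accepts only the forms shown in the list.

```python
import re

# Milliseconds per unit; an empty unit falls back to milliseconds,
# matching the "default unit" noted in the list above.
_UNIT_MS = {
    "": 1,
    "ms": 1,
    "s": 1_000,
    "m": 60_000,
    "h": 3_600_000,
    "d": 86_400_000,
    "w": 604_800_000,
}

_DURATION = re.compile(r"^(\d+)(ms|s|m|h|d|w)?$")


def duration_to_ms(text) -> int:
    """Convert a '<number><unit>' duration into milliseconds (illustrative)."""
    match = _DURATION.match(str(text).strip())
    if match is None:
        raise ValueError(f"not a recognized duration: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNIT_MS[unit or ""]
```

With this sketch, duration_to_ms("3s") gives 3000 and duration_to_ms(100) gives 100, since a bare number is read as milliseconds.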

Default: If not specified, and no count or until is provided, the producer enters "once mode" and generates exactly one message then stops.

Important: Specifying interval (even as 0 or 1ms) is different from omitting it entirely. See the behavior table below.

Interval Examples
functions:
  my_generator:
    type: generator
    code: |
      key = "key1"
      value = {"message": "Hello"}
    expression: (key, value)
    resultType: (string, json)

producers:
  # Produce every 3 seconds (indefinitely)
  slow_producer:
    generator: my_generator
    interval: 3s
    to:
      topic: my_topic
      keyType: string
      valueType: json

  # Produce 100 messages in rapid succession
  rapid_producer:
    generator: my_generator
    interval: 1ms   # Very fast (can be 0 for no delay)
    count: 100      # Must specify count or until to avoid infinite loop
    to:
      topic: my_topic
      keyType: string
      valueType: json

count

The total number of messages to produce before stopping.

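Default: If not specified (and until is also not specified), the producer runs indefinitely when interval is set. If interval is omitted as well, the producer runs in "once mode" and generates a single message.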

Count Example
functions:
  my_generator:
    type: generator
    code: |
      key = "key1"
      value = {"message": "Hello"}
    expression: (key, value)
    resultType: (string, json)

producers:
  # Produce exactly 10 messages
  limited_producer:
    generator: my_generator
    interval: 1s
    count: 10
    to:
      topic: my_topic
      keyType: string
      valueType: json

Note: The producer will stop when either count is reached OR the until condition becomes true, whichever comes first.

batchSize

The number of messages to generate in each call to the generator. This is useful for performance optimization.

  • Default: 1 (one message per generator call)
  • Range: Must be between 1 and 1000
  • Validation: Values outside this range will trigger a warning and default to 1

Batch Size Example
functions:
  batch_generator:
    type: generator
    globalCode: |
      counter = 0
    code: |
      global counter
      # Generate multiple messages in one call
      messages = []
      for i in range(10):  # Generate 10 messages
        key = f"sensor{counter}"
        value = {"id": counter, "reading": counter * 10}
        messages.append((key, value))
        counter += 1
      return messages  # Return list of tuples
    resultType: list(tuple(string, json))

producers:
  batch_producer:
    generator: batch_generator
    interval: 5s
    batchSize: 10  # Matches the generator's batch size
    to:
      topic: my_topic
      keyType: string
      valueType: json

Performance Note: Using batching can significantly improve throughput when producing large volumes of data.
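The batchSize validation rule described above (accept 1 to 1000, otherwise warn and fall back to 1) can be sketched in plain Python as follows. This is an assumption-labeled illustration rather than KSML's actual code; the function name effective_batch_size and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("producer_sketch")  # hypothetical logger name


def effective_batch_size(configured=None) -> int:
    """Return the batch size that would apply under the documented rule."""
    if configured is None:
        # Property omitted: the documented default is 1.
        return 1
    if 1 <= configured <= 1000:
        return configured
    # Out of range: warn and fall back to the default of 1.
    logger.warning("batchSize %s is outside 1..1000, using 1", configured)
    return 1
```

Under this sketch, effective_batch_size(10) returns 10, while effective_batch_size(0) and effective_batch_size(5000) both log a warning and return 1.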

condition

A predicate function that validates whether a generated message should be produced. If the condition returns false, the message is discarded and the generator is called again.

Condition Example
functions:
  generate_random_value:
    type: generator
    globalCode: |
      import random
    code: |
      value = random.randint(0, 100)
      return ("sensor1", {"value": value})
    resultType: (string, json)

  only_high_values:
    type: predicate
    expression: value.get("value", 0) > 50

producers:
  filtered_producer:
    generator: generate_random_value
    condition: only_high_values  # Only produce if value > 50
    interval: 1s
    to:
      topic: my_topic
      keyType: string
      valueType: json

Use Cases:

  • Filtering out invalid or unwanted messages
  • Implementing probabilistic message generation
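The discard-and-retry behavior of condition can be pictured as a loop in plain Python. The sketch below mirrors the generator and predicate from the example above; next_accepted_message and its max_attempts guard are hypothetical additions for illustration and are not KSML settings.

```python
import random


def next_accepted_message(generator, condition, max_attempts=1000):
    """Call generator until condition accepts a message, then return it.

    max_attempts is an assumption added here so the sketch always terminates.
    """
    for _ in range(max_attempts):
        key, value = generator()
        if condition(key, value):
            return key, value
        # Rejected: discard the message and ask the generator for another one.
    raise RuntimeError("no message passed the condition")


rng = random.Random(42)  # seeded so repeated runs behave the same


def generate_random_value():
    return "sensor1", {"value": rng.randint(0, 100)}


def only_high_values(key, value):
    return value.get("value", 0) > 50


key, value = next_accepted_message(generate_random_value, only_high_values)
```

Whatever the generator draws, the message that comes out of the loop always satisfies the predicate, which is the property the condition setting is meant to provide.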

until

A predicate function that determines when to stop producing. When this function returns true, the producer stops immediately.

Until Example
functions:
  generate_sequence:
    type: generator
    globalCode: |
      counter = 0
    code: |
      global counter
      counter += 1
      return (f"key{counter}", {"count": counter})
    resultType: (string, json)

  stop_at_10:
    type: predicate
    expression: value.get("count", 0) >= 10

producers:
  sequence_producer:
    generator: generate_sequence
    until: stop_at_10  # Stop when count reaches 10
    interval: 1s
    to:
      topic: my_topic
      keyType: string
      valueType: json

Alternative with global state:

Until with Global State Example
functions:
  generate_with_stop:
    type: generic
    resultType: boolean
    globalCode: |
      counter = 0
      done = False

      def next_message():
        global counter, done
        counter += 1
        if counter >= 10:
          done = True
        return (f"key{counter}", {"count": counter})

producers:
  smart_producer:
    generator:
      code: |
        return next_message()
      resultType: (string, json)
    until:
      expression: done  # Access global variable
    interval: 1s
    to:
      topic: my_topic
      keyType: string
      valueType: json

Use Cases:

  • Producing a predetermined dataset exactly once
  • Implementing time-based or condition-based stopping logic
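As one way to picture time-based stopping logic, the plain-Python sketch below builds a predicate that starts returning True once a deadline has passed. It is illustrative only: make_deadline_predicate is a hypothetical helper, and how such logic would be wired into an until function (for example through globalCode) is an assumption not confirmed by this tutorial.

```python
import time


def make_deadline_predicate(max_seconds, clock=time.monotonic):
    """Build a predicate that returns True once max_seconds have elapsed.

    The clock parameter exists only so the sketch can be exercised
    without actually waiting.
    """
    deadline = clock() + max_seconds

    def should_stop(key, value):
        # The message itself is ignored; only elapsed time matters.
        return clock() >= deadline

    return should_stop
```

A predicate created with make_deadline_predicate(60) returns False for the first minute and True afterwards, regardless of the key and value it is given.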

Producer Behavior Summary

Understanding when a producer starts and stops is crucial. The table below summarizes how interval, count, and until combine:

| interval | count | until | Behavior |
|---|---|---|---|
| Not specified | Not specified | Not specified | Produces 1 message then stops ("once mode") |
| Specified (any value) | Not specified | Not specified | Produces indefinitely at the specified interval |
| Specified | Specified | Not specified | Produces count messages at the specified interval, then stops |
| Specified | Not specified | Specified | Produces until the until predicate returns true, checking at each interval |
| Specified | Specified | Specified | Produces until count is reached OR until returns true (whichever comes first) |

Key Points:

  • "Not specified" means the property is completely omitted from the YAML
  • "Specified" means the property is included, even if set to a minimal value like interval: 1ms
  • Using interval without count or until will produce messages indefinitely.
  • When both count and until are specified, the producer stops when either condition is met first
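The combinations listed in this section can also be written down as a small lookup in plain Python. This is a reading aid rather than KSML code: the function name producer_behavior is hypothetical, each flag only records whether the property appears in the YAML, and combinations this section does not describe are reported as such instead of being guessed.

```python
def producer_behavior(interval: bool, count: bool, until: bool) -> str:
    """Map which properties are present in the YAML to the documented behavior."""
    if not interval and not count and not until:
        return "once mode: produces 1 message, then stops"
    if interval and not count and not until:
        return "produces indefinitely at the specified interval"
    if interval and count and not until:
        return "produces count messages, then stops"
    if interval and not count and until:
        return "produces until the until predicate returns true"
    if interval and count and until:
        return "stops at count or when until returns true, whichever comes first"
    # Combinations without interval (but with count or until) are not
    # described in this section, so the sketch does not guess.
    return "not covered by this section"
```

For instance, producer_behavior(True, False, False) reports indefinite production, which is why an interval on its own should usually be paired with count or until.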

Working with Different Data Formats

To learn about producing messages in different formats like JSON, Avro, Protobuf, XML, CSV, and Binary, see the Different Data Formats tutorial.