Producer Tutorial
KSML producers let you quickly send test messages to Kafka in different formats like JSON, Avro, Protobuf, XML, CSV, and Binary. Write simple Python code to generate your data, and KSML handles the rest - no building or compiling needed.
In this tutorial, you'll learn all the properties and features of KSML producers.
Prerequisites
Before starting this tutorial:
- Complete the Quick Start Guide
- Have the Docker Compose environment running
- Add the following topic to your kafka-setup service in docker-compose.yml to run the examples:
Topic creation commands
Basic Producer Structure
A KSML producer definition consists of two main parts:
- Functions: Define the generator function that creates messages
- Producers: Configure how and when messages are produced
Here's the simplest possible producer:
Simplest Producer
This producer will generate one JSON message and stop, because no interval, count, or until condition is specified.
Producer Properties Reference
Let's explore all available properties for producer definitions:
Required Properties
generator (required)
The function that generates messages. Must be a function of type generator.

    producers:
      my_producer:
        generator: my_generator_function  # References a function defined in the functions section
        to: my_topic

The generator function must return:
- A tuple (key, value) for a single message, or
- A list of tuples [(key1, value1), (key2, value2), ...] for multiple messages
For detailed information about generator functions, see the Generator Function Reference.
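To make the two return shapes concrete, the sketch below normalizes either shape into a list of (key, value) pairs. `normalize_generator_result` is a hypothetical helper written for this illustration, not a KSML function, and it makes no claim about how KSML processes the result internally.

```python
def normalize_generator_result(result):
    """Return a list of (key, value) tuples for either allowed return shape.

    Hypothetical helper for illustration only; not part of KSML.
    """
    if isinstance(result, tuple):
        # Single message: wrap the (key, value) tuple in a list
        return [result]
    if isinstance(result, list):
        # Multiple messages: already a list of (key, value) tuples
        return list(result)
    raise TypeError(
        f"expected a tuple or a list of tuples, got {type(result).__name__}"
    )


single = normalize_generator_result(("sensor1", {"value": 42}))
many = normalize_generator_result([("k1", {"n": 1}), ("k2", {"n": 2})])
print(len(single), len(many))
```

Both shapes end up as a plain list, which is a convenient way to think about a generator call: it yields one or more messages for the target topic.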
to (required)
The target topic where messages will be produced. Can be specified as a simple topic name or with detailed configuration.
Simple format (a bare topic name, as in to: my_topic in the generator example above):
Detailed format:

    producers:
      my_producer:
        generator: my_generator
        to:
          topic: sensor_data
          keyType: string
          valueType: json

Optional Properties
interval
The time to wait between generator calls. Supports various duration formats based on KSML's duration specification:
Duration Format: <number><unit> where unit is optional
- 100 - 100 milliseconds (default unit)
- 500ms - 500 milliseconds
- 3s - 3 seconds
- 5m - 5 minutes
- 2h - 2 hours
- 1d - 1 day
- 4w - 4 weeks
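If you want to check how a duration string maps to milliseconds, the following Python sketch applies the format listed above. It is not KSML's parser: the function name `duration_to_ms` and the exact regular expression (digits followed by an optional lowercase unit, no whitespace in between) are assumptions made for this illustration.

```python
import re

# Milliseconds per unit, following the units listed above
UNIT_TO_MS = {
    "ms": 1,
    "s": 1_000,
    "m": 60_000,
    "h": 3_600_000,
    "d": 86_400_000,
    "w": 604_800_000,
}

# <number><unit>, where the unit is optional (assumed syntax)
DURATION_PATTERN = re.compile(r"(\d+)(ms|s|m|h|d|w)?")


def duration_to_ms(text):
    """Convert a duration such as '3s' or 100 to milliseconds."""
    match = DURATION_PATTERN.fullmatch(str(text).strip())
    if match is None:
        raise ValueError(f"not a valid duration: {text!r}")
    number, unit = match.groups()
    # A bare number uses the default unit, milliseconds
    return int(number) * UNIT_TO_MS[unit or "ms"]


for example in ["100", "500ms", "3s", "5m", "2h", "1d", "4w"]:
    print(example, "=", duration_to_ms(example), "ms")
```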
Default: If not specified, and no count or until is provided, the producer enters "once mode" and generates exactly one message then stops.
Important: Specifying interval (even as 0 or 1ms) is different from omitting it entirely. See the behavior table below.
Interval Examples:

    functions:
      my_generator:
        type: generator
        code: |
          key = "key1"
          value = {"message": "Hello"}
        expression: (key, value)
        resultType: (string, json)

    producers:
      # Produce every 3 seconds (indefinitely)
      slow_producer:
        generator: my_generator
        interval: 3s
        to:
          topic: my_topic
          keyType: string
          valueType: json

      # Produce as fast as possible for 100 messages
      rapid_producer:
        generator: my_generator
        interval: 1ms  # Very fast (can be 0 for no delay)
        count: 100     # Must specify count or until to avoid infinite loop
        to:
          topic: my_topic
          keyType: string
          valueType: json

count
The total number of messages to produce before stopping.
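Default: If not specified (and until is also not specified), the producer runs indefinitely when interval is set. If interval is omitted as well, it produces a single message in once mode.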
Count Example:

    functions:
      my_generator:
        type: generator
        code: |
          key = "key1"
          value = {"message": "Hello"}
        expression: (key, value)
        resultType: (string, json)

    producers:
      # Produce exactly 10 messages
      limited_producer:
        generator: my_generator
        interval: 1s
        count: 10
        to:
          topic: my_topic
          keyType: string
          valueType: json

Note: The producer will stop when either count is reached OR the until condition becomes true, whichever comes first.
batchSize
The number of messages to generate in each call to the generator. This is useful for performance optimization.
- Default: 1 (one message per generator call)
- Range: Must be between 1 and 1000
- Validation: Values outside this range will trigger a warning and default to 1
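The validation rule can be pictured with a few lines of Python. This is a sketch of the rule as stated in the list above, not KSML's code: the function name `effective_batch_size`, the logger name, and the wording of the warning are all hypothetical.

```python
import logging

logger = logging.getLogger("producer_sketch")  # hypothetical logger name

MIN_BATCH_SIZE = 1
MAX_BATCH_SIZE = 1000
DEFAULT_BATCH_SIZE = 1


def effective_batch_size(requested=None):
    """Return the batch size that would apply for a configured value."""
    if requested is None:
        # Property omitted: one message per generator call
        return DEFAULT_BATCH_SIZE
    if not MIN_BATCH_SIZE <= requested <= MAX_BATCH_SIZE:
        # Out of range: warn and fall back to the default
        logger.warning(
            "batchSize %s is outside %s..%s, using %s",
            requested, MIN_BATCH_SIZE, MAX_BATCH_SIZE, DEFAULT_BATCH_SIZE,
        )
        return DEFAULT_BATCH_SIZE
    return requested


for value in [None, 10, 1000, 0, 5000]:
    print(value, "->", effective_batch_size(value))
```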
Batch Size Example:

    functions:
      batch_generator:
        type: generator
        globalCode: |
          counter = 0
        code: |
          global counter
          # Generate multiple messages in one call
          messages = []
          for i in range(10):  # Generate 10 messages
              key = f"sensor{counter}"
              value = {"id": counter, "reading": counter * 10}
              messages.append((key, value))
              counter += 1
          return messages  # Return list of tuples
        resultType: list(tuple(string, json))

    producers:
      batch_producer:
        generator: batch_generator
        interval: 5s
        batchSize: 10  # Matches the generator's batch size
        to:
          topic: my_topic
          keyType: string
          valueType: json

Performance Note: Using batching can significantly improve throughput when producing large volumes of data.
condition
A predicate function that validates whether a generated message should be produced. If the condition returns false, the message is discarded and the generator is called again.
Condition Example:

    functions:
      generate_random_value:
        type: generator
        globalCode: |
          import random
        code: |
          value = random.randint(0, 100)
          return ("sensor1", {"value": value})
        resultType: (string, json)

      only_high_values:
        type: predicate
        expression: value.get("value", 0) > 50

    producers:
      filtered_producer:
        generator: generate_random_value
        condition: only_high_values  # Only produce if value > 50
        interval: 1s
        to:
          topic: my_topic
          keyType: string
          valueType: json

Use Cases:
- Filtering out invalid or unwanted messages
- Implementing probabilistic message generation
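The discard-and-retry behavior of condition can be modeled in plain Python. The sketch below is not how KSML is implemented: `next_accepted_message` is a hypothetical helper, and the `max_attempts` safeguard is an assumption added so the example cannot loop forever. A fixed list of readings replaces the random generator so the result is reproducible.

```python
def next_accepted_message(generator, condition, max_attempts=1000):
    """Call the generator until the condition accepts a message.

    Hypothetical model of the retry behavior; max_attempts is an
    assumed safeguard, not something the tutorial describes.
    """
    for _ in range(max_attempts):
        key, value = generator()
        if condition(key, value):
            return key, value
        # Condition returned false: discard and generate again
    raise RuntimeError("no message satisfied the condition")


# Deterministic stand-in for the random generator in the example above
readings = iter([10, 30, 75, 90])


def generate_reading():
    return ("sensor1", {"value": next(readings)})


def only_high_values(key, value):
    return value.get("value", 0) > 50


message = next_accepted_message(generate_reading, only_high_values)
print(message)  # ('sensor1', {'value': 75})
```

The readings 10 and 30 are discarded, and 75 is the first message that passes the predicate.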
until
A predicate function that determines when to stop producing. When this function returns true, the producer stops immediately.
Until Example:

    functions:
      generate_sequence:
        type: generator
        globalCode: |
          counter = 0
        code: |
          global counter
          counter += 1
          return (f"key{counter}", {"count": counter})
        resultType: (string, json)

      stop_at_10:
        type: predicate
        expression: value.get("count", 0) >= 10

    producers:
      sequence_producer:
        generator: generate_sequence
        until: stop_at_10  # Stop when count reaches 10
        interval: 1s
        to:
          topic: my_topic
          keyType: string
          valueType: json

Alternative with global state:
Until with Global State Example:

    functions:
      generate_with_stop:
        type: generic
        resultType: boolean
        globalCode: |
          counter = 0
          done = False
          def next_message():
              global counter, done
              counter += 1
              if counter >= 10:
                  done = True
              return (f"key{counter}", {"count": counter})

    producers:
      smart_producer:
        generator:
          code: |
            return next_message()
          resultType: (string, json)
        until:
          expression: done  # Access global variable
        interval: 1s
        to:
          topic: my_topic
          keyType: string
          valueType: json

Use Cases:
- Producing a predetermined dataset exactly once
- Implementing time-based or condition-based stopping logic
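To see how until and count interact, here is a small Python simulation of the stop logic. It is a sketch under stated assumptions rather than KSML's actual loop: `simulate_producer` and `safety_limit` are hypothetical, the interval is ignored, and the sketch assumes that the message which makes until return true is still sent. This tutorial does not state that last detail, so treat it as a guess.

```python
def simulate_producer(generator, count=None, until=None, safety_limit=10_000):
    """Return the messages a producer would send before stopping.

    Hypothetical model; safety_limit only protects the simulation
    from running forever when neither count nor until is given.
    """
    sent = []
    while len(sent) < safety_limit:
        if count is not None and len(sent) >= count:
            break  # count reached
        key, value = generator()
        sent.append((key, value))
        # Assumption: the message that triggers `until` is still sent
        if until is not None and until(key, value):
            break
    return sent


def make_sequence_generator():
    """Build a generator that mirrors generate_sequence above."""
    counter = 0

    def generate():
        nonlocal counter
        counter += 1
        return (f"key{counter}", {"count": counter})

    return generate


def stop_at_10(key, value):
    return value.get("count", 0) >= 10


stopped_by_until = simulate_producer(make_sequence_generator(), until=stop_at_10)
stopped_by_count = simulate_producer(
    make_sequence_generator(), count=3, until=stop_at_10
)
print(len(stopped_by_until), len(stopped_by_count))
```

With both properties set, whichever limit is hit first ends the run, which matches the note in the count section.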
Producer Behavior Summary
Understanding when a producer starts and stops is crucial. Here's the complete behavior table:
| interval | count | until | Behavior |
|---|---|---|---|
| Not specified | Not specified | Not specified | Produces 1 message then stops ("once mode") |
| Specified (any value) | Not specified | Not specified | Produces indefinitely at the specified interval |
| Specified | Specified | Not specified | Produces count messages at the specified interval, then stops |
| Specified | Not specified | Specified | Produces until the until predicate returns true, checking at each interval |
| Specified | Specified | Specified | Produces until count is reached OR until returns true (whichever comes first) |
Key Points:
- "Not specified" means the property is completely omitted from the YAML
- "Specified" means the property is included, even if set to a minimal value like interval: 1ms
- Using interval without count or until will produce messages indefinitely.
- When both count and until are specified, the producer stops when either condition is met first
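The table can also be read as a small decision function. The Python sketch below encodes only the five rows listed above; `producer_mode` and its return labels are hypothetical names, and combinations the table does not cover (for example count without interval) return None rather than guessing.

```python
def producer_mode(interval=None, count=None, until=None):
    """Classify a producer configuration according to the table above.

    None means the property is omitted from the YAML. Returns None
    for combinations the table does not list.
    """
    has_interval = interval is not None  # even 0 or "1ms" counts as specified
    has_count = count is not None
    has_until = until is not None

    if not has_interval:
        if not has_count and not has_until:
            return "once"  # one message, then stop
        return None  # not covered by the table
    if has_count and has_until:
        return "count-or-until"  # whichever comes first
    if has_count:
        return "count"
    if has_until:
        return "until"
    return "indefinite"


print(producer_mode())
print(producer_mode(interval="1ms"))
print(producer_mode(interval="1s", count=10))
print(producer_mode(interval="1s", until="stop_at_10"))
print(producer_mode(interval="1s", count=10, until="stop_at_10"))
```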
Working with Different Data Formats
To learn about producing messages in different formats like JSON, Avro, Protobuf, XML, CSV, and Binary, see the Different Data Formats tutorial.