Data Types and Notations Reference

KSML supports a wide range of data types and formats for both keys and values in your streams. This comprehensive reference covers all data types, notation formats, and type conversion capabilities available in KSML.

Data Types in KSML

Primitive Types

KSML supports the following primitive types:

| Type | Description | Example | Java Equivalent |
|------|-------------|---------|-----------------|
| boolean | True or false values | true, false | Boolean |
| byte | 8-bit integer | 42 | Byte |
| short | 16-bit integer | 1000 | Short |
| int | 32-bit integer | 1000000 | Integer |
| long | 64-bit integer | 9223372036854775807 | Long |
| float | Single-precision floating point | 3.14 | Float |
| double | Double-precision floating point | 3.141592653589793 | Double |
| string | Text string | "Hello, World!" | String |
| bytes | Array of bytes | Binary data | byte[] |
| null | Null value | null | null |
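
Primitive types can be used directly as key and value types in stream definitions. A minimal sketch (topic and stream names are illustrative):

streams:
  temperature_readings:
    topic: temperature-readings
    keyType: long        # 64-bit sensor identifier
    valueType: double    # Double-precision temperature reading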

Complex Types

KSML also supports several complex types that can contain multiple values:

Enum

An enumeration defines a set of allowed values.

Syntax:

valueType: enum(<value1>, <value2>, ...)   # Quotes optional

Example:

streams:
  order_status_stream:
    topic: order-statuses
    keyType: string
    valueType: enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED)  # Works without quotes
    # valueType: "enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED)"  # Also works with quotes

In Python code, an enum value is always represented as a string:

functions:
  update_status:
    type: valueTransformer
    code: |
      if value.get("shipped"):
        return "SHIPPED"
      elif value.get("processing"):
        return "PROCESSING"
    expression: "PENDING"
    resultType: string

Producer - Enum example
# Demonstrates enum data type usage in KSML
streams:
  order_events:
    topic: order_events
    keyType: string
    valueType: json

functions:
  generate_order_event:
    type: generator
    resultType: (string, json)
    code: |
      order = nextOrder()
      return (order["order_id"], order)
    globalCode: |
      # Define orders with enum status values
      orders = [
        {"order_id": "ORD001", "customer": "Alice", "amount": 150.00, "status": "PENDING"},
        {"order_id": "ORD002", "customer": "Bob", "amount": 75.50, "status": "PROCESSING"},
        {"order_id": "ORD003", "customer": "Charlie", "amount": 200.00, "status": "SHIPPED"},
        {"order_id": "ORD004", "customer": "Diana", "amount": 50.00, "status": "DELIVERED"},
        {"order_id": "ORD005", "customer": "Eve", "amount": 125.00, "status": "CANCELLED"},
      ]

      index = 0
      done = False

      def nextOrder():
        global index, done
        if index >= len(orders):
          done = True
          index = 0
        order = orders[index]
        index += 1
        return order

producers:
  order_producer:
    to: order_events
    interval: 1000
    until:
      expression: done
    generator: generate_order_event
Processor - Enum example
# Demonstrates enum data type usage and validation in KSML
streams:
  order_events:
    topic: order_events
    keyType: string
    valueType: json

  # Stream with enum type for order status
  # Enum values are represented as strings with validation
  order_status_stream:
    topic: order_statuses
    keyType: string
    valueType: enum(PENDING, PROCESSING, SHIPPED, DELIVERED, CANCELLED)

functions:
  extract_and_validate_status:
    type: valueTransformer
    resultType: string
    code: |
      # Extract status from order event
      # In Python, enum values are always represented as strings
      status = value.get("status", "PENDING")

      # Validate against allowed enum values
      valid_statuses = ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"]
      if status in valid_statuses:
        return status
      else:
        # Return default if invalid
        return "PENDING"

pipelines:
  # Process orders and extract enum status
  process_order_status:
    from: order_events
    via:
      # Transform JSON value to validated enum string
      - type: transformValue
        mapper: extract_and_validate_status
      - type: peek
        forEach:
          code: |
            log.info("Order {} has status: {}", key, value)
    to: order_status_stream

List

A list contains multiple elements of the same type.

Syntax:

# Function notation (recommended - avoids YAML validation warnings)
valueType: list(<element_type>)

# Bracket notation (quote it to avoid YAML validation warnings)
valueType: "[<element_type>]"

Example:

streams:
  tags_stream:
    topic: tags
    keyType: string
    valueType: list(string)     # Function notation (no quotes needed)

  categories_stream:
    topic: categories
    keyType: string
    valueType: "[string]"       # Bracket notation (quoted to avoid YAML validation warnings)

In Python code, a list is represented as a Python list:

functions:
  extract_tags:
    type: keyValueToValueListTransformer
    expression: value.get("tags", [])
    resultType: list(string)    # Function notation (no quotes needed)

  extract_categories:
    type: keyValueToValueListTransformer
    expression: value.get("categories", [])
    resultType: "[string]"      # Bracket notation (quoted to avoid YAML validation warnings)

See it in action:

Example

functions:
  enhance_grades:
    type: valueTransformer
    resultType: list(int)    # Alternative to "[int]" - demonstrates list() syntax without quotes
    code: |
      grades = value.get("grades", [])
      # Add 5 bonus points to each grade below 95 (higher grades are dropped)
      enhanced_grades = [grade + 5 for grade in grades if grade < 95]
    expression: enhanced_grades

This example demonstrates using list(int) syntax in function result types to avoid YAML validation warnings:

Producer - list() syntax example
# Simple producer demonstrating list() and tuple() syntax alternatives

functions:
  generate_grades:
    type: generator
    resultType: tuple(string, json)  # Alternative to "(string, json)" - no quotes needed
    globalCode: |
      import random
    code: |
      student_id = f"student_{random.randint(1000, 9999)}"

      # Generate simple student grades data
      student_data = {
        "grades": [random.randint(70, 100) for _ in range(3)]
      }
    expression: (student_id, student_data)

streams:
  student_grades:
    topic: student_grades
    keyType: string
    valueType: json

producers:
  grades_producer:
    to: student_grades
    interval: 3000
    generator: generate_grades
Processor - list() syntax example
# Simple processor demonstrating list() syntax alternative

streams:
  student_grades:
    topic: student_grades
    keyType: string
    valueType: json

  enhanced_grades:
    topic: enhanced_grades
    keyType: string
    valueType: json

functions:
  enhance_grades:
    type: valueTransformer
    resultType: list(int)    # Alternative to "[int]" - demonstrates list() syntax without quotes
    code: |
      grades = value.get("grades", [])
      # Add 5 bonus points to each grade below 95 (higher grades are dropped)
      enhanced_grades = [grade + 5 for grade in grades if grade < 95]
    expression: enhanced_grades

pipelines:
  process_grades:
    from: student_grades
    via:
      - type: transformValue
        mapper: enhance_grades
      - type: convertValue
        into: json
      - type: peek
        forEach:
          code: |
            log.info("ENHANCED GRADES - Student: {}, enhanced_grades: {}", key, value)
    to: enhanced_grades

What this example does:

  • Producer uses resultType: tuple(string, json) instead of "(string, json)" to avoid quotes
  • Processor uses resultType: list(int) instead of "[int]" to avoid YAML validation warnings
  • Functionality remains identical - the new syntax is purely for YAML compatibility

Map

A map contains key-value pairs where keys are always strings and values are of a specified type.

Syntax:

valueType: map(<value_type>)   # Quotes optional

Example:

streams:
  user_preferences:
    topic: user-preferences
    keyType: string
    valueType: map(string)  # Map with string keys and string values (quotes optional)

  scores:
    topic: scores
    keyType: string
    valueType: map(int)     # Map with string keys and integer values, "map(int)" also valid

In Python code, a map is represented as a Python dictionary:

functions:
  create_preferences:
    type: valueTransformer
    code: |
      result = {
        "theme": value.get("selected_theme", "default"),
        "language": value.get("user_language", "en"),
        "notifications": value.get("notify_enabled", "true")
      }
    expression: result
    resultType: map(string) # "map(string)" also valid

  calculate_scores:
    type: valueTransformer
    code: |
      result = {
        "math": 85,
        "science": 92,
        "english": 78
      }
    expression: result
    resultType: map(int) # "map(int)" also valid 

Key characteristics:

  • Keys are always strings (this is enforced by the type system)
  • All values must be of the same type as specified in map(<value_type>)
  • Useful for representing configuration objects, dictionaries, and key-value stores

Example

streams:
  user_preferences:
    topic: user_preferences
    keyType: string
    valueType: map(string)  # Map with string keys and string values

  user_scores:
    topic: user_scores
    keyType: string
    valueType: map(int)     # Map with string keys and integer values

This simple example demonstrates using map(string) and map(int) types in stream definitions and function result types:

Producer - map example
# Simple producer demonstrating map(string) and map(int) data types

functions:
  generate_preferences:
    type: generator
    resultType: (string, map(string))  # Returns a map with string values
    globalCode: |
      import random
      import time
    code: |
      user_id = f"user_{random.randint(1000, 9999)}"

      # Generate preferences map with string values
      preferences = {
        "theme": random.choice(["dark", "light"]),
        "language": random.choice(["en", "es", "fr"]),
        "layout": random.choice(["grid", "list"])
      }
    expression: (user_id, preferences)

  generate_scores:
    type: generator
    resultType: (string, map(int))  # Returns a map with integer values
    globalCode: |
      import random
    code: |
      user_id = f"user_{random.randint(1000, 9999)}"

      # Generate scores map with integer values
      scores = {
        "math": random.randint(60, 100),
        "science": random.randint(60, 100),
        "english": random.randint(60, 100)
      }
    expression: (user_id, scores)

streams:
  user_preferences:
    topic: user_preferences
    keyType: string
    valueType: map(string)  # Map with string keys and string values

  user_scores:
    topic: user_scores
    keyType: string
    valueType: map(int)     # Map with string keys and integer values

producers:
  preferences_producer:
    to: user_preferences
    interval: 3000
    generator: generate_preferences

  scores_producer:
    to: user_scores
    interval: 4000
    generator: generate_scores
Processor - map example
# Simple processor demonstrating map(string) and map(int) usage

streams:
  user_preferences:
    topic: user_preferences
    keyType: string
    valueType: map(string)  # Map with string keys and string values

  user_scores:
    topic: user_scores
    keyType: string
    valueType: map(int)     # Map with string keys and integer values

  processed_preferences:
    topic: processed_preferences
    keyType: string
    valueType: map(string)  # Output also as map(string)

  processed_scores:
    topic: processed_scores
    keyType: string
    valueType: map(int)     # Output also as map(int)

functions:
  enhance_preferences:
    type: valueTransformer
    resultType: map(string)  # Function returns map(string)
    code: |
      # Add a status field to the preferences map
      enhanced = dict(value)  # Copy input map
      enhanced["status"] = "active"  # Add string value
    expression: enhanced

  calculate_stats:
    type: valueTransformer
    resultType: map(int)     # Function returns map(int)  
    code: |
      # Calculate some statistics from scores map
      scores = dict(value)  # Copy input map
      total = sum(scores.values())
      average = total // len(scores)  # Integer division

      stats = dict(scores)  # Start with original scores
      stats["total"] = total      # Add integer values
      stats["average"] = average
    expression: stats

pipelines:
  process_preferences:
    from: user_preferences
    via:
      - type: transformValue
        mapper: enhance_preferences
      - type: peek
        forEach:
          code: |
            log.info("PREFERENCES MAP - User: {}, prefs: {}", key, value)
    to: processed_preferences

  process_scores:
    from: user_scores  
    via:
      - type: transformValue
        mapper: calculate_stats
      - type: peek
        forEach:
          code: |
            log.info("SCORES MAP - User: {}, total: {}, average: {}", 
                   key, value.get("total"), value.get("average"))
    to: processed_scores

What this example does:

  • Stream definitions use valueType: map(string) and valueType: map(int) to define strongly-typed maps
  • Function result types use resultType: (string, map(string)) to return maps with type safety
  • Processing functions use resultType: map(string) and resultType: map(int) to transform and validate map contents
  • Demonstrates how the map(<value_type>) syntax ensures all values in a map conform to the specified type

Struct

A struct is a key-value map where all keys are strings. This is the most common complex type and is used for JSON objects, Avro records, etc.

Syntax:

valueType: struct

Example:

streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: struct

In Python code, a struct is represented as a dictionary:

functions:
  create_user:
    type: valueTransformer
    code: |
      return {
        "id": value.get("user_id"),
        "name": value.get("first_name") + " " + value.get("last_name"),
        "email": value.get("email"),
        "age": value.get("age")
      }
    resultType: struct

Producer - Struct example
# Demonstrates struct data type usage in KSML
streams:
  user_profiles:
    topic: user_profiles
    keyType: string
    valueType: struct  # Using struct value type

functions:
  generate_user_profile:
    type: generator
    globalCode: |
      import time

      user_id = 1

      def get_user_profile():
        global user_id
        # Create a struct (dictionary) for user profile
        profile = {
          "user_id": f"USER_{user_id:03d}",
          "name": f"User Name {user_id}",
          "age": 20 + (user_id % 50),
          "email": f"user{user_id}@example.com",
          "preferences": {
            "newsletter": user_id % 2 == 0,
            "notifications": user_id % 3 != 0
          },
          "created_at": int(time.time() * 1000)
        }
        user_id = (user_id % 5) + 1  # Cycle through 5 users
        return profile
    code: |
      profile = get_user_profile()
      return (profile["user_id"], profile)
    resultType: (string, struct)  # Returning struct type

producers:
  profile_producer:
    to: user_profiles
    interval: 3000  # Generate profile every 3 seconds
    generator: generate_user_profile
Processor - Struct example
# Demonstrates struct data type manipulation in KSML
streams:
  user_profiles:
    topic: user_profiles
    keyType: string
    valueType: struct  # Input as struct type

  enriched_profiles:
    topic: enriched_profiles
    keyType: string
    valueType: struct  # Output as struct type

functions:
  enrich_user_profile:
    type: valueTransformer
    code: |
      # Working with struct data (dictionary)
      # Access nested struct fields
      preferences = value.get("preferences", {})

      # Create enriched struct with additional fields
      enriched = {
        "user_id": value.get("user_id"),
        "name": value.get("name"),
        "age": value.get("age"),
        "email": value.get("email"),
        "age_group": "young" if value.get("age", 0) < 30 else "adult",
        "subscription_status": "active" if preferences.get("newsletter", False) else "inactive",
        "original_preferences": preferences,
        "enriched_at": int(time.time() * 1000)
      }
      return enriched
    resultType: struct  # Function returns struct type
    globalCode: |
      import time

pipelines:
  enrich_profiles:
    from: user_profiles
    via:
      # Transform struct to enriched struct
      - type: transformValue
        mapper: enrich_user_profile

      # Log the enriched struct
      - type: peek
        forEach:
          code: |
            log.info("Enriched profile: {} - {} ({} years, {})", 
                     key, 
                     value.get("name"),
                     value.get("age"),
                     value.get("subscription_status"))

    to: enriched_profiles

Tuple

A tuple combines multiple elements of different types into a single value.

Syntax:

# Standard bracket notation
valueType: (<type1>, <type2>, ...)

# Alternative function notation (avoids YAML validation warnings)
valueType: tuple(<type1>, <type2>, ...)

Example:

streams:
  sensor_stream:
    topic: sensor-data
    keyType: string
    valueType: (string, avro:SensorData)     # Standard notation

  coordinate_stream:
    topic: coordinates
    keyType: string
    valueType: tuple(double, double)           # Alternative notation (no quotes needed)

In Python code, a tuple is represented as a Python tuple:

functions:
  create_user_age_pair:
    type: keyValueTransformer
    expression: (value.get("name"), value.get("age"))
    resultType: (string, int)               # Standard notation

  create_coordinate_pair:
    type: keyValueTransformer
    expression: (value.get("lat"), value.get("lng"))
    resultType: tuple(double, double)         # Alternative notation (no quotes needed)

See it in action:

Example

functions:
  generate_grades:
    type: generator
    resultType: tuple(string, json)  # Alternative to "(string, json)" - no quotes needed
    globalCode: |
      import random
    code: |
      student_id = f"student_{random.randint(1000, 9999)}"

      # Generate simple student grades data
      student_data = {
        "grades": [random.randint(70, 100) for _ in range(3)]
      }
    expression: (student_id, student_data)

This example demonstrates using tuple(string, json) syntax in function result types to avoid YAML validation warnings:

Producer - tuple() syntax example
# Simple producer demonstrating list() and tuple() syntax alternatives

functions:
  generate_grades:
    type: generator
    resultType: tuple(string, json)  # Alternative to "(string, json)" - no quotes needed
    globalCode: |
      import random
    code: |
      student_id = f"student_{random.randint(1000, 9999)}"

      # Generate simple student grades data
      student_data = {
        "grades": [random.randint(70, 100) for _ in range(3)]
      }
    expression: (student_id, student_data)

streams:
  student_grades:
    topic: student_grades
    keyType: string
    valueType: json

producers:
  grades_producer:
    to: student_grades
    interval: 3000
    generator: generate_grades
Processor - tuple() syntax example
# Simple processor demonstrating list() syntax alternative

streams:
  student_grades:
    topic: student_grades
    keyType: string
    valueType: json

  enhanced_grades:
    topic: enhanced_grades
    keyType: string
    valueType: json

functions:
  enhance_grades:
    type: valueTransformer
    resultType: list(int)    # Alternative to "[int]" - demonstrates list() syntax without quotes
    code: |
      grades = value.get("grades", [])
      # Add 5 bonus points to each grade below 95 (higher grades are dropped)
      enhanced_grades = [grade + 5 for grade in grades if grade < 95]
    expression: enhanced_grades

pipelines:
  process_grades:
    from: student_grades
    via:
      - type: transformValue
        mapper: enhance_grades
      - type: convertValue
        into: json
      - type: peek
        forEach:
          code: |
            log.info("ENHANCED GRADES - Student: {}, enhanced_grades: {}", key, value)
    to: enhanced_grades

What this example does:

  • Producer function uses resultType: tuple(string, json) instead of "(string, json)" to avoid quotes
  • Processor function uses resultType: list(int) to demonstrate both new syntaxes working together
  • No functional difference - the new syntax provides YAML-friendly alternatives

Union

A union type can be one of several possible types.

Syntax:

valueType: union(<type1>, <type2>, ...)

Example:

Union types are used in two main places in KSML:

1. In stream definitions - to specify that a stream can contain multiple types:

streams:
  optional_messages:
    topic: optional-messages
    keyType: string
    valueType: union(null, json)  # This stream accepts either null OR a JSON object

2. In function return types - to specify that a function can return multiple types:

functions:
  generate_optional:
    type: generator
    globalCode: |
      import random
    code: |
      # Can return either null or a message
      if random.random() > 0.5:
        return ("key1", {"data": "value"})
      else:
        return ("key1", None)
    resultType: (string, union(null, json))  # Returns a tuple with union type

What union types mean:

  • union(null, json) means the value can be either null OR a JSON object
  • When processing union types, your code must check which type was received and handle each case

Complete example showing both usages:

Producer - Union example
# Demonstrates union data type usage in KSML
streams:
  optional_messages:
    topic: optional_messages
    keyType: string
    valueType: json  # Using JSON for Kafka serialization

functions:
  generate_optional_message:
    type: generator
    globalCode: |
      import time
      import random
      message_id = 1

      def get_optional_message():
        global message_id
        # Generate union type: either a message or null
        if random.random() > 0.3:  # 70% chance of message
          message = {
            "id": message_id,
            "content": f"Message {message_id}",
            "timestamp": int(time.time() * 1000)
          }
        else:  # 30% chance of null
          message = None

        key = f"MSG_{message_id:04d}"
        message_id += 1
        return (key, message)
    code: |
      return get_optional_message()
    resultType: (string, union(null, json))  # Function returns union type

producers:
  optional_producer:
    to: optional_messages
    interval: 2000  # Generate message every 2 seconds
    generator: generate_optional_message
Processor - Union example
# Demonstrates processing of union types in KSML
streams:
  optional_messages:
    topic: optional_messages
    keyType: string
    valueType: union(null, json)  # Input as union type

  processed_messages:
    topic: processed_messages
    keyType: string
    valueType: json  # Output as JSON

functions:
  # This function accepts a union type
  process_optional:
    type: valueTransformer
    code: |
      # Handle union type (null or JSON message)
      if value is None:
        # Handle null case
        return {
          "status": "empty",
          "message": "No content received",
          "processed_at": int(time.time() * 1000)
        }
      else:
        # Handle JSON message case
        return {
          "status": "processed",
          "original_id": value.get("id"),
          "content_length": len(value.get("content", "")),
          "processed_at": int(time.time() * 1000)
        }
    # Function signature shows it processes union type
    resultType: json
    globalCode: |
      import time

pipelines:
  process_optional_messages:
    from: optional_messages
    via:
      # Transform the union type value
      - type: transformValue
        mapper: process_optional

      # Log the processing result
      - type: peek
        forEach:
          code: |
            status = value.get("status")
            if status == "empty":
              log.info("Received null value for key: {}", key)
            else:
              log.info("Processed message {} for key: {}", 
                       value.get("original_id"), key)

    to: processed_messages

Windowed

Windowing operations in Kafka Streams group messages together in time-based windows. KSML provides the windowed(<base_type>) syntax to work with these windowed keys.

Syntax:

# Without notation - requires manual transformation for Kafka output
keyType: windowed(<base_type>)

# With notation - automatically serializes to the specified format
keyType: <notation>:windowed(<base_type>)  # e.g., json:windowed(string), avro:windowed(string)

Understanding Windowed Keys:

After windowing operations (like windowByTime), Kafka Streams internally creates windowed keys that contain:

  • The original key value
  • Window start timestamp (milliseconds)
  • Window end timestamp (milliseconds)
  • Human-readable start/end times

Two Approaches for Handling Windowed Keys:

1. Without Notation (Manual Transformation Required):

Plain windowed(string) can be used for internal processing, as in the convertKey step below, but such keys cannot be serialized directly to Kafka topics; you must manually transform them to a regular type before writing to a topic:

      - type: convertKey
        into: windowed(string)

2. With Notation Prefix (Automatic Serialization):

Using a notation prefix like json:windowed(string) or avro:windowed(string) enables automatic serialization of the windowed key structure:

      - type: convertKey
        into: json:windowed(string)  # Recommended: use notation prefix for automatic serialization

The notation automatically serializes the windowed key as a structured object with fields: start, end, startTime, endTime, and key.
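
For illustration, a windowed key serialized with json:windowed(string) could look like the following structure (the values and timestamp formatting shown are illustrative):

{
  "start": 1700000000000,
  "end": 1700000010000,
  "startTime": "2023-11-14 22:13:20",
  "endTime": "2023-11-14 22:13:30",
  "key": "user1"
}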

Complete Examples:

Producer - Generates events for windowing
# Demonstrates generating events for windowed processing
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json  # Using JSON for readability in Kowl UI

functions:
  generate_user_event:
    type: generator
    globalCode: |
      import time
      import random

      # Simulate events from 5 different users
      users = ["user1", "user2", "user3", "user4", "user5"]
      event_types = ["click", "view", "purchase"]

      def get_user_event():
        user = random.choice(users)
        event_type = random.choice(event_types)

        event = {
          "user": user,
          "type": event_type,
          "timestamp": int(time.time() * 1000)
        }

        return (user, event)  # Key by user for windowing
    code: |
      return get_user_event()
    resultType: (string, json)

producers:
  event_producer:
    to: user_events
    interval: 1000  # Generate event every second
    generator: generate_user_event
Processor - Manual transformation approach

This example shows how to manually transform windowed keys to regular strings when not using notation:

# Demonstrates windowed type usage in KSML
# 
# This example shows how to use the windowed(<base_type>) syntax with convertKey operation.
# After windowing operations, KSML internally uses windowed keys. The convertKey operation
# with 'into: windowed(string)' explicitly converts to this type for internal processing.
#
# IMPORTANT: While windowed types can be used internally and in stream definitions,
# they cannot be serialized to Kafka topics. The final transformation to regular strings
# is necessary for writing to Kafka.
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json  # Input events

  # Output stream with regular string keys (transformed from windowed keys)
  windowed_counts:
    topic: windowed_counts  
    keyType: string  # Regular string key after transformation
    valueType: json  # JSON value containing count and window info

stores:
  event_counts_store:
    type: window
    retention: 1m  # Keep window data for 1 minute
    keyType: string
    valueType: long

pipelines:
  count_events_by_window:
    from: user_events
    via:
      # Group by key (user) for counting
      - type: groupByKey

      # Apply a 10-second tumbling window
      - type: windowByTime
        windowType: tumbling
        duration: 10s

      # Count events in each window
      - type: count
        store: event_counts_store

      # Convert to stream for processing
      - type: toStream

      # DEMONSTRATES windowed(<base_type>) USAGE:
      # Explicitly convert the key to windowed(string) type
      # This shows how KSML handles windowed keys internally
      - type: convertKey
        into: windowed(string)

      # Log the windowed counts with the windowed key type
      - type: peek
        forEach:
          code: |
            log.info("Windowed key type - User {} had {} events in window [{} - {}]", 
                     key['key'], value, key['startTime'], key['endTime'])

      # Transform windowed key to string for Kafka output
      # This is necessary because Kafka topics cannot serialize windowed keys
      - type: transformKeyValue
        mapper:
          resultType: (string, json)
          code: |
            # Extract window information from the windowed key
            # Convert to a string key format: "user_startTime_endTime"
            new_key = f"{key['key']}_{key['start']}_{key['end']}"

            # Create a JSON value with all the information
            new_value = {
                "user": key['key'],
                "count": value,
                "window_start": key['start'],
                "window_end": key['end'],
                "window_start_time": key['startTime'],
                "window_end_time": key['endTime']
            }

            log.info("Transformed to string key - User {} had {} events in window [{} - {}]", 
                     key['key'], value, key['startTime'], key['endTime'])

            return (new_key, new_value)

    # Now we can write to the topic with regular string keys
    to: windowed_counts
Processor - Automatic serialization with notation

This example shows the simpler approach using notation for automatic serialization:

# Demonstrates windowed type usage in KSML
# 
# This example shows how to use the windowed(<base_type>) syntax with convertKey operation.
# After windowing operations, KSML internally uses windowed keys.
#
# RECOMMENDED APPROACH:
# Use a notation prefix like 'json:windowed(string)' or 'avro:windowed(string)' instead of
# plain 'windowed(string)'. The notation automatically handles serialization of the windowed
# key structure (start, end, startTime, endTime, key) to the specified format, allowing you
# to write directly to Kafka topics without manual transformation.
#
# Example: 
#   - type: convertKey
#     into: json:windowed(string)  # Automatically serializes as JSON structure
#
# Without notation, windowed keys cannot be serialized to Kafka topics and require manual
# transformation to regular types.
streams:
  user_events:
    topic: user_events
    keyType: string
    valueType: json  # Input events

  # Output stream with JSON-serialized windowed keys
  windowed_counts:
    topic: windowed_counts  
    keyType: json  # JSON notation automatically serializes the windowed key structure
    valueType: long  # Simple count value

stores:
  event_counts_store:
    type: window
    retention: 1m  # Keep window data for 1 minute
    keyType: string
    valueType: long

pipelines:
  count_events_by_window:
    from: user_events
    via:
      # Group by key (user) for counting
      - type: groupByKey

      # Apply a 10-second tumbling window
      - type: windowByTime
        windowType: tumbling
        duration: 10s

      # Count events in each window
      - type: count
        store: event_counts_store

      # Convert to stream for processing
      - type: toStream

      # DEMONSTRATES windowed(<base_type>) USAGE WITH NOTATION:
      # Using json:windowed(string) automatically serializes windowed keys as JSON structures
      # containing: start, end, startTime, endTime, and key fields
      - type: convertKey
        into: json:windowed(string)  # Recommended: use notation prefix for automatic serialization

      # Log the windowed counts
      - type: peek
        forEach:
          code: |
            log.info("JSON Windowed key - User {} had {} events in window [{} - {}]", 
                     key['key'], value, key['startTime'], key['endTime'])

    # Now we can write to the topic with JSON-serialized windowed keys
    to: windowed_counts

When to Use Each Approach:

  • Use notation prefix (json:windowed(string)) when you want to:

    • Write windowed keys directly to Kafka topics
    • Preserve the complete window structure in a standard format
    • Avoid manual transformation code
  • Use plain windowed type (windowed(string)) when you:

    • Only need windowed keys for internal processing
    • Want custom key formatting for output
    • Need to extract specific window information

Key Takeaway:

Windowed types enable time-based analytics like counting events per time window, calculating moving averages, or detecting patterns over time intervals. The notation prefix approach simplifies working with windowed data by handling serialization automatically.

The Any and "?" Types

KSML supports wildcard types any and ? (which are equivalent) that represent unknown or variable data types. These map internally to DataType.UNKNOWN and can only be used for function parameters when the exact type is not known at definition time. They cannot be used for stream types or function result types due to serialization and type system requirements.

Syntax:

The any and ? types can be used in:

  • Function parameters only (type: any or type: "?")
# Function parameters (SUPPORTED)
functions:
  my_function:
    type: generic
    parameters:
      - name: input
        type: any      # Accepts any type
      - name: other
        type: "?"      # Alternative syntax (quote to avoid YAML issues)
    code: |
      # Process the input parameter of unknown type
      return "processed"
    resultType: string   # Must be a concrete type

# Stream types (NOT SUPPORTED)
# valueType: any     # ❌ This will fail with "JSON serde not found"
# keyType: "?"       # ❌ This will fail with serialization error

# Function result types (NOT SUPPORTED)
# resultType: any    # ❌ This will fail with topology type checking error

Why the limitations exist:

  • Stream types: Kafka requires concrete serialization formats. The any type cannot be serialized to Kafka topics because there's no serde for unknown data types.
  • Result types: The topology type system requires concrete types for type checking and ensuring data flows correctly between operations.

Key Use Cases:

  • Generic utility functions that accept multiple data types as parameters
  • Helper functions that need to handle variable input types
  • Functions that process data generically before converting to concrete output types
Producer - Any type demonstration

This example demonstrates using the ? type for function parameters, showing how to create generic utility functions.

# Simple demonstration of 'any' and '?' types in KSML

streams:
  any_data:
    topic: any_data
    keyType: string
    valueType: json

functions:
  generate_data:
    type: generator
    globalCode: |
      import random
    code: |
      # Generate different JSON-compatible data structures
      if random.choice([True, False]):
        data = {"type": "text", "value": "hello world"}
      else:
        data = {"type": "number", "value": 42}

      return ("key1", data)
    resultType: (string, json)

  # Function with '?' parameter type
  describe_data:
    type: generic
    parameters:
      - name: value
        type: "?"  # Accepts any type
    code: |
      return "processed"

    resultType: string

producers:
  test_producer:
    generator: generate_data
    interval: 2s
    to: any_data
Processor - Any type processing

This example shows how to process data using the any type for function parameters, demonstrating type-agnostic helper functions.

# Simple demonstration of 'any' type processing

streams:
  any_input:
    topic: any_data
    keyType: string
    valueType: json

  any_output:
    topic: processed_any_data
    keyType: string
    valueType: json

functions:
  # Helper function with 'any' parameter type
  process_data:
    type: generic
    parameters:
      - name: data
        type: any  # Accepts any type
    code: |
      return {
        "processed_data": data,
        "status": "processed"
      }
    resultType: json

  # Function that uses the helper
  process_any:
    type: valueTransformer
    code: |
      # Use helper function that accepts 'any' type
      return process_data(value)

    resultType: json

pipelines:
  process_data:
    from: any_input
    via:
      - type: transformValue
        mapper: process_any
      - type: peek
        forEach:
          code: |
            log.info("Processed any type: {} = {}",
                     value.get("input_type"), value.get("input_value"))
    to: any_output

Notation Formats

KSML uses notations to allow reading/writing different message formats to Kafka topics. Notations are specified as a prefix to the schema name.
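
For example, notation prefixes appear in stream definitions as follows (a sketch; topic and schema names are illustrative):

streams:
  readings_avro:
    topic: readings-avro
    keyType: string
    valueType: avro:SensorReading   # Avro with schema SensorReading
  readings_json:
    topic: readings-json
    keyType: string
    valueType: json                 # Schemaless JSON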

Examples

See a working example for every data format in this tutorial.

Format Selection Guide

The choice of notation depends on your specific requirements:

| If you need... | Consider using... |
|----------------|-------------------|
| Schema evolution and backward compatibility | Avro or Protobuf |
| Human-readable data for debugging | JSON |
| Integration with legacy systems | XML or SOAP |
| Simple tabular data | CSV |
| Compact binary format | Avro or Protobuf |
| Raw binary data handling | Binary |

Avro

Avro is a binary format that supports schema evolution.

Syntax:

valueType: avro:<schema_name>
# or for schema registry lookup
valueType: avro

Example:

streams:
  sensor_readings:
    topic: sensor-data
    keyType: string
    valueType: avro:SensorData

JSON

JSON is a text-based, human-readable format for data transfer.

Syntax:

# For schemaless JSON:
valueType: json
# For JSON with a schema:
valueType: json:<schema_name>

Example:

streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: json

  orders:
    topic: orders
    keyType: string
    valueType: json:Order

Python functions can return JSON by returning a dictionary:

functions:
  merge_key_value_data:
    type: valueTransformer
    expression: "{ 'key': key, 'value': value }"  # Quoted so YAML treats it as a string expression
    resultType: json

JSON Schema

JSON Schema adds vendor-specific schema support to JSON serialization.

Syntax:

# For schema registry lookup:
valueType: jsonschema
# For JSON with a schema:
valueType: jsonschema:<schema_name>

Example:

streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: jsonschema:UserProfile

CSV

CSV (Comma-Separated Values) is a simple tabular data format.

Syntax:

# For schemaless CSV:
valueType: csv
# For CSV with a schema:
valueType: csv:<schema_name>

Example:

streams:
  sales_data:
    topic: sales-data
    keyType: string
    valueType: csv

  inventory_data:
    topic: inventory-data
    keyType: string
    valueType: csv:InventoryRecord

XML

XML (Extensible Markup Language) is used for complex hierarchical data.

Syntax:

# For schemaless XML:
valueType: xml
# For XML with a schema:
valueType: xml:<schema_name>

Example:

streams:
  customer_data:
    topic: customer-data
    keyType: string
    valueType: xml:CustomerData

Protobuf

Protobuf (Protocol Buffers) is a compact binary serialization format developed by Google.

Syntax:

# For schema registry lookup:
valueType: protobuf
# For Protobuf with a schema:
valueType: protobuf:<schema_name>

Example:

streams:
  user_profiles:
    topic: user-profiles
    keyType: string
    valueType: protobuf:UserProfile

Binary

Binary data represents raw bytes for custom protocols.

Syntax:

valueType: binary

Example:

streams:
  binary_data:
    topic: binary-messages
    keyType: string
    valueType: binary

SOAP

SOAP (Simple Object Access Protocol) is an XML-based messaging protocol.

Syntax:

valueType: soap

Example:

streams:
  service_requests:
    topic: service-requests
    keyType: string
    valueType: soap

Schema Management

When working with structured data, it's important to manage your schemas effectively.

Examples

See a working example for every type of schema in this tutorial.

Local Files vs. Schema Registry

Local Schema Files: When a schema name is specified, KSML loads the schema from a local file in the schemaDirectory. The notation determines the filename extension:

  • Avro schemas: .avsc extension
  • XML schemas: .xsd extension
  • CSV schemas: .csv extension
  • JSON schemas: .json extension
streams:
  sensor_data:
    topic: sensor-reading
    keyType: string
    valueType: avro:SensorReading  # Looks for SensorReading.avsc
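
As an illustration, SensorReading.avsc could contain an Avro record schema like the following (the fields shown are hypothetical):

{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "sensor_id", "type": "string"},
    {"name": "temperature", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}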

Schema Registry Lookup: When no schema name is specified, KSML assumes the schema can be fetched from Schema Registry:

streams:
  sensor_data:
    topic: sensor-reading
    keyType: string
    valueType: avro  # Schema fetched from registry

Type Conversion

KSML handles type conversion differently depending on the context:

| Context | Conversion Type | When to Use |
|---------|-----------------|-------------|
| Functions | Automatic | When resultType differs from returned value |
| Streams | Explicit | When input/output stream formats differ |

Function Type Conversion (Automatic)

Functions automatically convert return values to match their declared resultType when possible:

Successful Conversions:

  • Any type → string: Always works via automatic .toString() conversion
  • String → numeric types (int, long, float, double): Works only if string contains a valid numeric value (e.g., "123" → int)
  • Numeric conversions: Work between compatible numeric types (int ↔ long, float ↔ double)
  • Complex types: Dict → JSON, lists/structs/tuples with matching schemas

Failed Conversions:

  • Invalid string → numeric: Throws exception and stops processing (e.g., "not_a_number" → int fails)
  • Incompatible complex types: Mismatched schemas or structures

Example:

functions:
  string_to_int:
    type: valueTransformer
    code: |
      result = "123"        # Valid numeric string
    expression: result
    resultType: int         # ← Succeeds: converts "123" → 123

  invalid_conversion:
    type: valueTransformer
    code: |
      result = "not_a_number"  # Invalid numeric string
    expression: result
    resultType: int         # ← Fails: throws conversion exception

Working example - Automatic type conversion in functions

Producer:

# Producer for automatic type conversion example
# Generates test data for demonstrating function result type conversion

streams:
  sensor_data:
    topic: sensor_data
    keyType: string
    valueType: json

functions:
  generate_sensor_data:
    type: generator
    globalCode: |
      import random
      counter = 0
      cities = ["Amsterdam", "Rotterdam", "Utrecht", "The Hague"]
    code: |
      global counter, cities
      counter += 1

      # Create sensor data
      data = {
        "sensor_id": f"sensor_{counter}",
        "temperature": round(random.uniform(15.0, 30.0), 1),
        "humidity": round(random.uniform(40.0, 80.0), 1),
        "city": random.choice(cities),
        "timestamp": counter * 1000
      }

      return (data["sensor_id"], data)
    resultType: (string, json)

producers:
  sensor_generator:
    generator: generate_sensor_data
    interval: 2s
    to: sensor_data

Processor:

# Demonstrates automatic type conversion in KSML functions
# 
# This example shows how KSML automatically converts function return values
# to match the declared resultType, without requiring explicit conversion code.

streams:
  sensor_input:
    topic: sensor_data
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  processed_output:
    topic: processed_data
    keyType: string
    valueType: json

functions:
  # Example 1: Returns dict, automatically converted to string
  dict_to_string_auto:
    type: valueTransformer
    code: |
      # Function returns a Python dictionary
      result = {
        "sensor": key,
        "temp_fahrenheit": value.get("temperature", 0) * 1.8 + 32,
        "humidity_percent": value.get("humidity", 0),
        "location": value.get("city", "unknown").upper()
      }
      # Note: We return a dict, not a string
    expression: result
    resultType: string  # ← Automatic conversion: dict → JSON string

  # Example 2: String parsing with valid numeric conversion
  valid_string_to_int:
    type: valueTransformer
    code: |
      # Extract temperature and convert to string representation
      temp_str = str(int(value.get("temperature", 20)))
      result = temp_str  # Return valid numeric string like "20"
    expression: result
    resultType: int  # ← Automatic conversion: "20" → 20

  # Example 3: Integer to string conversion  
  int_to_string:
    type: valueTransformer
    code: |
      # At this point, value is an integer from previous conversion
      # We'll just multiply it by 2 to show we're working with an int
      result = value * 2  # Return an integer
    expression: result
    resultType: string  # ← Automatic conversion: 40 → "40"

pipelines:
  demonstrate_auto_conversion:
    from: sensor_input
    via:
      # Test automatic dict to string conversion
      - type: transformValue
        mapper: dict_to_string_auto

      - type: peek
        forEach:
          code: |
            # Value is now a string (automatic conversion happened)
            log.info("After auto dict→string: type={}, content={}", 
                     type(value).__name__, value)

      # Parse the JSON string back to dict for next operations
      - type: transformValue
        mapper:
          type: valueTransformer
          code: |
            import json
            result = json.loads(value)  # Parse JSON string back to dict
          expression: result
          resultType: json

      # Test valid string to int conversion
      - type: transformValue
        mapper: valid_string_to_int

      - type: peek
        forEach:
          code: |
            # Value is now an integer (automatic conversion happened)
            log.info("After auto string→int: type={}, value={}", 
                     type(value).__name__, value)

      # Test int to string conversion
      - type: transformValue
        mapper: int_to_string

      - type: peek
        forEach:
          code: |
            # Value is now a string (automatic conversion happened)  
            log.info("After auto int→string: type={}, value={}", 
                     type(value).__name__, value)

      # Wrap in JSON for output
      - type: transformValue
        mapper:
          type: valueTransformer
          code: |
            result = {
              "sensor_id": key,
              "converted_value": value,
              "processed": True
            }
          expression: result
          resultType: json

    to: processed_output

Stream Format Conversion (Explicit)

Streams require explicit convertValue operations when formats differ:

pipelines:
  example_pipeline:
    from: json_input      # JSON format
    via:
      - type: convertValue
        into: string      # Must explicitly convert
    to: string_output     # String format

Without convertValue, KSML will fail with a type mismatch error.

Working example - Explicit stream conversion

Producer:

# Producer for explicit format conversion example
# Generates JSON messages that will be converted to different formats

streams:
  json_messages:
    topic: json_messages
    keyType: string
    valueType: json

functions:
  generate_json_data:
    type: generator
    globalCode: |
      import random
      import time
      counter = 0
      cities = ["Amsterdam", "Rotterdam", "Utrecht", "The Hague", "Eindhoven"]
      types = ["temperature", "humidity", "pressure"]
    code: |
      global counter, cities, types
      counter += 1

      sensor_type = random.choice(types)
      sensor_data = {
        "sensor_id": f"sensor_{counter % 10}",
        "city": random.choice(cities),
        "type": sensor_type,
        "value": round(random.uniform(0, 100), 2),
        "unit": "%" if sensor_type == "humidity" else "°C" if sensor_type == "temperature" else "hPa",
        "timestamp": int(time.time())
      }

      return (sensor_data["sensor_id"], sensor_data)
    resultType: (string, json)

producers:
  json_generator:
    generator: generate_json_data
    interval: 2s
    to: json_messages

Processor:

# Demonstrates explicit format conversion in KSML
# 
# This example shows that KSML requires explicit conversion operations when
# outputting to a stream with a different format type. Without explicit conversion,
# KSML will fail with a type mismatch error at the pipeline sink.

streams:
  json_input:
    topic: json_messages
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  # Output stream with different format - requires explicit conversion
  string_output:
    topic: string_messages
    keyType: string
    valueType: string

pipelines:
  explicit_conversion_pipeline:
    from: json_input
    via:
      # Log the incoming JSON data
      - type: peek
        forEach:
          code: |
            log.info("JSON input - sensor: {}, city: {}, type: {}, value: {}{}",
                     value.get("sensor_id"), value.get("city"), 
                     value.get("type"), value.get("value"), value.get("unit"))

      # REQUIRED: Explicit conversion from JSON to string
      # Without this, KSML will fail with:
      # "Target topic valueType is expected to be type json, but found string"
      - type: convertValue
        into: string

      # Log the converted string data
      - type: peek
        forEach:
          code: |
            log.info("String output - sensor: {}, data: {}", key, value[:100] if len(value) > 100 else value)

    # Output to string format stream
    to: string_output

Chaining Multiple Conversions

Chain convertValue operations for complex transformations:

pipelines:
  multi_conversion:
    from: json_stream
    via:
      - type: convertValue
        into: string      # JSON → String
      - type: convertValue  
        into: json        # String → JSON
    to: json_output
Working example - Chained conversions

Producer:

# Producer for multiple format conversions example
# Generates JSON data that will be converted through multiple formats

streams:
  multi_format_data:
    topic: multi_format_data
    keyType: string
    valueType: json

functions:
  generate_data:
    type: generator
    globalCode: |
      import random
      import time
      counter = 0
    code: |
      global counter
      counter += 1

      data = {
        "id": counter,
        "product": f"product_{counter}",
        "price": round(random.uniform(10.0, 500.0), 2),
        "quantity": random.randint(1, 10),
        "available": counter % 2 == 0,
        "timestamp": int(time.time())
      }

      return (f"item_{counter}", data)
    resultType: (string, json)

producers:
  data_generator:
    generator: generate_data
    interval: 2s
    to: multi_format_data

Processor:

# Demonstrates chaining multiple format conversions in KSML
# 
# This example shows how to chain multiple convertValue operations
# to transform data through several formats in a single pipeline.

streams:
  json_input:
    topic: multi_format_data
    keyType: string
    valueType: json
    offsetResetPolicy: earliest

  final_output:
    topic: final_output
    keyType: string
    valueType: string

pipelines:
  multi_format_conversion:
    from: json_input
    via:
      # Starting with JSON format
      - type: peek
        forEach:
          code: |
            log.info("Step 1 - JSON input: id={}, product={}, price=${}", 
                     value.get("id"), value.get("product"), value.get("price"))

      # Convert JSON to String (serialization)
      - type: convertValue
        into: string

      - type: peek
        forEach:
          code: |
            log.info("Step 2 - Converted to String: {}", value[:80])

      # Convert String back to JSON (parsing)
      - type: convertValue
        into: json

      - type: peek
        forEach:
          code: |
            log.info("Step 3 - Parsed back to JSON: available={}, quantity={}", 
                     value.get("available"), value.get("quantity"))

      # Transform the data while in JSON format
      - type: transformValue
        mapper:
          type: valueTransformer
          code: |
            # Calculate total value and add metadata
            result = {
              "product_id": f"PRD-{value.get('id'):04d}",
              "name": value.get("product", "").upper(),
              "total_value": value.get("price", 0) * value.get("quantity", 0),
              "in_stock": value.get("available", False),
              "processed_at": int(time.time())
            }
          expression: result
          resultType: json
          globalCode: |
            import time

      - type: peek
        forEach:
          code: |
            log.info("Step 4 - Transformed data: product_id={}, total_value=${}", 
                     value.get("product_id"), value.get("total_value"))

      # Final conversion to String for output
      - type: convertValue
        into: string

      - type: peek
        forEach:
          code: |
            log.info("Step 5 - Final String output: {}", value)

    # Output as string format
    to: final_output

Key Takeaway: Functions convert automatically, streams need explicit conversion.

Working with Multiple Formats in a Single Pipeline

Process different data formats within one KSML definition using separate pipelines.

This producer generates both JSON config data and Avro sensor data:

Producer definition
functions:
  generate_device_config:
    type: generator
    globalCode: |
      import time
      import random
      configCounter = 0
    code: |
      global configCounter

      device_id = "sensor" + str(configCounter)
      configCounter = (configCounter + 1) % 10

      # Generate device configuration as JSON
      config = {
        "device_id": device_id,
        "threshold": random.randrange(50, 90),
        "alert_level": random.choice(["LOW", "MEDIUM", "HIGH"]),
        "calibration_factor": round(random.uniform(0.8, 1.2), 2),
        "last_maintenance": str(round(time.time() * 1000))
      }
    expression: (device_id, config)
    resultType: (string, json)

  generate_sensor_reading:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter

      key = "sensor" + str(sensorCounter)
      sensorCounter = (sensorCounter + 1) % 10

      # Generate sensor reading data that will be output as Avro
      reading = {
        "name": key,
        "timestamp": str(round(time.time() * 1000)),
        "type": random.choice(["TEMPERATURE", "HUMIDITY", "PRESSURE"]),
        "unit": random.choice(["C", "F", "%", "Pa"]),
        "value": str(random.randrange(0, 100)),
        "color": random.choice(["black", "blue", "red", "yellow", "white"]),
        "owner": random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"]),
        "city": random.choice(["Amsterdam", "Utrecht", "Rotterdam", "The Hague", "Eindhoven"])
      }
    expression: (key, reading)
    resultType: (string, json)  # Generate as JSON, output as Avro

producers:
  # Produce JSON device configuration every 10 seconds
  device_config_producer:
    generator: generate_device_config
    interval: 10s
    to:
      topic: device_config
      keyType: string
      valueType: json

  # Produce Avro sensor readings every 3 seconds
  sensor_reading_producer:
    generator: generate_sensor_reading
    interval: 3s
    to:
      topic: sensor_readings
      keyType: string
      valueType: avro:SensorData

This processor shows two pipelines handling different formats (Avro and JSON) and combining results:

Processor definition for working with multiple formats in a single pipeline
streams:
  avro_sensor_stream:
    topic: sensor_readings
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest

  json_config_stream:
    topic: device_config
    keyType: string
    valueType: json
    offsetResetPolicy: latest

  combined_output:
    topic: combined_sensor_data
    keyType: string
    valueType: json

pipelines:
  # Pipeline 1: Process Avro data and convert to JSON
  avro_processing:
    from: avro_sensor_stream
    via:
      # Log Avro input
      - type: peek
        forEach:
          code: |
            log.info("Avro sensor: name={}, type={}, value={}{}",
                    value.get("name"), value.get("type"),
                    value.get("value"), value.get("unit"))

      # Add a source field to identify the format
      - type: transformValue
        mapper:
          type: valueTransformer
          code: |
            result = dict(value) if value else {}
            result["source_format"] = "Avro"
          expression: result
          resultType: json

    to: combined_output

  # Pipeline 2: Process JSON config data
  json_processing:
    from: json_config_stream
    via:
      # Log JSON input
      - type: peek
        forEach:
          code: |
            log.info("JSON config: device={}, threshold={}, alert={}",
                    key, value.get("threshold"), value.get("alert_level"))

      # Transform to sensor-like format with source field
      - type: transformValue
        mapper:
          type: valueTransformer
          code: |
            result = {
              "name": key,
              "type": "CONFIG",
              "threshold": value.get("threshold"),
              "alert_level": value.get("alert_level"),
              "source_format": "JSON"
            }
          expression: result
          resultType: json

    to: combined_output

Type Definition Quoting Rules

In KSML, quotes around type definitions are always optional. KSML can parse all type expressions correctly whether they have quotes or not. The choice to use quotes is purely a matter of style and preference.

All Type Expressions Work Without Quotes:

# Basic types
keyType: string
valueType: json
resultType: int

# Function-style types
valueType: enum(PENDING, PROCESSING, SHIPPED)
valueType: map(string)
keyType: windowed(string)
resultType: list(int)
resultType: tuple(string, json)

# Complex expressions
valueType: union(null, string)
resultType: list(tuple(string, json)) # [(string, json)] also valid
resultType: (string, json)

# Notation prefixes (with colons)
valueType: avro:SensorData
keyType: protobuf:UserProfile

# With quotes (also valid)
resultType: "[(string, json)]"
valueType: "enum(PENDING, SHIPPED)"

YAML Syntax Highlighting Note

Some YAML syntax highlighters may incorrectly interpret bracket notation like [(string, json)], expecting proper array syntax.

For better highlighting, use quotes "[(string, json)]" or the cleaner list(tuple(string, json)) syntax.
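
For instance, both lines below declare the same list-of-tuples type; the first needs quotes to keep YAML tooling happy, the second does not:

resultType: "[(string, json)]"            # Bracket notation, quoted
resultType: list(tuple(string, json))     # Function notation, no quotes needed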

Summary:

All type expressions work without quotes in KSML. Use quotes only if you prefer them for style, but they are never functionally required. For bracket notation, consider using the list() function syntax for cleaner, more readable code.