Data Transformation with KSML

This tutorial demonstrates how to build a data transformation pipeline using KSML. You'll learn how to convert data between different formats, enrich data with additional information, and handle complex transformations.

Introduction

Data transformation is a fundamental use case for stream processing. It allows you to:

  • Convert data between different formats (JSON, XML, Avro, etc.)
  • Normalize and clean data from various sources
  • Enrich data with additional context or reference information
  • Restructure data to meet downstream system requirements
  • Filter out unnecessary information

In this tutorial, we'll build a data transformation pipeline that processes customer data from a legacy system, transforms it into a standardized format, enriches it with additional information, and makes it available for downstream applications.

Prerequisites

Before starting this tutorial, you should be familiar with basic KSML concepts such as streams, functions, and pipelines, and have a working KSML runner with access to a Kafka cluster.

The Use Case

Imagine you're working with a company that has acquired another business. You need to integrate customer data from the acquired company's legacy system into your modern data platform. The legacy data:

  • Is in a different format (XML) than your system (JSON)
  • Uses different field names and conventions
  • Contains some fields you don't need
  • Is missing some information that you need to add from reference data

Defining the Data Models

Source Data (XML)

The legacy system provides customer data in XML format:

<customer>
    <cust_id>12345</cust_id>
    <fname>John</fname>
    <lname>Doe</lname>
    <dob>1980-01-15</dob>
    <addr>
        <street>123 Main St</street>
        <city>Anytown</city>
        <state>CA</state>
        <zip>90210</zip>
    </addr>
    <phone>555-123-4567</phone>
    <legacy_segment>A</legacy_segment>
    <account_created>2015-03-20</account_created>
</customer>

Reference Data (JSON)

You have a reference table topic that maps segment codes (keys) to segment details (values):

Key Value
A {"segment_name": "Premium", "discount_tier": "Tier 1", "marketing_group": "High Value"}
B {"segment_name": "Standard", "discount_tier": "Tier 2", "marketing_group": "Medium Value"}
C {"segment_name": "Basic", "discount_tier": "Tier 3", "marketing_group": "Growth Target"}

Target Data (JSON)

You want to transform the data into this format:

{
  "customer_id": "12345",
  "name": {
    "first": "John",
    "last": "Doe"
  },
  "contact_info": {
    "email": "john.doe@example.com",
    "phone": "555-123-4567",
    "address": {
      "street": "123 Main St",
      "city": "Anytown",
      "state": "CA",
      "postal_code": "90210",
      "country": "USA"
    }
  },
  "birth_date": "1980-01-15",
  "customer_since": "2015-03-20",
  "segment": "Premium",
  "marketing_preferences": {
    "group": "High Value",
    "discount_tier": "Tier 1"
  },
  "metadata": {
    "source": "legacy_system",
    "last_updated": "2023-07-01T12:00:00Z"
  }
}

Creating the KSML Definition

Now, let's create our KSML definition file:

Data transformation processor:
name: "Data Transformation Use Case"
version: "1.0"
description: |
  This definition shows how to take legacy customer data in XML, join it with a customer segments table on Kafka, and
  write out the customer data in another structure as JSON.

streams:
  legacy_customers:
    topic: legacy_customer_data
    keyType: string  # customer_id
    valueType: xml  # XML customer data

  transformed_customers:
    topic: standardized_customer_data
    keyType: string  # customer_id
    valueType: json  # transformed customer data

tables:
  segment_reference:
    topic: customer_segments
    keyType: string  # segment code
    valueType: json  # segment details

functions:
  transform_customer:
    type: valueTransformer
    code: |
      import datetime

      # Extract values from XML
      customer_id = value.get("cust_id")
      first_name = value.get("fname")
      last_name = value.get("lname")
      birth_date = value.get("dob")
      phone = value.get("phone")
      legacy_segment = value.get("legacy_segment")
      customer_since = value.get("account_created")

      # Extract address
      address = value.get("addr")
      street = address.get("street")
      city = address.get("city")
      state = address.get("state")
      zip_code = address.get("zip")

      # Generate email (not in source data)
      email = f"{first_name.lower()}.{last_name.lower()}@example.com"

      # Get segment info from reference data
      segment_info = value.get("segment_info")
      segment_name = segment_info.get("segment_name") if segment_info else "Unknown"
      discount_tier = segment_info.get("discount_tier") if segment_info else "Unknown"
      marketing_group = segment_info.get("marketing_group") if segment_info else "Unknown"

      # Current UTC timestamp for metadata (the trailing "Z" denotes UTC)
      current_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

      # Create transformed customer object
      return {
        "customer_id": customer_id,
        "name": {
          "first": first_name,
          "last": last_name
        },
        "contact_info": {
          "email": email,
          "phone": phone,
          "address": {
            "street": street,
            "city": city,
            "state": state,
            "postal_code": zip_code,
            "country": "USA"  # Default value not in source
          }
        },
        "birth_date": birth_date,
        "customer_since": customer_since,
        "segment": segment_name,
        "marketing_preferences": {
          "group": marketing_group,
          "discount_tier": discount_tier
        },
        "metadata": {
          "source": "legacy_system",
          "last_updated": current_time
        }
      }
    resultType: json

pipelines:
  customer_transformation_pipeline:
    from: legacy_customers
    via:
      - type: peek
        forEach:
          code: |
            log.info("Processing customer: {}", key)
      - type: transformKey
        mapper:
          expression: value.get("legacy_segment")
          resultType: string
      - type: leftJoin
        table: segment_reference
        valueJoiner:
          expression: |
            {
              **value1,
              "segment_info": value2
            }
          resultType: json
      - type: transformValue
        mapper: transform_customer
      - type: transformKey
        mapper:
          expression: value.get("customer_id")
          resultType: string
      - type: peek
        forEach:
          code: |
            log.info("Transformed customer: {} = {}", key, value)
    to: transformed_customers
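
Note that the pipeline temporarily rekeys each record to its legacy segment code so the leftJoin can match against the segment_reference table, and then rekeys it back to the customer ID. To illustrate what the transform_customer function receives after the join, here is a hand-written example (not actual output) for a customer in segment A:

{
  "cust_id": "12345",
  "fname": "John",
  "lname": "Doe",
  "dob": "1980-01-15",
  "addr": {
    "street": "123 Main St",
    "city": "Anytown",
    "state": "CA",
    "zip": "90210"
  },
  "phone": "555-123-4567",
  "legacy_segment": "A",
  "account_created": "2015-03-20",
  "segment_info": {
    "segment_name": "Premium",
    "discount_tier": "Tier 1",
    "marketing_group": "High Value"
  }
}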

Setting up two producers for test data

To test the topology above, we create two test data producers.

The first is a single-shot producer that generates the segment reference data for the customer_segments topic:

Segment data producer:
name: "Segment Data Producer"
version: "1.0"
description: |
  This producer is part of the Data Transformation Use Case in KSML documentation. It produces customer segment data to
  a topic in JSON format.

functions:
  customer_segment_generator:
    globalCode: |
      import random
      count = 0
    code: |
      global count
      refs = {
        0: {"key": "A", "value": {"segment_name": "Premium", "discount_tier": "Tier 1", "marketing_group": "High Value"}},
        1: {"key": "B", "value": {"segment_name": "Standard", "discount_tier": "Tier 2", "marketing_group": "Medium Value"}},
        2: {"key": "C", "value": {"segment_name": "Basic", "discount_tier": "Tier 3", "marketing_group": "Growth Target" }}
      }
      key = refs.get(count)["key"]
      value = refs.get(count)["value"]
      count = (count + 1) % 3
      return (key, value)
    resultType: (string, struct)

producers:
  customer_segment_producer:
    generator: customer_segment_generator
    interval: 10s
    count: 3
    to:
      topic: customer_segments
      keyType: string  # segment code
      valueType: json  # segment details

The second producer sends a message every second to the legacy_customer_data topic, using a randomly chosen segment:

Customer data producer:
name: "Customer Data Producer"
version: "1.0"
description: |
  This producer is part of the Data Transformation Use Case in KSML documentation. It produces legacy customer data to
  a topic in XML format.

functions:
  legacy_customer_data_generator:
    globalCode: |
      import random
    code: |
      value = {
        "cust_id": random.randrange(100000),
        "fname": "John",
        "lname": "Doe",
        "dob": "1980-01-15",
        "addr": {
          "street": "123 Main St",
          "city": "Anytown",
          "state": "CA",
          "zip": "90210"
        },
        "phone": "555-123-4567",
        "legacy_segment": random.choice(["A","B","C"]),
        "account_created": "2015-03-20"
      }
      key = value.get("cust_id")
      return (key, value)
    resultType: (string, struct)

producers:
  legacy_customer_data_producer:
    generator: legacy_customer_data_generator
    interval: 1s
    to:
      topic: legacy_customer_data
      keyType: string  # customer_id
      valueType: xml  # XML customer data

Running the Application

To run the application:

  1. Save the processor definition to data-transformation.yaml.
  2. Save the producers to segment-data-producer.yaml and customer-data-producer.yaml.
  3. Set up your ksml-runner.yaml configuration, pointing to your Kafka installation.
KSML runner configuration:
ksml:
  enableProducers: true              # Allow producer definitions in the KSML definitions to be parsed and executed
  enablePipelines: true              # Allow pipeline definitions in the KSML definitions to be parsed and executed
  definitions:
    segment_data_producer: segment-data-producer.yaml
    customer_data_producer: customer-data-producer.yaml
    data_transformation: data-transformation.yaml
kafka:
  bootstrap.servers: broker:9093
  application.id: your.app.id
  4. Start the customer_segment_producer to produce the sample segment information to Kafka.
  5. Start the legacy_customer_data_producer to produce some sample data to the input topic.
  6. Start the data_transformation topology to initiate the continuous data transformation logic.
  7. Monitor the output topic to see the transformed data, for example with the console consumer command shown below.
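
One simple way to monitor the output topic is Kafka's console consumer. This is only an example invocation; the broker address matches the runner configuration above and should be adjusted to your environment:

kafka-console-consumer.sh --bootstrap-server broker:9093 \
  --topic standardized_customer_data \
  --property print.key=true \
  --from-beginning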

Advanced Transformation Techniques

Handling Missing or Invalid Data

In real-world scenarios, source data often has missing or invalid fields. You can enhance the transformation function to handle these cases:

# Check if a field exists before accessing it
birth_date = value.get("dob") if value.get("dob") is not None else "Unknown"

# Provide default value in case dictionary key does not exist
state = address.get("state", "N/A")

# Validate data
if phone and not phone.startswith("555-"):
    log.warn("Invalid phone format for customer {}: {}", customer_id, phone)
    phone = "Unknown"

Schema Evolution Handling

To handle changes in the source or target schema over time:

# Version-aware transformation
schema_version = value.get("version", "1.0")
if schema_version == "1.0":
    # Original transformation logic
    value = transform_v1(value)
elif schema_version == "2.0":
    # Updated transformation logic for new schema
    value = transform_v2(value)
else:
    log.error("Unknown schema version: {}", schema_version)

Conclusion

In this tutorial, you've learned how to:

  • Transform data between different formats (XML to JSON)
  • Restructure data to match a target schema
  • Enrich data with information from reference sources
  • Handle missing or derived fields
  • Process and validate data during transformation

Data transformation is a powerful use case for KSML, allowing you to integrate data from various sources and prepare it for downstream applications without complex coding.

Next Steps