02: Data Validation & ETL

Schema validation, data normalization, filtering, and aggregation pipeline

Difficulty: beginner · Category: Data Processing

Components Used

schema_validate · json_transform · filter · variable_set · logger

The Problem

Raw data is messy. Whether it’s user imports, API responses, or database exports, real-world data has:

  • Missing fields that break downstream processing
  • Invalid formats (emails without @, negative ages)
  • Inconsistent values (status: “Active” vs “active” vs “1”)
  • No computed fields for business logic

Without validation and transformation, bad data propagates through your system, causing errors, incorrect analytics, and poor user experiences.

Pain Points We’re Solving

  • Silent failures - Bad records slip through and cause issues later
  • Manual data cleaning - Hours spent fixing spreadsheets
  • No visibility - Which records failed? Why?
  • Complex filtering - “High-value active users” requires multiple conditions

Thinking Process

Let’s design an ETL pipeline that handles real-world data quality issues:

flowchart LR
    subgraph Validate["1. Validate"]
        V1["Check required fields"]
        V2["Validate formats"]
        V3["Enforce constraints"]
    end

    subgraph Transform["2. Transform"]
        T1["Add defaults"]
        T2["Normalize values"]
        T3["Compute fields"]
    end

    subgraph Filter["3. Filter"]
        F1["Active users"]
        F2["High-value users"]
    end

    subgraph Aggregate["4. Aggregate"]
        A1["Count totals"]
        A2["Calculate sums"]
        A3["Build summary"]
    end

    Validate --> Transform --> Filter --> Aggregate

Design Decisions

  1. Schema validation first - Fail fast on invalid data
  2. Parallel filtering - Active and high-value filters run simultaneously
  3. Variable storage - Store intermediate counts for summary
  4. JMESPath expressions - Powerful data transformation without code

Solution Architecture

flowchart TB
    subgraph Input["📥 Raw Records"]
        I1["8 user records"]
        I2["Mixed quality data"]
    end

    subgraph Validate["✅ Schema Validation"]
        V1["Required: id, name, email"]
        V2["email: valid format"]
        V3["age: 0-150"]
        V4["status: active|inactive|pending"]
        V5["score: 0-100"]
    end

    subgraph Transform["🔄 Transform & Normalize"]
        T1["Add default values"]
        T2["Add is_adult flag"]
        T3["Normalize format"]
    end

    subgraph FilterParallel["🔀 Parallel Filtering"]
        direction LR
        F1["Filter Active"]
        F2["Filter High Value<br/>score ≥ 80"]
    end

    subgraph Summary["📊 Summary Statistics"]
        S1["Total records"]
        S2["Active count"]
        S3["High value count"]
        S4["Value totals"]
    end

    Input --> Validate
    Validate --> Transform
    Transform --> FilterParallel
    FilterParallel --> Summary

Pipeline Stages

Stage 1: Validate Schema

Define strict rules for what valid data looks like:

{
  "id": "validate-records",
  "component": "schema_validate",
  "config": {
    "data": "{{input}}",
    "schema": {
      "type": "object",
      "required": ["records"],
      "properties": {
        "records": {
          "type": "array",
          "items": {
            "type": "object",
            "required": ["id", "name", "email"],
            "properties": {
              "id": { "type": "string" },
              "name": { "type": "string", "minLength": 1 },
              "email": { "type": "string", "format": "email" },
              "age": { "type": "integer", "minimum": 0, "maximum": 150 },
              "status": { "enum": ["active", "inactive", "pending"] },
              "score": { "type": "number", "minimum": 0, "maximum": 100 }
            }
          }
        }
      }
    },
    "collect_errors": true
  }
}
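
The collect_errors: true option suggests the stage gathers every violation rather than stopping at the first, which directly addresses the "which records failed, and why?" pain point. As an illustration, a hypothetical record like this one (USR009 is not in the sample input) would break four rules at once:

{ "id": "USR009", "name": "", "email": "not-an-email", "age": -3, "status": "Active" }

Here name violates minLength, email fails the email format, age is below the minimum of 0, and "Active" (capitalized) is not in the status enum. The exact shape of the error report depends on FlowMason's schema_validate output.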

Stage 2: Transform & Normalize

Add computed fields and defaults:

{
  "id": "transform-records",
  "component": "json_transform",
  "depends_on": ["validate-records"],
  "config": {
    "data": "{{stages.validate-records.output.data.records}}",
    "expression": "[*].{id: id, name: name, email: email, age: age || `0`, status: status || 'pending', score: score || `0`, is_adult: age >= `18`, processed: `true`}"
  }
}

JMESPath Breakdown:

  • [*] - Iterate over all records
  • age || `0` - Default to 0 when the field is missing
  • is_adult: age >= `18` - Computed boolean field (backticks mark JMESPath number literals)
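
To see the defaults in action, here is a hypothetical sparse record (USR009, not part of the sample input) before and after the expression above:

// Input: status and score are missing
{ "id": "USR009", "name": "Ivy Chen", "email": "[email protected]", "age": 22 }

// Output: defaults filled in, computed fields added
{ "id": "USR009", "name": "Ivy Chen", "email": "[email protected]", "age": 22, "status": "pending", "score": 0, "is_adult": true, "processed": true }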

Stages 3-4: Parallel Filtering

Both filters run simultaneously for efficiency:

flowchart LR
    T["Transformed Data"]

    T --> F1["Filter: status == 'active'"]
    T --> F2["Filter: score >= 80"]

    F1 --> R1["Active Users"]
    F2 --> R2["High Value Users"]

{
  "id": "filter-active",
  "component": "filter",
  "depends_on": ["transform-records"],
  "config": {
    "items": "{{stages.transform-records.output}}",
    "condition": "item.status == 'active'",
    "mode": "filter_array"
  }
}

{
  "id": "filter-high-value",
  "component": "filter",
  "depends_on": ["transform-records"],
  "config": {
    "items": "{{stages.transform-records.output}}",
    "condition": "item.score >= 80",
    "mode": "filter_array"
  }
}
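
Run against the sample input below, the two stages produce overlapping but distinct sets:

filter-active      → USR001, USR002, USR004, USR006, USR007   (status == 'active')
filter-high-value  → USR001, USR003, USR004, USR006           (score >= 80)

USR003 is high-value yet inactive, while USR002 and USR007 are active but score below 80, which is exactly why the two conditions are kept as separate filters rather than combined.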

Stage 5: Store Count Variable

Save the active count for use in summary:

{
  "id": "store-active-count",
  "component": "variable_set",
  "depends_on": ["filter-active"],
  "config": {
    "variables": {
      "active_user_count": "{{stages.filter-active.output | length}}"
    }
  }
}
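
The component list at the top includes a logger, and this variable is a natural thing to log. A minimal sketch of a consuming stage, assuming stored variables are exposed through a variables template namespace and that logger takes a message field (both are assumptions for illustration, not confirmed FlowMason API):

{
  "id": "log-active-count",
  "component": "logger",
  "depends_on": ["store-active-count"],
  "config": {
    "message": "Active user count: {{variables.active_user_count}}"
  }
}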

Stage 6: Build Summary

Aggregate everything into a final report:

{
  "id": "create-summary",
  "component": "json_transform",
  "depends_on": ["filter-active", "filter-high-value", "store-active-count"],
  "config": {
    "data": {
      "all_records": "{{stages.transform-records.output}}",
      "active": "{{stages.filter-active.output}}",
      "high_value": "{{stages.filter-high-value.output}}"
    },
    "expression": "{total_records: length(all_records), active_count: length(active), high_value_count: length(high_value), total_score: sum(all_records[*].score), high_value_score: sum(high_value[*].score), validation_status: 'passed'}"
  }
}

Data Transformation Visualization

stateDiagram-v2
    [*] --> Raw: Input Records
    Raw --> Validated: Schema Check
    Validated --> Transformed: Add Fields
    Transformed --> Filtered: Apply Filters
    Filtered --> Aggregated: Build Summary
    Aggregated --> [*]: Output

    state Validated {
        [*] --> CheckRequired
        CheckRequired --> CheckTypes
        CheckTypes --> CheckConstraints
        CheckConstraints --> [*]
    }

    state Transformed {
        [*] --> AddDefaults
        AddDefaults --> ComputeFields
        ComputeFields --> Normalize
        Normalize --> [*]
    }

Sample Input

{
  "records": [
    { "id": "USR001", "name": "Alice Johnson", "email": "[email protected]", "age": 28, "status": "active", "score": 92 },
    { "id": "USR002", "name": "Bob Smith", "email": "[email protected]", "age": 35, "status": "active", "score": 78 },
    { "id": "USR003", "name": "Carol White", "email": "[email protected]", "age": 42, "status": "inactive", "score": 85 },
    { "id": "USR004", "name": "David Brown", "email": "[email protected]", "age": 31, "status": "active", "score": 95 },
    { "id": "USR005", "name": "Eve Davis", "email": "[email protected]", "age": 25, "status": "pending", "score": 67 },
    { "id": "USR006", "name": "Frank Miller", "email": "[email protected]", "age": 55, "status": "active", "score": 88 },
    { "id": "USR007", "name": "Grace Lee", "email": "[email protected]", "age": 29, "status": "active", "score": 45 },
    { "id": "USR008", "name": "Henry Wilson", "email": "[email protected]", "age": 38, "status": "inactive", "score": 72 }
  ]
}

Expected Output

{
  "valid_records": [/* 8 transformed records */],
  "active_users": [/* USR001, USR002, USR004, USR006, USR007 */],
  "high_value_users": [/* USR001, USR003, USR004, USR006 */],
  "summary": {
    "total_records": 8,
    "active_count": 5,
    "high_value_count": 4,
    "total_score": 622,
    "high_value_score": 360,
    "validation_status": "passed"
  }
}
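
The score totals can be checked by hand against the sample input:

total_score      = 92 + 78 + 85 + 95 + 67 + 88 + 45 + 72 = 622   (all 8 records)
high_value_score = 92 + 85 + 95 + 88 = 360                       (the 4 records with score >= 80)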

Key Learnings

1. JSON Schema Validation

Constraint        Example                    Purpose
required          ["id", "name"]             Fail on missing fields
format            "email"                    Validate string patterns
minimum/maximum   0-150                      Enforce numeric ranges
enum              ["active", "inactive"]     Restrict to allowed values

2. JMESPath Power Features

// Default values
age || `0`

// Computed fields
is_adult: age >= `18`

// Aggregations
sum(records[*].score)
length(filtered)

3. Parallel Execution Benefits

The filter stages have no dependencies on each other, so FlowMason runs them concurrently rather than back-to-back: the two filter steps together take roughly the time of one, shortening the pipeline's end-to-end latency.

Try It Yourself

fm run pipelines/02-data-validation-etl.pipeline.json \
  --input inputs/02-etl-records.json