FLOW MASON

Demo 03: Multi-API Aggregator

Parallel API calls with data transformation and aggregation

Intermediate · API Integration

Components Used

http_request · json_transform · logger

The Problem

Modern applications rely on data from multiple sources. A user profile view might need:

  • User details from the authentication service
  • Posts/activity from the content service
  • Tasks/todos from the productivity service

Making these calls sequentially wastes time: if each takes 200ms, three calls take 600ms. But they're independent, so why wait?

Pain Points We’re Solving

  • Slow page loads - Sequential API calls compound latency
  • Complex orchestration - Managing parallel requests is error-prone
  • Data silos - Information scattered across services
  • Inconsistent responses - Each API has different formats

Thinking Process

We need to design a pipeline that follows a simple strategy:

flowchart TB
    subgraph Strategy["Design Strategy"]
        S1["1. Identify independent calls"]
        S2["2. Execute in parallel"]
        S3["3. Transform each response"]
        S4["4. Aggregate results"]
    end

    S1 --> S2 --> S3 --> S4

Key Insight: Wave-Based Execution

FlowMason automatically parallelizes stages with no dependencies:

gantt
    title Sequential vs Parallel Execution
    dateFormat X
    axisFormat %L ms

    section Sequential
    Fetch User    :0, 200
    Fetch Posts   :200, 400
    Fetch Todos   :400, 600

    section Parallel
    Fetch User    :0, 200
    Fetch Posts   :0, 200
    Fetch Todos   :0, 200

Result: roughly 3x faster (about 200ms instead of 600ms) with the same code.
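
The speedup is purely declarative. As a minimal sketch (only the fetch-posts stage shown, config trimmed), the difference between the sequential and parallel versions is nothing more than the depends_on field:

// Sequential: an explicit depends_on chain forces one wave per fetch (~600ms total)
{
  "id": "fetch-posts",
  "component": "http-request",
  "depends_on": ["fetch-user"],
  "config": {
    "url": "https://jsonplaceholder.typicode.com/posts?userId={{input.user_id}}",
    "method": "GET"
  }
}

// Parallel (what this demo uses): no depends_on, so fetch-user, fetch-posts,
// and fetch-todos all land in the same wave (~200ms total)
{
  "id": "fetch-posts",
  "component": "http-request",
  "config": {
    "url": "https://jsonplaceholder.typicode.com/posts?userId={{input.user_id}}",
    "method": "GET"
  }
}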

Solution Architecture

flowchart TB
    subgraph Input["📥 Input"]
        I1["user_id: 1"]
    end

    subgraph Parallel["⚡ Parallel HTTP Requests"]
        direction LR
        H1["GET /users/1"]
        H2["GET /posts?userId=1"]
        H3["GET /todos?userId=1"]
    end

    subgraph Transform["🔄 Transform Responses"]
        direction LR
        T1["Extract user fields"]
        T2["Map post previews"]
        T3["Group todos by status"]
    end

    subgraph Aggregate["📊 Aggregate"]
        A1["Combine all data"]
        A2["Add statistics"]
        A3["Verify all succeeded"]
    end

    Input --> Parallel
    H1 --> T1
    H2 --> T2
    H3 --> T3
    T1 --> Aggregate
    T2 --> Aggregate
    T3 --> Aggregate

Pipeline Stages

Stages 1-3: Parallel HTTP Requests

All three requests execute simultaneously:

sequenceDiagram
    participant P as Pipeline
    participant U as Users API
    participant Po as Posts API
    participant T as Todos API

    par Parallel Requests
        P->>U: GET /users/1
        P->>Po: GET /posts?userId=1
        P->>T: GET /todos?userId=1
    end

    U-->>P: User data
    Po-->>P: Posts array
    T-->>P: Todos array

    Note over P: All responses received ~200ms

{
  "id": "fetch-user",
  "component": "http-request",
  "config": {
    "url": "https://jsonplaceholder.typicode.com/users/{{input.user_id}}",
    "method": "GET",
    "timeout": 30000
  }
}

{
  "id": "fetch-posts",
  "component": "http-request",
  "config": {
    "url": "https://jsonplaceholder.typicode.com/posts?userId={{input.user_id}}",
    "method": "GET",
    "timeout": 30000
  }
}

{
  "id": "fetch-todos",
  "component": "http-request",
  "config": {
    "url": "https://jsonplaceholder.typicode.com/todos?userId={{input.user_id}}",
    "method": "GET",
    "timeout": 30000
  }
}

Stage 4: Transform User Data

Extract the fields we need:

{
  "id": "transform-user",
  "component": "json_transform",
  "depends_on": ["fetch-user"],
  "config": {
    "data": "{{stages.fetch-user.output.body}}",
    "expression": "{id: id, name: name, username: username, email: email, company: company.name, city: address.city, website: website}"
  }
}
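
For context, the Users API returns a nested object; the expression promotes company.name and address.city to top-level fields. An abbreviated illustration of the input, showing only the fields the expression touches:

{
  "id": 1,
  "name": "Leanne Graham",
  "username": "Bret",
  "email": "Sincere@april.biz",
  "address": { "city": "Gwenborough" /* ... */ },
  "company": { "name": "Romaguera-Crona" /* ... */ },
  "website": "hildegard.org"
}

The result is the flat user object shown under Expected Output below.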

Stage 5: Transform Posts

Create preview snippets:

{
  "id": "transform-posts",
  "component": "json_transform",
  "depends_on": ["fetch-posts"],
  "config": {
    "data": "{{stages.fetch-posts.output.body}}",
    "expression": "[*].{id: id, title: title, body_preview: body[0:100]}"
  }
}

Stage 6: Group Todos

Organize by completion status:

{
  "id": "transform-todos",
  "component": "json_transform",
  "depends_on": ["fetch-todos"],
  "config": {
    "data": "{{stages.fetch-todos.output.body}}",
    "expression": "{all: @, completed: [?completed == `true`], pending: [?completed == `false`]}"
  }
}
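
Illustratively (the values below are made up, but the field names follow the jsonplaceholder todo schema), the expression keeps the whole array under all and filters it into two views on the completed flag:

// Input (illustrative)
[
  { "userId": 1, "id": 1, "title": "first todo", "completed": false },
  { "userId": 1, "id": 2, "title": "second todo", "completed": true }
]

// Output
{
  "all": [ /* both todos */ ],
  "completed": [ /* the todo with completed == true */ ],
  "pending": [ /* the todo with completed == false */ ]
}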

Stage 7: Aggregate All Data

Combine into unified response:

{
  "id": "aggregate-data",
  "component": "json_transform",
  "depends_on": ["transform-user", "transform-posts", "transform-todos"],
  "config": {
    "data": {
      "user": "{{stages.transform-user.output}}",
      "posts": "{{stages.transform-posts.output}}",
      "todos": "{{stages.transform-todos.output}}"
    },
    "expression": "{user: user, posts: posts, todos: todos, summary: {post_count: length(posts), todo_count: length(todos.all), completed_todos: length(todos.completed), pending_todos: length(todos.pending)}, api_status: 'all_succeeded'}"
  }
}

Execution Timeline

flowchart LR
    subgraph Wave1["Wave 1 (0-200ms)"]
        direction TB
        W1A["fetch-user"]
        W1B["fetch-posts"]
        W1C["fetch-todos"]
    end

    subgraph Wave2["Wave 2 (200-210ms)"]
        direction TB
        W2A["transform-user"]
        W2B["transform-posts"]
        W2C["transform-todos"]
    end

    subgraph Wave3["Wave 3 (210-215ms)"]
        W3A["aggregate-data"]
    end

    subgraph Wave4["Wave 4 (215-220ms)"]
        W4A["log-summary"]
    end

    Wave1 --> Wave2 --> Wave3 --> Wave4
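
Wave 4 runs a log-summary stage, which uses the logger component listed under Components Used but is not shown in the stage walkthrough above. A minimal sketch, assuming the logger component takes a message template and a level (the exact config fields are not specified in this demo):

{
  "id": "log-summary",
  "component": "logger",
  "depends_on": ["aggregate-data"],
  "config": {
    // "message" and "level" are assumed field names, not confirmed by this demo
    "message": "Aggregated profile for user {{input.user_id}}: {{stages.aggregate-data.output.summary.post_count}} posts, {{stages.aggregate-data.output.summary.todo_count}} todos",
    "level": "info"
  }
}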

Sample Input

{
  "user_id": "1",
  "include_posts": true,
  "include_todos": true
}

Expected Output

{
  "user": {
    "id": 1,
    "name": "Leanne Graham",
    "username": "Bret",
    "email": "[email protected]",
    "company": "Romaguera-Crona",
    "city": "Gwenborough",
    "website": "hildegard.org"
  },
  "posts": [
    {
      "id": 1,
      "title": "sunt aut facere repellat provident...",
      "body_preview": "quia et suscipit\nsuscipit recusandae..."
    }
    // ... more posts
  ],
  "todos": {
    "all": [/* 20 todos */],
    "completed": [/* 11 completed */],
    "pending": [/* 9 pending */]
  },
  "summary": {
    "post_count": 10,
    "todo_count": 20,
    "completed_todos": 11,
    "pending_todos": 9
  },
  "api_status": "all_succeeded"
}

Key Learnings

1. Parallel Execution Pattern

flowchart LR
    A[Input] --> B[Stage 1]
    A --> C[Stage 2]
    A --> D[Stage 3]
    B --> E[Aggregate]
    C --> E
    D --> E

No depends_on between stages = automatic parallelization.

2. Response Transformation

API Response → Transformed

  • Nested objects → Flattened fields
  • Full content → Preview snippets
  • Flat array → Grouped by status

3. Error Handling Consideration

In production, wrap each HTTP request in a trycatch stage to handle individual API failures gracefully (see Demo 5).
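
Demo 5 covers the trycatch component properly; purely as a hypothetical sketch (the config fields below are assumptions, not the documented schema), the idea is to give each fetch a fallback so one failing API degrades that section instead of failing the whole aggregation:

{
  "id": "fetch-posts-safe",
  "component": "trycatch",
  "config": {
    // Hypothetical fields -- see Demo 5 for the real trycatch configuration
    "try": {
      "component": "http-request",
      "config": {
        "url": "https://jsonplaceholder.typicode.com/posts?userId={{input.user_id}}",
        "method": "GET",
        "timeout": 30000
      }
    },
    "catch": { "fallback": [] }
  }
}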

Try It Yourself

fm run pipelines/03-api-aggregator.pipeline.json \
  --input inputs/03-api-aggregator.json