FLOW MASON


Multi-Service API Orchestration

Coordinate multiple APIs with parallel calls, data transformation, and AI enrichment

intermediate Integration & APIs

Components Used

http_request, schema_validate, json_transform, generator, logger

The Problem

Building a “customer 360” view requires data from 5+ sources:

  • Salesforce - Account details, deal stage
  • Stripe - Payment history, subscription status
  • Intercom - Support tickets, conversation history
  • Your database - Usage metrics, feature flags
  • Enrichment API - Company data, firmographics

Writing this integration means:

  • 400 lines of Python with nested try/except
  • Each API has different auth (OAuth, API key, JWT)
  • Rate limits hit at different thresholds
  • If Stripe is slow, the whole thing times out
  • When Intercom changes their API, the script breaks silently

Pain Points We’re Solving

  • Brittle integrations - One API change breaks everything
  • No observability - Which step failed? What did it return?
  • Sequential calls - 5 APIs × 200ms = 1 second (could be 200ms)
  • Scattered logic - Data transformation mixed with API calls

Thinking Process

flowchart TB
    subgraph Strategy["Design Strategy"]
        S1["1. Validate input schema"]
        S2["2. Parallel API calls (independent)"]
        S3["3. Transform each response"]
        S4["4. Merge into unified model"]
        S5["5. AI enrichment (optional)"]
        S6["6. Validate output schema"]
    end

    S1 --> S2 --> S3 --> S4 --> S5 --> S6

Key Insight: Declarative Integration

Instead of imperative code (fetch, parse, transform, merge), declare what you need and let FlowMason handle the orchestration, retries, and parallelization.
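To make the declarative idea concrete, here is a minimal Python sketch of how an orchestrator could derive a run order from `depends_on` declarations alone. It illustrates the principle and is not FlowMason's scheduler. The stage ids are the ones used in the pipeline on this page, and `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Stage id -> stages it depends on (a subset of the pipeline on this page).
DEPENDS_ON = {
    "validate-input": [],
    "fetch-salesforce": ["validate-input"],
    "fetch-stripe": ["validate-input"],
    "fetch-intercom": ["validate-input"],
    "fetch-internal": ["validate-input"],
    "transform-salesforce": ["fetch-salesforce"],
    "transform-stripe": ["fetch-stripe"],
    "transform-intercom": ["fetch-intercom"],
    "transform-internal": ["fetch-internal"],
    "merge-responses": [
        "transform-salesforce",
        "transform-stripe",
        "transform-intercom",
        "transform-internal",
    ],
}


def execution_waves(depends_on):
    """Group stages into waves; all stages in one wave can run in parallel."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # stages whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(DEPENDS_ON)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

Nothing in the declaration says "run these four in parallel"; the parallelism falls out of the dependency graph. A real scheduler can be less strict than waves and start each transform as soon as its own fetch finishes.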

Solution Architecture

flowchart TB
    subgraph Input["Input"]
        I1["customer_id"]
    end

    subgraph Parallel["Parallel Fetch (200ms total)"]
        direction LR
        P1["Salesforce"]
        P2["Stripe"]
        P3["Intercom"]
        P4["Internal DB"]
    end

    subgraph Transform["Transform"]
        direction TB
        T1["Normalize fields"]
        T2["Calculate metrics"]
        T3["Merge responses"]
    end

    subgraph Enrich["AI Enrich"]
        E1["Summarize history"]
        E2["Risk assessment"]
    end

    subgraph Output["Output"]
        O1["Customer 360 JSON"]
    end

    Input --> Parallel
    Parallel --> Transform
    Transform --> Enrich
    Enrich --> Output

Pipeline Stages

Stage 1: Validate Input

Catch bad requests before making API calls:

{
  "id": "validate-input",
  "component": "schema_validate",
  "config": {
    "data": "{{input}}",
    "schema": {
      "type": "object",
      "required": ["customer_id"],
      "properties": {
        "customer_id": {
          "type": "string",
          "pattern": "^cust_[a-zA-Z0-9]{14}$"
        },
        "include_history": {
          "type": "boolean",
          "default": true
        },
        "enrichment_level": {
          "type": "string",
          "enum": ["basic", "full"],
          "default": "basic"
        }
      }
    }
  }
}
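For readers who want to see what these rules accept and reject, the following is a hand-rolled Python sketch of the same three checks. It is not how `schema_validate` is implemented, and `validate_input` is a hypothetical helper. Note that in standard JSON Schema `default` is only an annotation, so whether missing fields are actually filled in depends on the component.

```python
import re

# Same rule as the schema's pattern; fullmatch makes the anchors implicit.
CUSTOMER_ID = re.compile(r"cust_[a-zA-Z0-9]{14}")
ENRICHMENT_LEVELS = ("basic", "full")


def validate_input(payload):
    """Return a list of error strings; an empty list means the payload is valid."""
    errors = []
    customer_id = payload.get("customer_id")
    if customer_id is None:
        errors.append("customer_id is required")
    elif not isinstance(customer_id, str) or not CUSTOMER_ID.fullmatch(customer_id):
        errors.append("customer_id must match ^cust_[a-zA-Z0-9]{14}$")
    # Optional fields: only checked when present (defaults assumed here).
    if not isinstance(payload.get("include_history", True), bool):
        errors.append("include_history must be a boolean")
    if payload.get("enrichment_level", "basic") not in ENRICHMENT_LEVELS:
        errors.append("enrichment_level must be 'basic' or 'full'")
    return errors


print(validate_input({"customer_id": "cust_abc123def456gh"}))
print(validate_input({"customer_id": "cust_short", "enrichment_level": "extreme"}))
```

Rejecting a malformed `customer_id` here costs microseconds, while discovering it through four failed API calls costs several round trips and rate-limit budget.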

Stage 2: Fetch Configuration

Get the integration config (endpoints, field mappings):

{
  "id": "fetch-config",
  "component": "http_request",
  "depends_on": ["validate-input"],
  "config": {
    "url": "{{input.config_api}}/integrations/customer-360",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.CONFIG_TOKEN}}"
    },
    "timeout": 5000
  }
}

Stages 3-6: Parallel API Calls

All four APIs are called simultaneously, because each fetch stage depends only on validate-input and not on another fetch:

{
  "id": "fetch-salesforce",
  "component": "http_request",
  "depends_on": ["validate-input"],
  "config": {
    "url": "{{secrets.SALESFORCE_URL}}/services/data/v58.0/sobjects/Account/{{input.customer_id}}",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.SALESFORCE_TOKEN}}"
    },
    "timeout": 10000,
    "retry": {
      "max_attempts": 3,
      "delay_seconds": 1,
      "backoff_multiplier": 2
    }
  }
}

{
  "id": "fetch-stripe",
  "component": "http_request",
  "depends_on": ["validate-input"],
  "config": {
    "url": "https://api.stripe.com/v1/customers/{{input.customer_id}}",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.STRIPE_SECRET_KEY}}"
    },
    "query_params": {
      "expand[]": ["subscriptions", "invoice_settings"]
    },
    "timeout": 10000,
    "retry": {
      "max_attempts": 3,
      "delay_seconds": 1,
      "condition": "{{output.status_code == 429}}"
    }
  }
}

{
  "id": "fetch-intercom",
  "component": "http_request",
  "depends_on": ["validate-input"],
  "config": {
    "url": "https://api.intercom.io/contacts/{{input.customer_id}}",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.INTERCOM_TOKEN}}",
      "Intercom-Version": "2.10"
    },
    "timeout": 10000,
    "retry": {
      "max_attempts": 3,
      "delay_seconds": 2
    }
  }
}

{
  "id": "fetch-internal",
  "component": "http_request",
  "depends_on": ["validate-input"],
  "config": {
    "url": "{{input.internal_api}}/customers/{{input.customer_id}}/usage",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.INTERNAL_TOKEN}}"
    },
    "timeout": 5000
  }
}
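The `retry` blocks above can be read as a small backoff policy. The sketch below shows one plausible interpretation in Python, assuming each retry waits `delay_seconds` multiplied by `backoff_multiplier` once per previous retry. How FlowMason actually schedules retries is not specified on this page, and `backoff_delays` and `call_with_retry` are hypothetical helpers.

```python
import time


def backoff_delays(max_attempts, delay_seconds, backoff_multiplier=1):
    """Seconds to wait before each retry (there are max_attempts - 1 retries)."""
    return [delay_seconds * backoff_multiplier ** i for i in range(max_attempts - 1)]


def call_with_retry(func, max_attempts=3, delay_seconds=1, backoff_multiplier=2,
                    should_retry=lambda exc: True, sleep=time.sleep):
    """Call func(); on a retryable exception wait, then try again."""
    delays = backoff_delays(max_attempts, delay_seconds, backoff_multiplier)
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Give up on the last attempt or when the error is not retryable.
            if attempt == max_attempts or not should_retry(exc):
                raise
            sleep(delays[attempt - 1])


print(backoff_delays(3, 1, 2))  # Salesforce-style config
print(backoff_delays(3, 2))     # Intercom-style config, no multiplier
```

The `should_retry` hook plays the role of the Stripe stage's `condition`: a caller could pass a predicate that returns true only for rate-limit errors.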

Stages 7-10: Transform Each Response

Normalize to a common schema:

{
  "id": "transform-salesforce",
  "component": "json_transform",
  "depends_on": ["fetch-salesforce"],
  "config": {
    "data": "{{stages.fetch-salesforce.output.body}}",
    "expression": "{source: 'salesforce', company_name: Name, industry: Industry, employee_count: NumberOfEmployees, annual_revenue: AnnualRevenue, deal_stage: StageName, owner: Owner.Name, created_at: CreatedDate}"
  }
}

{
  "id": "transform-stripe",
  "component": "json_transform",
  "depends_on": ["fetch-stripe"],
  "config": {
    "data": "{{stages.fetch-stripe.output.body}}",
    "expression": "{source: 'stripe', email: email, currency: currency, balance: balance, subscriptions: subscriptions.data[*].{id: id, status: status, plan: plan.nickname, amount: plan.amount, interval: plan.interval}, total_spent: to_number(metadata.lifetime_value) || `0`, payment_method: invoice_settings.default_payment_method}"
  }
}

{
  "id": "transform-intercom",
  "component": "json_transform",
  "depends_on": ["fetch-intercom"],
  "config": {
    "data": "{{stages.fetch-intercom.output.body}}",
    "expression": "{source: 'intercom', last_seen: last_seen_at, total_conversations: statistics.total_conversations, avg_response_time: statistics.avg_first_response_time, open_tickets: statistics.open_tickets, tags: tags.data[*].name, custom_attributes: custom_attributes}"
  }
}

{
  "id": "transform-internal",
  "component": "json_transform",
  "depends_on": ["fetch-internal"],
  "config": {
    "data": "{{stages.fetch-internal.output.body}}",
    "expression": "{source: 'internal', monthly_active_days: usage.active_days_30d, feature_usage: usage.features, api_calls_30d: usage.api_calls, last_login: usage.last_login, account_health_score: computed.health_score}"
  }
}
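Each expression above is a projection: it picks a handful of fields, some of them nested, and renames them. As a rough Python equivalent of the Salesforce expression (standard library only, not JMESPath itself), with `get_path` and `project` as hypothetical helpers and a made-up, trimmed response body:

```python
def get_path(data, path):
    """Follow a dotted path such as 'Owner.Name'; return None if a step is missing."""
    current = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def project(data, mapping, source):
    """Build a flat record: output field -> value found at the mapped input path."""
    record = {"source": source}
    for out_field, path in mapping.items():
        record[out_field] = get_path(data, path)
    return record


SALESFORCE_FIELDS = {
    "company_name": "Name",
    "industry": "Industry",
    "employee_count": "NumberOfEmployees",
    "annual_revenue": "AnnualRevenue",
    "deal_stage": "StageName",
    "owner": "Owner.Name",
    "created_at": "CreatedDate",
}

# Made-up response body, trimmed to a few fields for illustration.
sample_body = {
    "Name": "Acme Corp",
    "Industry": "Technology",
    "NumberOfEmployees": 250,
    "Owner": {"Name": "Sarah Johnson"},
}

normalized = project(sample_body, SALESFORCE_FIELDS, source="salesforce")
print(normalized)
```

Fields that are absent from the response come back as `None`, which mirrors JMESPath returning null for a missing key rather than raising an error.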

Stage 11: Merge All Data

Combine into unified customer model:

{
  "id": "merge-responses",
  "component": "json_transform",
  "depends_on": ["transform-salesforce", "transform-stripe", "transform-intercom", "transform-internal"],
  "config": {
    "data": {
      "salesforce": "{{stages.transform-salesforce.output}}",
      "stripe": "{{stages.transform-stripe.output}}",
      "intercom": "{{stages.transform-intercom.output}}",
      "internal": "{{stages.transform-internal.output}}"
    },
    "expression": "{customer_id: '{{input.customer_id}}', profile: {company: salesforce.company_name, industry: salesforce.industry, size: salesforce.employee_count, email: stripe.email}, financial: {subscription_status: stripe.subscriptions[0].status, mrr: stripe.subscriptions[0].amount, total_spent: stripe.total_spent, currency: stripe.currency}, engagement: {last_seen: intercom.last_seen, support_tickets: intercom.open_tickets, health_score: internal.account_health_score, active_days: internal.monthly_active_days}, sales: {deal_stage: salesforce.deal_stage, owner: salesforce.owner}, sources_fetched: ['salesforce', 'stripe', 'intercom', 'internal'], fetched_at: '{{now()}}'}"
  }
}
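The merge expression is dense, so here is the same mapping spelled out as a Python sketch. It mirrors the field names in the expression above; `merge_customer` and `first` are hypothetical helpers, and `fetched_at` is passed in to keep the function deterministic.

```python
def first(items):
    """Return the first element, or None for an empty or missing list."""
    return items[0] if items else None


def merge_customer(customer_id, salesforce, stripe, intercom, internal, fetched_at):
    """Combine the four normalized records into the unified customer model."""
    subscription = first(stripe.get("subscriptions")) or {}
    return {
        "customer_id": customer_id,
        "profile": {
            "company": salesforce.get("company_name"),
            "industry": salesforce.get("industry"),
            "size": salesforce.get("employee_count"),
            "email": stripe.get("email"),
        },
        "financial": {
            "subscription_status": subscription.get("status"),
            "mrr": subscription.get("amount"),
            "total_spent": stripe.get("total_spent"),
            "currency": stripe.get("currency"),
        },
        "engagement": {
            "last_seen": intercom.get("last_seen"),
            "support_tickets": intercom.get("open_tickets"),
            "health_score": internal.get("account_health_score"),
            "active_days": internal.get("monthly_active_days"),
        },
        "sales": {
            "deal_stage": salesforce.get("deal_stage"),
            "owner": salesforce.get("owner"),
        },
        "sources_fetched": ["salesforce", "stripe", "intercom", "internal"],
        "fetched_at": fetched_at,
    }
```

A customer with no subscriptions ends up with `subscription_status` and `mrr` set to `None`, matching JMESPath, where an out-of-range index such as `subscriptions[0]` evaluates to null.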

Stage 12: AI Enrichment

Add intelligent insights:

{
  "id": "enrich-data",
  "component": "generator",
  "depends_on": ["merge-responses"],
  "config": {
    "model": "gpt-4",
    "temperature": 0.3,
    "system_prompt": "You are a customer success analyst. Given customer data, respond with a single JSON object containing: 1) risk_level (low/medium/high) and risk_reasoning, 2) opportunities (top 3 engagement opportunities), 3) churn_indicators (if any). Be concise and actionable.",
    "prompt": "Analyze this customer:\n\n{{stages.merge-responses.output | tojson}}\n\nProvide your assessment:"
  }
}
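Later stages read `risk_level` from this stage's output and validate `insights` as an object, so the model's reply has to end up as structured data. How the `generator` component handles that is not shown on this page. As a hedged sketch, a post-processing step could look like the following, where `parse_insights` is a hypothetical helper and the key name comes from the Expected Output section.

```python
import json

RISK_LEVELS = ("low", "medium", "high")


def parse_insights(reply_text):
    """Parse a model reply into an insights dict, or raise ValueError."""
    try:
        insights = json.loads(reply_text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"reply is not valid JSON: {exc}") from exc
    if not isinstance(insights, dict):
        raise ValueError("reply must be a JSON object")
    if insights.get("risk_level") not in RISK_LEVELS:
        raise ValueError("risk_level must be low, medium, or high")
    return insights


insights = parse_insights('{"risk_level": "medium", "opportunities": []}')
print(insights["risk_level"])
```

Failing loudly on a free-text reply is deliberate: it is easier to retry the generation than to debug a log line that silently prints an empty risk level.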

Stage 13: Validate Output

Ensure we return a valid response:

{
  "id": "final-validation",
  "component": "schema_validate",
  "depends_on": ["merge-responses", "enrich-data"],
  "config": {
    "data": {
      "customer": "{{stages.merge-responses.output}}",
      "insights": "{{stages.enrich-data.output}}"
    },
    "schema": {
      "type": "object",
      "required": ["customer", "insights"],
      "properties": {
        "customer": {
          "type": "object",
          "required": ["customer_id", "profile", "financial", "engagement"]
        },
        "insights": {
          "type": "object"
        }
      }
    }
  }
}

Stage 14: Log Completion

{
  "id": "log-completion",
  "component": "logger",
  "depends_on": ["final-validation"],
  "config": {
    "level": "info",
    "message": "Customer 360 built for {{input.customer_id}}: health_score={{stages.merge-responses.output.engagement.health_score}}, risk={{stages.enrich-data.output.risk_level}}"
  }
}

Execution Flow

gantt
    title API Orchestration Timeline
    dateFormat X
    axisFormat %L

    section Validate
    validate-input    :0, 10

    section Fetch (parallel)
    fetch-salesforce  :10, 210
    fetch-stripe      :10, 190
    fetch-intercom    :10, 160
    fetch-internal    :10, 90

    section Transform (parallel)
    transform-sf      :210, 215
    transform-stripe  :210, 215
    transform-ic      :210, 215
    transform-int     :210, 215

    section Merge
    merge-responses   :215, 220

    section Enrich
    ai-enrich         :220, 1500

    section Validate
    final-validation  :1500, 1505

Total: ~1.5 seconds (mostly AI enrichment). Without AI: ~220ms.
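The totals follow from the longest dependency chain, not from the sum of all stages. A small Python sketch that computes earliest finish times from the declared dependencies is shown below. The durations are this page's illustrative figures (fetch latencies of 200, 180, 150, and 80 ms, and roughly 1280 ms for the AI call), not measurements, and `finish_times` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Stage id -> (duration in ms, dependencies). Illustrative durations only.
STAGES = {
    "validate-input": (10, []),
    "fetch-salesforce": (200, ["validate-input"]),
    "fetch-stripe": (180, ["validate-input"]),
    "fetch-intercom": (150, ["validate-input"]),
    "fetch-internal": (80, ["validate-input"]),
    "transform-salesforce": (5, ["fetch-salesforce"]),
    "transform-stripe": (5, ["fetch-stripe"]),
    "transform-intercom": (5, ["fetch-intercom"]),
    "transform-internal": (5, ["fetch-internal"]),
    "merge-responses": (5, ["transform-salesforce", "transform-stripe",
                            "transform-intercom", "transform-internal"]),
    "enrich-data": (1280, ["merge-responses"]),
    "final-validation": (5, ["merge-responses", "enrich-data"]),
}


def finish_times(stages):
    """Earliest finish time of each stage when independent stages overlap."""
    graph = {name: deps for name, (_, deps) in stages.items()}
    finish = {}
    for name in TopologicalSorter(graph).static_order():
        duration, deps = stages[name]
        # A stage starts once its slowest dependency has finished.
        start = max((finish[dep] for dep in deps), default=0)
        finish[name] = start + duration
    return finish


finish = finish_times(STAGES)
print(finish["merge-responses"])   # 220
print(finish["final-validation"])  # 1505
```

The slowest fetch (Salesforce) sets the pace for everything before the merge, so speeding up the other three sources would not change the total.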

Sample Input

{
  "customer_id": "cust_abc123def456gh",
  "include_history": true,
  "enrichment_level": "full",
  "config_api": "https://config.internal.company.com",
  "internal_api": "https://api.internal.company.com"
}

Expected Output

{
  "customer": {
    "customer_id": "cust_abc123def456gh",
    "profile": {
      "company": "Acme Corp",
      "industry": "Technology",
      "size": 250,
      "email": "user@example.com"
    },
    "financial": {
      "subscription_status": "active",
      "mrr": 49900,
      "total_spent": 598800,
      "currency": "usd"
    },
    "engagement": {
      "last_seen": "2024-01-14T18:30:00Z",
      "support_tickets": 2,
      "health_score": 78,
      "active_days": 22
    },
    "sales": {
      "deal_stage": "Customer",
      "owner": "Sarah Johnson"
    },
    "sources_fetched": ["salesforce", "stripe", "intercom", "internal"],
    "fetched_at": "2024-01-15T10:30:00Z"
  },
  "insights": {
    "risk_level": "medium",
    "risk_reasoning": "2 open support tickets and health score declined from 85 to 78",
    "opportunities": [
      "Schedule QBR - no exec engagement in 90 days",
      "Offer training - feature adoption at 45%",
      "Address support backlog - 2 tickets open > 5 days"
    ],
    "churn_indicators": [
      "Health score trending down (-7 points in 30 days)",
      "Login frequency decreased 20% vs previous month"
    ]
  }
}

Key Learnings

1. Parallel API Pattern

Sequential:   200 + 180 + 150 + 80 = 610ms
Parallel:     max(200, 180, 150, 80) = 200ms

Roughly 3x faster with no code changes: the fetch stages depend only on validate-input, not on each other, so they run concurrently.
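The same arithmetic can be reproduced in plain Python with `asyncio`. This is a simulation under stated assumptions: `fetch` is a stand-in that sleeps for the illustrative latencies above instead of calling any real API.

```python
import asyncio
import time

# Simulated latencies in milliseconds (the illustrative figures above).
LATENCY_MS = {"salesforce": 200, "stripe": 180, "intercom": 150, "internal": 80}


async def fetch(source):
    """Stand-in for an HTTP call: sleep for the source's simulated latency."""
    await asyncio.sleep(LATENCY_MS[source] / 1000)
    return {"source": source}


async def fetch_sequential():
    # Each call waits for the previous one to finish.
    return [await fetch(source) for source in LATENCY_MS]


async def fetch_parallel():
    # All calls are in flight at once; gather preserves input order.
    return await asyncio.gather(*(fetch(source) for source in LATENCY_MS))


def timed(coro_fn):
    """Run an async function and return (result, elapsed milliseconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, (time.perf_counter() - start) * 1000


sequential, sequential_ms = timed(fetch_sequential)
parallel, parallel_ms = timed(fetch_parallel)
print(f"sequential: {sequential_ms:.0f} ms, parallel: {parallel_ms:.0f} ms")
```

On a typical machine the two timings land near 610 ms and 200 ms plus a little scheduling overhead, and both runs return the same records in the same order.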

2. Transformation Strategy

Raw API                         Transformed
Salesforce: 50+ fields          8 relevant fields
Stripe: nested subscriptions    Flat subscription array
Intercom: statistics object     Key metrics extracted

JMESPath handles the transformation declaratively.

3. Error Handling Per Source

Each API has its own retry config:

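  • Stripe: Retry only on 429 (rate limit), up to 3 attempts
  • Salesforce: 3 attempts with exponential backoff (multiplier 2)
  • Intercom: 3 attempts with a fixed 2-second delay
  • Internal: Short 5-second timeout, no retry (it should be fast)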

If one source fails, others still return data.
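One way to get this behavior in plain Python is `asyncio.gather` with `return_exceptions=True`, which collects failures alongside successes instead of aborting on the first error. The sketch below only illustrates the pattern; it does not describe how FlowMason isolates failures, and `fetch` and `fetch_all` are hypothetical stand-ins.

```python
import asyncio


async def fetch(source, fail=False):
    """Stand-in for an HTTP call; raises to simulate an unavailable API."""
    await asyncio.sleep(0)
    if fail:
        raise ConnectionError(f"{source} unavailable")
    return {"source": source}


async def fetch_all(sources, failing=()):
    """Fetch every source concurrently and separate successes from failures."""
    results = await asyncio.gather(
        *(fetch(source, fail=source in failing) for source in sources),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    data, errors = {}, {}
    for source, result in zip(sources, results):
        if isinstance(result, Exception):
            errors[source] = str(result)
        else:
            data[source] = result
    return data, errors


data, errors = asyncio.run(
    fetch_all(["salesforce", "stripe", "intercom", "internal"], failing={"intercom"})
)
print(sorted(data), errors)
```

The caller can then decide whether a partial result is acceptable, for example by listing only the keys of `data` as the sources that were actually fetched.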

Common Patterns

ETL Pipeline

flowchart LR
    E[Extract] --> T[Transform] --> L[Load]
    E --> |"parallel"| API1
    E --> |"parallel"| API2
    E --> |"parallel"| API3

Webhook Handler

flowchart LR
    R[Receive] --> V[Validate] --> Route
    Route --> A[Process Type A]
    Route --> B[Process Type B]
    Route --> C[Process Type C]

Data Sync

flowchart LR
    Detect[Detect Changes] --> Map[Map Fields] --> Update[Update Targets] --> Log

Try It Yourself

fm run pipelines/devops-api-orchestration.pipeline.json \
  --input inputs/customer-lookup.json