Multi-Service API Orchestration
Coordinate multiple APIs with parallel calls, data transformation, and AI enrichment
The Problem
Building a “customer 360” view requires data from 5+ sources:
- Salesforce - Account details, deal stage
- Stripe - Payment history, subscription status
- Intercom - Support tickets, conversation history
- Your database - Usage metrics, feature flags
- Enrichment API - Company data, firmographics
Writing this integration means:
- 400 lines of Python with nested try/except blocks
- Each API has different auth (OAuth, API key, JWT)
- Rate limits hit at different thresholds
- If Stripe is slow, the whole thing times out
- When Intercom changes their API, the script breaks silently
Pain Points We’re Solving
- Brittle integrations - One API change breaks everything
- No observability - Which step failed? What did it return?
- Sequential calls - 5 APIs × 200ms = 1 second when called one after another (about 200ms if run in parallel)
- Scattered logic - Data transformation mixed with API calls
Thinking Process
flowchart TB
subgraph Strategy["Design Strategy"]
S1["1. Validate input schema"]
S2["2. Parallel API calls (independent)"]
S3["3. Transform each response"]
S4["4. Merge into unified model"]
S5["5. AI enrichment (optional)"]
S6["6. Validate output schema"]
end
S1 --> S2 --> S3 --> S4 --> S5 --> S6
Key Insight: Declarative Integration
Instead of imperative code (fetch, parse, transform, merge), declare what you need and let FlowMason handle the orchestration, retries, and parallelization.
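To make the "declare, then let the engine orchestrate" idea concrete, here is a minimal Python sketch of how a scheduler could turn `depends_on` declarations into waves of stages that are safe to run concurrently. The stage ids and dependencies are the ones defined in the Pipeline Stages section of this walkthrough. `execution_waves` is a hypothetical helper written for illustration; it is not FlowMason's actual scheduler.

```python
# Stage ids and depends_on lists copied from this pipeline's stage definitions.
STAGES = {
    "validate-input": [],
    "fetch-config": ["validate-input"],
    "fetch-salesforce": ["validate-input"],
    "fetch-stripe": ["validate-input"],
    "fetch-intercom": ["validate-input"],
    "fetch-internal": ["validate-input"],
    "transform-salesforce": ["fetch-salesforce"],
    "transform-stripe": ["fetch-stripe"],
    "transform-intercom": ["fetch-intercom"],
    "transform-internal": ["fetch-internal"],
    "merge-responses": [
        "transform-salesforce",
        "transform-stripe",
        "transform-intercom",
        "transform-internal",
    ],
    "enrich-data": ["merge-responses"],
    "final-validation": ["merge-responses", "enrich-data"],
    "log-completion": ["final-validation"],
}


def execution_waves(stages):
    """Group stages into waves; all stages in one wave can run concurrently."""
    done = set()
    remaining = dict(stages)
    waves = []
    while remaining:
        # A stage is ready once every stage it depends on has finished.
        ready = sorted(
            stage for stage, deps in remaining.items()
            if all(dep in done for dep in deps)
        )
        if not ready:
            raise ValueError("dependency cycle or unknown stage id")
        waves.append(ready)
        done.update(ready)
        for stage in ready:
            del remaining[stage]
    return waves


waves = execution_waves(STAGES)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

For this pipeline the sketch yields seven waves. The second wave holds `fetch-config` plus the four API fetches, which is why those calls can overlap.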
Solution Architecture
flowchart TB
subgraph Input["Input"]
I1["customer_id"]
end
subgraph Parallel["Parallel Fetch (200ms total)"]
direction LR
P1["Salesforce"]
P2["Stripe"]
P3["Intercom"]
P4["Internal DB"]
end
subgraph Transform["Transform"]
direction TB
T1["Normalize fields"]
T2["Calculate metrics"]
T3["Merge responses"]
end
subgraph Enrich["AI Enrich"]
E1["Summarize history"]
E2["Risk assessment"]
end
subgraph Output["Output"]
O1["Customer 360 JSON"]
end
Input --> Parallel
Parallel --> Transform
Transform --> Enrich
Enrich --> Output
Pipeline Stages
Stage 1: Validate Input
Catch bad requests before making API calls:
{
"id": "validate-input",
"component": "schema-validate",
"config": {
"data": "{{input}}",
"schema": {
"type": "object",
"required": ["customer_id"],
"properties": {
"customer_id": {
"type": "string",
"pattern": "^cust_[a-zA-Z0-9]{14}$"
},
"include_history": {
"type": "boolean",
"default": true
},
"enrichment_level": {
"type": "string",
"enum": ["basic", "full"],
"default": "basic"
}
}
}
}
}
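For readers who want to see what this schema accepts and rejects, the following Python sketch reproduces its checks using only the standard library. `validate_input` is a hypothetical helper, and filling in the `default` values is an assumption: whether the `schema-validate` component applies defaults is not stated here.

```python
import re

# Same pattern as the schema; fullmatch() makes the anchors strict.
CUSTOMER_ID_PATTERN = re.compile(r"^cust_[a-zA-Z0-9]{14}$")
ENRICHMENT_LEVELS = {"basic", "full"}
# Assumption: defaults from the schema are applied when a field is missing.
DEFAULTS = {"include_history": True, "enrichment_level": "basic"}


def validate_input(payload):
    """Return the payload with defaults filled in, or raise ValueError."""
    if not isinstance(payload, dict):
        raise ValueError("input must be an object")
    customer_id = payload.get("customer_id")
    if not isinstance(customer_id, str):
        raise ValueError("customer_id is required and must be a string")
    if not CUSTOMER_ID_PATTERN.fullmatch(customer_id):
        raise ValueError("customer_id must match ^cust_[a-zA-Z0-9]{14}$")
    result = {**DEFAULTS, **payload}
    if not isinstance(result["include_history"], bool):
        raise ValueError("include_history must be a boolean")
    if result["enrichment_level"] not in ENRICHMENT_LEVELS:
        raise ValueError("enrichment_level must be 'basic' or 'full'")
    return result


valid = validate_input({"customer_id": "cust_abc123def456gh"})

try:
    validate_input({"customer_id": "cust_123"})
    rejected = False
except ValueError:
    rejected = True
```

The id `cust_abc123def456gh` passes because exactly 14 alphanumeric characters follow the `cust_` prefix. `cust_123` is rejected before any API call is made.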
Stage 2: Fetch Configuration
Get the integration config (endpoints, field mappings):
{
"id": "fetch-config",
"component": "http-request",
"depends_on": ["validate-input"],
"config": {
"url": "{{input.config_api}}/integrations/customer-360",
"method": "GET",
"headers": {
"Authorization": "Bearer {{secrets.CONFIG_TOKEN}}"
},
"timeout": 5000
}
}
Stages 3-6: Parallel API Calls
All four APIs are called simultaneously:
{
"id": "fetch-salesforce",
"component": "http-request",
"depends_on": ["validate-input"],
"config": {
"url": "{{secrets.SALESFORCE_URL}}/services/data/v58.0/sobjects/Account/{{input.customer_id}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{secrets.SALESFORCE_TOKEN}}"
},
"timeout": 10000,
"retry": {
"max_attempts": 3,
"delay_seconds": 1,
"backoff_multiplier": 2
}
}
}
{
"id": "fetch-stripe",
"component": "http-request",
"depends_on": ["validate-input"],
"config": {
"url": "https://api.stripe.com/v1/customers/{{input.customer_id}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{secrets.STRIPE_SECRET_KEY}}"
},
"query_params": {
"expand[]": ["subscriptions", "invoice_settings.default_payment_method"]
},
"timeout": 10000,
"retry": {
"max_attempts": 3,
"delay_seconds": 1,
"condition": "{{output.status_code == 429}}"
}
}
}
{
"id": "fetch-intercom",
"component": "http-request",
"depends_on": ["validate-input"],
"config": {
"url": "https://api.intercom.io/contacts/{{input.customer_id}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{secrets.INTERCOM_TOKEN}}",
"Intercom-Version": "2.10"
},
"timeout": 10000,
"retry": {
"max_attempts": 3,
"delay_seconds": 2
}
}
}
{
"id": "fetch-internal",
"component": "http-request",
"depends_on": ["validate-input"],
"config": {
"url": "{{input.internal_api}}/customers/{{input.customer_id}}/usage",
"method": "GET",
"headers": {
"Authorization": "Bearer {{secrets.INTERNAL_TOKEN}}"
},
"timeout": 5000
}
}
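The retry settings above determine how long a slow or failing source can hold up the pipeline. The sketch below works out the wait schedule and the worst-case time per source. It assumes the delay is multiplied by `backoff_multiplier` after each failed attempt and that `timeout` applies to every attempt separately; FlowMason's exact semantics may differ. Both helper functions are hypothetical.

```python
def retry_delays(max_attempts, delay_seconds, backoff_multiplier=1):
    """Waits between attempts, in seconds (one fewer than max_attempts)."""
    return [
        delay_seconds * backoff_multiplier ** attempt
        for attempt in range(max_attempts - 1)
    ]


def worst_case_seconds(timeout_ms, max_attempts=1, delay_seconds=0,
                       backoff_multiplier=1):
    """Upper bound: every attempt times out, plus all waits in between."""
    waits = retry_delays(max_attempts, delay_seconds, backoff_multiplier)
    return max_attempts * timeout_ms / 1000 + sum(waits)


# Values taken from the stage configs above.
salesforce_delays = retry_delays(3, 1, 2)               # exponential backoff
intercom_delays = retry_delays(3, 2)                    # fixed delay
salesforce_worst = worst_case_seconds(10000, 3, 1, 2)
internal_worst = worst_case_seconds(5000)               # no retry block

print(salesforce_delays, intercom_delays, salesforce_worst, internal_worst)
```

Under these assumptions a Salesforce outage could stall the fetch wave for about 33 seconds, while the internal API fails after 5 seconds.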
Stages 7-10: Transform Each Response
Normalize to a common schema:
{
"id": "transform-salesforce",
"component": "json_transform",
"depends_on": ["fetch-salesforce"],
"config": {
"data": "{{stages.fetch-salesforce.output.body}}",
"expression": "{source: 'salesforce', company_name: Name, industry: Industry, employee_count: NumberOfEmployees, annual_revenue: AnnualRevenue, deal_stage: StageName, owner: Owner.Name, created_at: CreatedDate}"
}
}
{
"id": "transform-stripe",
"component": "json_transform",
"depends_on": ["fetch-stripe"],
"config": {
"data": "{{stages.fetch-stripe.output.body}}",
"expression": "{source: 'stripe', email: email, currency: currency, balance: balance, subscriptions: subscriptions.data[*].{id: id, status: status, plan: plan.nickname, amount: plan.amount, interval: plan.interval}, total_spent: to_number(metadata.lifetime_value) || `0`, payment_method: invoice_settings.default_payment_method}"
}
}
{
"id": "transform-intercom",
"component": "json_transform",
"depends_on": ["fetch-intercom"],
"config": {
"data": "{{stages.fetch-intercom.output.body}}",
"expression": "{source: 'intercom', last_seen: last_seen_at, total_conversations: statistics.total_conversations, avg_response_time: statistics.avg_first_response_time, open_tickets: statistics.open_tickets, tags: tags.data[*].name, custom_attributes: custom_attributes}"
}
}
{
"id": "transform-internal",
"component": "json_transform",
"depends_on": ["fetch-internal"],
"config": {
"data": "{{stages.fetch-internal.output.body}}",
"expression": "{source: 'internal', monthly_active_days: usage.active_days_30d, feature_usage: usage.features, api_calls_30d: usage.api_calls, last_login: usage.last_login, account_health_score: computed.health_score}"
}
}
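If the JMESPath expressions are hard to read, it can help to see one written out imperatively. The sketch below is an approximate plain-Python equivalent of the `transform-stripe` expression. The helper names and the sample body are hypothetical, and `to_number` only approximates the JMESPath function of the same name.

```python
def to_number(value):
    """Approximate JMESPath to_number: numbers pass, numeric strings convert."""
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        for cast in (int, float):
            try:
                return cast(value)
            except ValueError:
                continue
    return None


def transform_stripe(body):
    """Project a Stripe customer body onto the normalized fields."""
    data = (body.get("subscriptions") or {}).get("data")
    lifetime = to_number((body.get("metadata") or {}).get("lifetime_value"))
    return {
        "source": "stripe",
        "email": body.get("email"),
        "currency": body.get("currency"),
        "balance": body.get("balance"),
        # subscriptions.data[*].{...}: one flat record per subscription,
        # or None when the list is missing (JMESPath yields null there).
        "subscriptions": [
            {
                "id": sub.get("id"),
                "status": sub.get("status"),
                "plan": (sub.get("plan") or {}).get("nickname"),
                "amount": (sub.get("plan") or {}).get("amount"),
                "interval": (sub.get("plan") or {}).get("interval"),
            }
            for sub in data
        ] if isinstance(data, list) else None,
        # to_number(...) || `0`: fall back to 0 when the value is null.
        "total_spent": lifetime if lifetime is not None else 0,
        "payment_method": (body.get("invoice_settings") or {}).get(
            "default_payment_method"
        ),
    }


# Hypothetical response body, for illustration only.
sample_body = {
    "email": "billing@acme.example",
    "currency": "usd",
    "balance": 0,
    "metadata": {"lifetime_value": "598800"},
    "subscriptions": {"data": [{
        "id": "sub_1",
        "status": "active",
        "plan": {"nickname": "Pro", "amount": 49900, "interval": "month"},
    }]},
    "invoice_settings": {"default_payment_method": "pm_1"},
}
normalized = transform_stripe(sample_body)
empty = transform_stripe({})
```

The single JMESPath line in the stage config replaces all of this null handling, which is the main argument for keeping transformations declarative.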
Stage 11: Merge All Data
Combine into a unified customer model:
{
"id": "merge-responses",
"component": "json_transform",
"depends_on": ["transform-salesforce", "transform-stripe", "transform-intercom", "transform-internal"],
"config": {
"data": {
"salesforce": "{{stages.transform-salesforce.output}}",
"stripe": "{{stages.transform-stripe.output}}",
"intercom": "{{stages.transform-intercom.output}}",
"internal": "{{stages.transform-internal.output}}"
},
"expression": "{customer_id: '{{input.customer_id}}', profile: {company: salesforce.company_name, industry: salesforce.industry, size: salesforce.employee_count, email: stripe.email}, financial: {subscription_status: stripe.subscriptions[0].status, mrr: stripe.subscriptions[0].amount, total_spent: stripe.total_spent, currency: stripe.currency}, engagement: {last_seen: intercom.last_seen, support_tickets: intercom.open_tickets, health_score: internal.account_health_score, active_days: internal.monthly_active_days}, sales: {deal_stage: salesforce.deal_stage, owner: salesforce.owner}, sources_fetched: ['salesforce', 'stripe', 'intercom', 'internal'], fetched_at: '{{now()}}'}"
}
}
Stage 12: AI Enrichment
Add intelligent insights:
{
"id": "enrich-data",
"component": "generator",
"depends_on": ["merge-responses"],
"config": {
"model": "gpt-4",
"temperature": 0.3,
"system_prompt": "You are a customer success analyst. Given customer data, provide: 1) Risk level (low/medium/high) with reasoning, 2) Top 3 engagement opportunities, 3) Churn indicators if any. Be concise and actionable.",
"prompt": "Analyze this customer:\n\n{{stages.merge-responses.output | tojson}}\n\nProvide your assessment:"
}
}
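Stage 13 expects `insights` to be an object and the Stage 14 log message reads `risk_level`, so the model's reply presumably has to arrive in structured form. How the `generator` component produces that object is not shown here. As a hedged illustration, assuming the model is asked to reply with a JSON object, a defensive parser might look like the sketch below; `parse_insights` and the `"unknown"` fallback are hypothetical.

```python
import json

RISK_LEVELS = {"low", "medium", "high"}


def parse_insights(reply_text):
    """Parse a model reply expected to be a JSON object with a risk_level.

    Falls back to a stub object so downstream stages always get a dict.
    """
    try:
        data = json.loads(reply_text)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict) or data.get("risk_level") not in RISK_LEVELS:
        # Keep the raw text so the failure can be inspected later.
        return {"risk_level": "unknown", "raw": reply_text}
    return data


good = parse_insights('{"risk_level": "medium", "opportunities": []}')
bad = parse_insights("The customer looks fine to me.")
```

Keeping the raw reply in the fallback makes it possible to see what the model returned when the output does not validate.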
Stage 13: Validate Output
Ensure we return a valid response:
{
"id": "final-validation",
"component": "schema-validate",
"depends_on": ["merge-responses", "enrich-data"],
"config": {
"data": {
"customer": "{{stages.merge-responses.output}}",
"insights": "{{stages.enrich-data.output}}"
},
"schema": {
"type": "object",
"required": ["customer", "insights"],
"properties": {
"customer": {
"type": "object",
"required": ["customer_id", "profile", "financial", "engagement"]
},
"insights": {
"type": "object"
}
}
}
}
}
Stage 14: Log Completion
{
"id": "log-completion",
"component": "logger",
"depends_on": ["final-validation"],
"config": {
"level": "info",
"message": "Customer 360 built for {{input.customer_id}}: health_score={{stages.merge-responses.output.engagement.health_score}}, risk={{stages.enrich-data.output.risk_level}}"
}
}
Execution Flow
gantt
title API Orchestration Timeline
dateFormat X
axisFormat %L
section Validate
validate-input :0, 10
section Fetch (parallel)
fetch-salesforce :10, 210
fetch-stripe :10, 180
fetch-intercom :10, 150
fetch-internal :10, 80
section Transform (parallel)
transform-sf :210, 215
transform-stripe :210, 215
transform-ic :210, 215
transform-int :210, 215
section Merge
merge-responses :215, 220
section Enrich
ai-enrich :220, 1500
section Validate
final-validation :1500, 1505
Total: ~1.5 seconds (mostly AI enrichment). Without AI: ~220ms.
Sample Input
{
"customer_id": "cust_abc123def456gh",
"include_history": true,
"enrichment_level": "full",
"config_api": "https://config.internal.company.com",
"internal_api": "https://api.internal.company.com"
}
Expected Output
{
"customer": {
"customer_id": "cust_abc123def456gh",
"profile": {
"company": "Acme Corp",
"industry": "Technology",
"size": 250,
"email": "[email protected]"
},
"financial": {
"subscription_status": "active",
"mrr": 49900,
"total_spent": 598800,
"currency": "usd"
},
"engagement": {
"last_seen": "2024-01-14T18:30:00Z",
"support_tickets": 2,
"health_score": 78,
"active_days": 22
},
"sales": {
"deal_stage": "Customer",
"owner": "Sarah Johnson"
},
"sources_fetched": ["salesforce", "stripe", "intercom", "internal"],
"fetched_at": "2024-01-15T10:30:00Z"
},
"insights": {
"risk_level": "medium",
"risk_reasoning": "2 open support tickets and health score declined from 85 to 78",
"opportunities": [
"Schedule QBR - no exec engagement in 90 days",
"Offer training - feature adoption at 45%",
"Address support backlog - 2 tickets open > 5 days"
],
"churn_indicators": [
"Health score trending down (-7 points in 30 days)",
"Login frequency decreased 20% vs previous month"
]
}
}
Key Learnings
1. Parallel API Pattern
Sequential: 200 + 180 + 150 + 80 = 610ms
Parallel: max(200, 180, 150, 80) = 200ms
That is roughly 3x faster with no code changes: the fetch stages depend only on validate-input, never on each other, so they can run concurrently.
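The same effect can be reproduced outside FlowMason with a few lines of `asyncio`. This sketch simulates the four latencies quoted above with `asyncio.sleep`, scaled down by a hypothetical `SCALE` factor so it finishes quickly, and compares awaiting the calls one by one with running them through `asyncio.gather`.

```python
import asyncio
import time

# Latencies from the comparison above, in milliseconds.
LATENCY_MS = {"salesforce": 200, "stripe": 180, "intercom": 150, "internal": 80}
SCALE = 0.1  # run at one tenth of real time so the demo is fast


async def fetch(name):
    """Stand-in for an HTTP call: sleep for the source's simulated latency."""
    await asyncio.sleep(LATENCY_MS[name] / 1000 * SCALE)
    return name


async def run_sequential():
    # Each call waits for the previous one, so latencies add up.
    return [await fetch(name) for name in LATENCY_MS]


async def run_parallel():
    # All calls start together, so the slowest one sets the total time.
    return list(await asyncio.gather(*(fetch(name) for name in LATENCY_MS)))


def timed(coro_factory):
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start


seq_result, seq_elapsed = timed(run_sequential)
par_result, par_elapsed = timed(run_parallel)

print(f"sequential: {seq_elapsed * 1000:.0f} ms (scaled)")
print(f"parallel:   {par_elapsed * 1000:.0f} ms (scaled)")
print(f"ideal speedup: {sum(LATENCY_MS.values()) / max(LATENCY_MS.values()):.2f}x")
```

Both runs return the sources in the same order. Only the wall-clock time differs, and the ideal speedup for these numbers is 610 / 200, or about 3x.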
2. Transformation Strategy
| Raw API | Transformed |
|---|---|
| Salesforce: 50+ fields | 8 relevant fields |
| Stripe: nested subscriptions | Flat subscription array |
| Intercom: statistics object | Key metrics extracted |
JMESPath handles the transformation declaratively.
3. Error Handling Per Source
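Each API has its own retry and timeout config:
- Stripe: Up to 3 attempts, retrying only on 429 (rate limit)
- Salesforce: Up to 3 attempts with exponential backoff (1s delay, 2x multiplier)
- Intercom: Up to 3 attempts, 2 seconds apart
- Internal: No retries, 5-second timeout (it should be fast)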
If one source fails, others still return data.
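The stage definitions in this walkthrough do not show which setting makes partial results possible, so the mechanism inside FlowMason is not visible here. As a plain-Python illustration of the behavior, `asyncio.gather` with `return_exceptions=True` lets the remaining sources return data when one of them fails. The fetch functions below are hypothetical stand-ins, with Intercom hard-coded to fail.

```python
import asyncio


async def fetch_ok(name):
    """Stand-in for a successful API call."""
    await asyncio.sleep(0)
    return {"source": name}


async def fetch_failing(name):
    """Stand-in for a call that times out."""
    await asyncio.sleep(0)
    raise TimeoutError(f"{name} timed out")


async def fetch_all():
    names = ["salesforce", "stripe", "intercom", "internal"]
    calls = [
        fetch_failing(name) if name == "intercom" else fetch_ok(name)
        for name in names
    ]
    # return_exceptions=True returns errors as values instead of raising,
    # so one failure does not discard the other results.
    results = await asyncio.gather(*calls, return_exceptions=True)
    data, errors = {}, {}
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            errors[name] = str(result)
        else:
            data[name] = result
    return data, errors


data, errors = asyncio.run(fetch_all())
print("fetched:", sorted(data))
print("failed: ", errors)
```

A merge step built on this pattern would record only the successful sources in `sources_fetched` rather than listing all four unconditionally.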
Common Patterns
ETL Pipeline
flowchart LR
E[Extract] --> T[Transform] --> L[Load]
E --> |"parallel"| API1
E --> |"parallel"| API2
E --> |"parallel"| API3
Webhook Handler
flowchart LR
R[Receive] --> V[Validate] --> Route
Route --> A[Process Type A]
Route --> B[Process Type B]
Route --> C[Process Type C]
Data Sync
flowchart LR
Detect[Detect Changes] --> Map[Map Fields] --> Update[Update Targets] --> Log
Try It Yourself
fm run pipelines/devops-api-orchestration.pipeline.json \
--input inputs/customer-lookup.json