Docs / Pipeline Cookbook

Pipeline Cookbook. 12 recipes.

Copy-pasteable pipeline JSON for the patterns FlowMason production pipelines hit most often. Each recipe is a complete, runnable stages[] slice.

Index

  1. Retry with provider fallback
  2. Fan-out / fan-in (ParallelExecutor)
  3. RAG over Knowledge Base
  4. Critic / improver loop
  5. ForEach with parallel item processing
  6. Async signal-wait (human approval)
  7. TryCatch with compensating transaction
  8. Circuit breaker (buffered mode)
  9. Trigger-driven enrichment
  10. Sub-pipeline reuse
  11. Output routing (multi-destination)
  12. Cached lookup

Recipe 1. Retry with provider fallback

Wrap an LLM call in retry + provider fallback. First three attempts on Anthropic; if all fail, drop to OpenAI.

{
  "name": "summarise",
  "type": "llm_summarizer",
  "depends_on": ["fetch"],
  "retry_config": { "max_retries": 3, "backoff_ms": 1000 },
  "config": {
    "prompt": "Summarise: {{ stages.fetch.records }}",
    "providerFallback": ["anthropic", "openai"],
    "outputSchema": { "type": "string" }
  }
}

__meta.providerUsed and __meta.providerAttempts record which provider ultimately answered and how many attempts it took.
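Downstream stages can branch on that metadata with the same template and if syntax used elsewhere in this cookbook. A minimal sketch (the flag_fallback stage name is illustrative, and reusing merge as a pass-through is an assumption):

```json
{
  "name": "flag_fallback",
  "type": "merge",
  "depends_on": ["summarise"],
  "if": "stages.summarise.__meta.providerUsed != 'anthropic'",
  "config": {
    "sources": ["{{ stages.summarise.__meta.providerAttempts }}"]
  }
}
```

This only runs when the fallback provider answered, so its presence in the run record doubles as a fallback flag.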

Recipe 2. Fan-out / fan-in (ParallelExecutor)

Run three branches concurrently: extract entities, sentiment, and topic, then merge the results.

[
  { "name": "extract_entities",  "type": "llm_extractor",  "depends_on": [] },
  { "name": "extract_sentiment", "type": "llm_classifier", "depends_on": [] },
  { "name": "extract_topic",     "type": "llm_classifier", "depends_on": [] },
  {
    "name": "fanout",
    "type": "parallel",
    "depends_on": [],
    "config": {
      "branches": ["extract_entities","extract_sentiment","extract_topic"],
      "async": true,
      "waitForAll": true
    }
  },
  {
    "name": "merge",
    "type": "merge",
    "depends_on": ["fanout"],
    "config": {
      "sources": [
        "{{ stages.extract_entities.content }}",
        "{{ stages.extract_sentiment.content }}",
        "{{ stages.extract_topic.content }}"
      ],
      "strategy": "deep"
    }
  }
]

In async mode, each branch runs in its own Queueable; the fan-in join is coordinated through ExecutionState.asyncTokens.

Recipe 3. RAG over Knowledge Base

Embed the question, retrieve the top matches from a vector index, then answer grounded in the retrieved context. The standard RAG shape.

[
  { "name": "embed",    "type": "llm_embedder", "depends_on": [],
    "config": { "model": "text-embedding-3-small", "input": "{{ input.question }}" } },
  { "name": "retrieve", "type": "rag_retriever", "depends_on": ["embed"],
    "config": { "indexName": "kb_articles", "vector": "{{ stages.embed.embedding }}", "topK": 5 } },
  { "name": "answer",   "type": "llm_qa", "depends_on": ["retrieve"],
    "config": { "question": "{{ input.question }}", "context": "{{ stages.retrieve.results }}" } }
]

llm_qa returns {content, citations}. Surface citations in your LWC for traceability.
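If you want the citations travelling with the answer as one object, a merge stage can package both fields. A sketch (the package_answer stage name is illustrative):

```json
{
  "name": "package_answer",
  "type": "merge",
  "depends_on": ["answer"],
  "config": {
    "sources": [
      "{{ stages.answer.content }}",
      "{{ stages.answer.citations }}"
    ],
    "strategy": "deep"
  }
}
```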

Recipe 4. Critic / improver loop

LLM produces a draft; critic scores it; improver revises if score is below threshold.

[
  { "name": "draft", "type": "llm_generator",
    "config": { "prompt": "Draft a reply to: {{ input.email }}" } },
  { "name": "critique", "type": "llm_critic", "depends_on": ["draft"],
    "config": { "content": "{{ stages.draft.content }}", "rubric": "Helpful, on-brand. 1-10." } },
  { "name": "improve", "type": "llm_improver", "depends_on": ["critique"],
    "if": "stages.critique.score < 8",
    "config": { "content": "{{ stages.draft.content }}", "feedback": "{{ stages.critique.feedback }}" } }
]

Output: use stages.improve.content when the if condition fired; otherwise fall back to stages.draft.content.
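One way to make that selection explicit in the pipeline itself is a pair of stages with complementary if conditions. A sketch (stage names are illustrative; it assumes a single-source merge acts as a pass-through):

```json
[
  { "name": "final_from_improve", "type": "merge", "depends_on": ["improve"],
    "if": "stages.critique.score < 8",
    "config": { "sources": ["{{ stages.improve.content }}"] } },
  { "name": "final_from_draft", "type": "merge", "depends_on": ["critique"],
    "if": "stages.critique.score >= 8",
    "config": { "sources": ["{{ stages.draft.content }}"] } }
]
```

Exactly one of the two runs, so downstream consumers have a single, unambiguous final stage to read from.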

Recipe 5. ForEach with parallel item processing

Iterate a list of cases, classify each in parallel, collect results.

[
  { "name": "fetch_cases", "type": "soql_query",
    "config": { "query": "SELECT Id, Subject FROM Case WHERE Status = 'New' LIMIT 50" } },
  { "name": "loop", "type": "foreach", "depends_on": ["fetch_cases"],
    "config": {
      "items": "{{ stages.fetch_cases.records }}",
      "item_variable": "case",
      "parallelism": 5,
      "loop_stages": ["classify_one"],
      "collect_results": true
    } },
  { "name": "classify_one", "type": "llm_classifier",
    "config": {
      "prompt": "Classify this case: {{ context.case.Subject }}",
      "labels": ["urgent","normal","low"]
    } }
]

forEachMaxItems (default 500) hard-caps the input list size.

Recipe 6. Async signal-wait (human approval)

Pipeline pauses; external system delivers a signal; pipeline resumes with the signal payload threaded into context.

[
  { "name": "draft_amendment", "type": "llm_generator",
    "config": { "prompt": "Draft an amendment for: {{ input.contractId }}" } },
  { "name": "wait_for_legal", "type": "wait_for_signal", "depends_on": ["draft_amendment"],
    "config": {
      "signal": "approval.{{ input.contractId }}",
      "timeoutMs": 86400000,
      "expected": { "approved": true }
    } },
  { "name": "send", "type": "email_sender", "depends_on": ["wait_for_legal"],
    "if": "stages.wait_for_legal.payload.approved == true",
    "config": {
      "to": ["{{ input.customerEmail }}"],
      "subject": "Amendment for review",
      "body": "{{ stages.draft_amendment.content }}"
    } }
]

The signal is delivered by inserting an FM_Signal__c record from any external process. This example uses a 24-hour timeout (86400000 ms).
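For orientation, the signal record the external system inserts might look like this (the field names on FM_Signal__c are assumptions; check the object definition in your org):

```json
{
  "Signal_Name__c": "approval.<contractId>",
  "Payload__c": "{ \"approved\": true }"
}
```

The payload JSON surfaces as stages.wait_for_legal.payload once the pipeline resumes.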

Recipes 7-12. More patterns

The following recipes follow the same shape; see the source markdown for the full JSON:

  • Recipe 7. TryCatch with compensating transaction — DML rollback on failure via catch_stages.
  • Recipe 8. Circuit breaker (buffered mode) — fail-fast or buffer to FM_Circuit_Queue__c; replayed by drainer.
  • Recipe 9. Trigger-driven enrichment — wire pipeline to fire after Lead insert via FM_Trigger_Binding__mdt; bulkified.
  • Recipe 10. Sub-pipeline reuse — compose larger pipelines from smaller ones via the SubPipeline stage, with optional context isolation.
  • Recipe 11. Output routing (multi-destination) — same pipeline result fans out to webhook + Platform Event + DML via output_router.
  • Recipe 12. Cached lookup — memoise an expensive upstream call within a 1-hour window via cache_operator or stage-level cacheable: true.
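To show the shape Recipe 7 refers to, here is a hedged sketch of a TryCatch wrapper (catch_stages comes from the recipe summary above; try_stages, the stage names, and the dml_update type are assumptions):

```json
[
  { "name": "guarded_update", "type": "trycatch",
    "config": {
      "try_stages": ["apply_dml"],
      "catch_stages": ["compensate_dml"]
    } },
  { "name": "apply_dml", "type": "dml_update",
    "config": { "records": "{{ input.records }}" } },
  { "name": "compensate_dml", "type": "dml_update",
    "config": { "records": "{{ input.snapshot }}" } }
]
```

If apply_dml throws, the executor runs compensate_dml to restore the pre-change snapshot.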

Where to take these next

  • Compose recipes into pipelines and ship them as Pipeline_Template__mdt for the Packages browser.
  • Wire trigger / event / schedule bindings via FM_Trigger_Binding__mdt, FM_Event_Binding__mdt, FM_Schedule_Binding__mdt.
  • For Org Chat custom flows that benefit from these patterns, see Org Chat.

Related