Pipeline Cookbook. 12 recipes.
Copy-pasteable pipeline JSON for the patterns FlowMason production pipelines hit most often. Each recipe is a complete, runnable stages[] slice.
Index
- Recipe 1. Retry with provider fallback
- Recipe 2. Fan-out / fan-in (ParallelExecutor)
- Recipe 3. RAG over Knowledge Base
- Recipe 4. Critic / improver loop
- Recipe 5. ForEach with parallel item processing
- Recipe 6. Async signal-wait (human approval)
- Recipe 7. TryCatch with compensating transaction
- Recipe 8. Circuit breaker (buffered mode)
- Recipe 9. Trigger-driven enrichment
- Recipe 10. Sub-pipeline reuse
- Recipe 11. Output routing (multi-destination)
- Recipe 12. Cached lookup
Recipe 1. Retry with provider fallback
Wrap an LLM call in retry + provider fallback. The first three attempts run on Anthropic; if all fail, it falls back to OpenAI.
{
  "name": "summarise",
  "type": "llm_summarizer",
  "depends_on": ["fetch"],
  "retry_config": { "max_retries": 3, "backoff_ms": 1000 },
  "config": {
    "prompt": "Summarise: {{ stages.fetch.records }}",
    "providerFallback": ["anthropic", "openai"],
    "outputSchema": { "type": "string" }
  }
}
__meta.providerUsed and __meta.providerAttempts surface which provider answered and how many attempts it took.
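A minimal sketch of consuming that metadata downstream. Only the __meta fields are confirmed above; the logger type, the log_fallback name, and the message key are illustrative assumptions, not confirmed catalog components:
{
  "name": "log_fallback",
  "type": "logger",
  "depends_on": ["summarise"],
  "if": "stages.summarise.__meta.providerUsed == 'openai'",
  "config": {
    "message": "Fallback: answered by {{ stages.summarise.__meta.providerUsed }} after {{ stages.summarise.__meta.providerAttempts }} attempts"
  }
}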
Recipe 2. Fan-out / fan-in (ParallelExecutor)
Run three branches concurrently. Extract entities + sentiment + topic, then merge.
[
  { "name": "extract_entities", "type": "llm_extractor", "depends_on": [] },
  { "name": "extract_sentiment", "type": "llm_classifier", "depends_on": [] },
  { "name": "extract_topic", "type": "llm_classifier", "depends_on": [] },
  {
    "name": "fanout",
    "type": "parallel",
    "depends_on": [],
    "config": {
      "branches": ["extract_entities", "extract_sentiment", "extract_topic"],
      "async": true,
      "waitForAll": true
    }
  },
  {
    "name": "merge",
    "type": "merge",
    "depends_on": ["fanout"],
    "config": {
      "sources": [
        "{{ stages.extract_entities.content }}",
        "{{ stages.extract_sentiment.content }}",
        "{{ stages.extract_topic.content }}"
      ],
      "strategy": "deep"
    }
  }
]
Async mode splits each branch into its own Queueable. Joins via ExecutionState.asyncTokens.
Recipe 3. RAG over Knowledge Base
Vector retrieve → grounded Q&A. Standard RAG shape.
[
  { "name": "embed", "type": "llm_embedder", "depends_on": [],
    "config": { "model": "text-embedding-3-small", "input": "{{ input.question }}" } },
  { "name": "retrieve", "type": "rag_retriever", "depends_on": ["embed"],
    "config": { "indexName": "kb_articles", "vector": "{{ stages.embed.embedding }}", "topK": 5 } },
  { "name": "answer", "type": "llm_qa", "depends_on": ["retrieve"],
    "config": { "question": "{{ input.question }}", "context": "{{ stages.retrieve.results }}" } }
]
llm_qa returns {content, citations}. Surface citations in your LWC for traceability.
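Roughly what the answer stage emits. Only content and citations are confirmed above; the fields inside each citation entry are assumptions for illustration:
{
  "content": "Grounded answer text",
  "citations": [
    { "source": "kb_articles", "documentId": "ka0XXXXXXXXXXXX", "score": 0.87 }
  ]
}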
Recipe 4. Critic / improver loop
LLM produces a draft; critic scores it; improver revises if score is below threshold.
[
  { "name": "draft", "type": "llm_generator",
    "config": { "prompt": "Draft a reply to: {{ input.email }}" } },
  { "name": "critique", "type": "llm_critic", "depends_on": ["draft"],
    "config": { "content": "{{ stages.draft.content }}", "rubric": "Helpful, on-brand. 1-10." } },
  { "name": "improve", "type": "llm_improver", "depends_on": ["critique"],
    "if": "stages.critique.score < 8",
    "config": { "content": "{{ stages.draft.content }}", "feedback": "{{ stages.critique.feedback }}" } }
]
Output: prefer improve.content when the if fired, else draft.content.
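One way to express that selection with constructs already shown in this cookbook: two downstream stages with mutually exclusive if guards. The email_sender usage mirrors Recipe 6; the stage names and address fields are placeholders, and this assumes a stage skipped by its if does not block dependents:
[
  { "name": "send_improved", "type": "email_sender", "depends_on": ["improve"],
    "if": "stages.critique.score < 8",
    "config": { "to": ["{{ input.replyTo }}"], "subject": "Re: your email", "body": "{{ stages.improve.content }}" } },
  { "name": "send_draft", "type": "email_sender", "depends_on": ["critique"],
    "if": "stages.critique.score >= 8",
    "config": { "to": ["{{ input.replyTo }}"], "subject": "Re: your email", "body": "{{ stages.draft.content }}" } }
]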
Recipe 5. ForEach with parallel item processing
Iterate a list of cases, classify each in parallel, collect results.
[
  { "name": "fetch_cases", "type": "soql_query",
    "config": { "query": "SELECT Id, Subject FROM Case WHERE Status = 'New' LIMIT 50" } },
  { "name": "loop", "type": "foreach", "depends_on": ["fetch_cases"],
    "config": {
      "items": "{{ stages.fetch_cases.records }}",
      "item_variable": "case",
      "parallelism": 5,
      "loop_stages": ["classify_one"],
      "collect_results": true
    } },
  { "name": "classify_one", "type": "llm_classifier",
    "config": {
      "prompt": "Classify this case: {{ context.case.Subject }}",
      "labels": ["urgent", "normal", "low"]
    } }
]
forEachMaxItems (default 500) hard-caps the input list size.
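With collect_results: true, the per-item outputs can feed a downstream stage. A sketch, assuming the collected list surfaces as stages.loop.results (the exact output key is an assumption):
{
  "name": "report",
  "type": "llm_summarizer",
  "depends_on": ["loop"],
  "config": { "prompt": "Summarise the label distribution across these classifications: {{ stages.loop.results }}" }
}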
Recipe 6. Async signal-wait (human approval)
Pipeline pauses; external system delivers a signal; pipeline resumes with the signal payload threaded into context.
[
  { "name": "draft_amendment", "type": "llm_generator",
    "config": { "prompt": "Draft an amendment for: {{ input.contractId }}" } },
  { "name": "wait_for_legal", "type": "wait_for_signal", "depends_on": ["draft_amendment"],
    "config": {
      "signal": "approval.{{ input.contractId }}",
      "timeoutMs": 86400000,
      "expected": { "approved": true }
    } },
  { "name": "send", "type": "email_sender", "depends_on": ["wait_for_legal"],
    "if": "stages.wait_for_legal.payload.approved == true",
    "config": {
      "to": ["{{ input.customerEmail }}"],
      "subject": "Amendment for review",
      "body": "{{ stages.draft_amendment.content }}"
    } }
]
The signal is delivered via an FM_Signal__c insert from any external trigger; "timeoutMs": 86400000 gives this example its 24-hour timeout.
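A sketch of what that external insert might look like via the standard Salesforce REST endpoint, POST /services/data/v61.0/sobjects/FM_Signal__c. The field names here (Signal_Key__c, Payload__c) are assumptions; check the object's actual schema:
{
  "Signal_Key__c": "approval.800XXXXXXXXXXXX",
  "Payload__c": "{\"approved\": true}"
}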
Recipes 7-12. More patterns
The following recipes follow the same shape; see the source markdown for full JSON. Hedged sketches of Recipes 7 and 12 follow the list.
- Recipe 7. TryCatch with compensating transaction — DML rollback on failure via catch_stages.
- Recipe 8. Circuit breaker (buffered mode) — fail fast or buffer to FM_Circuit_Queue__c; replayed by the drainer.
- Recipe 9. Trigger-driven enrichment — wire a pipeline to fire after Lead insert via FM_Trigger_Binding__mdt; bulkified.
- Recipe 10. Sub-pipeline reuse — compose larger pipelines from smaller ones via SubPipeline w/ optional context isolation.
- Recipe 11. Output routing (multi-destination) — the same pipeline result fans out to webhook + Platform Event + DML via output_router.
- Recipe 12. Cached lookup — memoise an expensive upstream call within a 1-hour window via cache_operator or stage-level cacheable: true.
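Sketches of Recipes 7 and 12. The catch_stages key and stage-level cacheable: true come from the descriptions above; the try_stages key, both stage types, the stage names, and the TTL key are assumptions:
{
  "name": "guarded_update",
  "type": "try_catch",
  "config": {
    "try_stages": ["update_opportunity"],
    "catch_stages": ["rollback_opportunity", "notify_ops"]
  }
}
{
  "name": "fetch_rates",
  "type": "http_callout",
  "cacheable": true,
  "config": { "url": "https://api.example.com/rates", "cacheTtlSeconds": 3600 }
}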
Where to take these next
- Compose recipes into pipelines and ship them as Pipeline_Template__mdt for the Packages browser.
- Wire trigger / event / schedule bindings via FM_Trigger_Binding__mdt, FM_Event_Binding__mdt, FM_Schedule_Binding__mdt.
- For Org Chat custom flows that benefit from these patterns, see Org Chat.
Related
- Component Catalog — every type referenced above
- Pipeline Studio — visual authoring
- Pipeline Authoring (AI-assisted) — LLM generates pipeline JSON