PipelineRunner & the Apex API
Everything you need to execute pipelines from Apex — triggers, batch jobs, REST resources, scheduled classes, or anywhere else Apex runs. One class, four execution shapes.
The four execution shapes
PipelineRunner is the single entry point for all pipeline execution. Pick the shape that fits your context:
// 1. Sync — run immediately, return the output
ExecutionResult result = PipelineRunner.run('account-summarize-v1', input);
// 2. Async fire-and-forget — returns execution Id
String executionId = PipelineRunner.runAsync('account-summarize-v1', input);
// 3. Async tracked — poll status and retrieve output
AsyncExecution exec = PipelineRunner.runAsyncTracked('account-summarize-v1', input);
// 4. Direct JSON — for tests or dynamic construction
ExecutionResult result = PipelineRunner.execute(pipelineJson, input);
When to use which:
- Use `run` from triggers and small synchronous flows. It blocks until the pipeline completes.
- Use `runAsync` when the pipeline contains LLM calls, HTTP callouts, or heavy SOQL. It enqueues a Queueable and returns immediately (a trigger sketch follows this list).
- Use `runAsyncTracked` when you need to poll for progress or retrieve the final output from a Lightning component.
- Use `execute(json, input)` in tests and dynamic scenarios: pass the pipeline JSON directly instead of a stored pipeline Id.
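For example, a minimal trigger sketch for the `runAsync` case might look like the following. The trigger name, the change check, and the `recordIds` input key are illustrative assumptions; only PipelineRunner.runAsync and the input-map shape come from the API above.
trigger AccountSummary on Account (after update) {
    // Collect changed Ids so a bulk update produces a single async execution
    List<Id> changedIds = new List<Id>();
    for (Account acct : Trigger.new) {
        Account oldAcct = (Account) Trigger.oldMap.get(acct.Id);
        if (acct.Description != oldAcct.Description) {
            changedIds.add(acct.Id);
        }
    }
    if (!changedIds.isEmpty()) {
        Map<String, Object> input = new Map<String, Object>{
            'recordIds' => changedIds,
            'userId' => UserInfo.getUserId()
        };
        // Fire-and-forget: enqueues a Queueable and returns immediately
        String executionId = PipelineRunner.runAsync('account-summarize-v1', input);
        System.debug('Enqueued pipeline execution: ' + executionId);
    }
}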
Building the input map
The second argument to every run method is a Map<String, Object>. Anything you put in this map is available inside the pipeline as {{input.key}}. Values can be primitives, lists, or nested maps.
Map<String, Object> input = new Map<String, Object>{
'recordId' => someAccount.Id,
'userId' => UserInfo.getUserId(),
'threshold' => 0.75,
'tags' => new List<String>{ 'vip', 'renewal' }
};
ExecutionResult result = PipelineRunner.run('score-account-v2', input);
Working with ExecutionResult
ExecutionResult is what run and execute return. Check isSuccess() first, then read either the top-level output or drill into individual stageOutputs.
ExecutionResult result = PipelineRunner.run('summarize-v1', input);
if (result.isSuccess()) {
// Access the output from the terminal stage
String summary = (String) result.output;
// Or drill into named stage outputs
Map<String, Object> outputs = result.stageOutputs;
String classified = (String) outputs.get('classifier');
} else {
// result.errorMessage has the human-readable error
// result.errorType has the exception class name
System.debug('Pipeline failed: ' + result.errorMessage);
}
Tracking async executions
AsyncExecution wraps a PipelineExecution__c row and handles polling, progress, and cancellation. This is the pattern to use when a Lightning component needs to display real-time progress on a long-running pipeline.
AsyncExecution exec = PipelineRunner.runAsyncTracked('long-pipeline', input);
// Polling in a Lightning component via @AuraEnabled:
@AuraEnabled
public static Map<String, Object> pollExecution(Id executionId) {
AsyncExecution exec = AsyncExecution.byId(executionId);
return new Map<String, Object>{
'status' => exec.getStatus(),
'progress' => exec.getProgress(),
'done' => exec.isDone(),
'output' => exec.isDone() ? exec.getOutput() : null
};
}
// Status values: 'Pending' | 'Running' | 'Completed' | 'Failed' | 'Cancelled'
// Progress: 0.0 to 1.0 (fraction of completed stages)
// Cancel (requires ownership or FlowMason_Execution_Admin permission):
exec.cancel();
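As a rough sketch, a cancel endpoint for a Lightning component can sit next to the polling method above. The method name and Boolean return are assumptions; the AsyncExecution calls are the ones documented here.
@AuraEnabled
public static Boolean cancelExecution(Id executionId) {
    AsyncExecution exec = AsyncExecution.byId(executionId);
    if (exec.isDone()) {
        // Completed, Failed, and Cancelled executions have nothing to cancel
        return false;
    }
    // Requires ownership or the FlowMason_Execution_Admin permission
    exec.cancel();
    return true;
}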
Building pipelines in code
The PipelineBuilder fluent API constructs valid pipeline JSON without hand-crafting it. This is useful for tests, dynamic generation, and packaged-app scenarios where the pipeline shape depends on runtime input. The output of build() is identical to a Studio export — you can save it to a Custom Metadata record or feed it to PipelineRunner.execute().
String json = new PipelineBuilder('analyze-case')
.setName('Classify and route case')
.addStage('fetch', 'soql_query')
.withConfig('query', 'SELECT Subject, Description FROM Case WHERE Id = :caseId')
.end()
.addStage('classify', 'classify')
.dependsOn('fetch')
.withConfig('prompt', 'Classify this case: {{stages.fetch.records[0].Subject}}')
.withConfig('categories', 'billing,technical,onboarding,other')
.end()
.addStage('notify', 'http_callout')
.dependsOn('classify')
.withConfig('url', 'https://hooks.slack.com/services/...')
.withConfig('body', '{"text": "Case routed to {{stages.classify.category}}"}')
.end()
.setOutput('classify')
.build();
ExecutionResult result = PipelineRunner.execute(json, new Map<String, Object>{
'caseId' => incomingCase.Id
});
Exception handling
All FlowMason errors extend FMException. Catch selectively to make recovery logic precise — different exceptions call for different responses.
try {
ExecutionResult r = PipelineRunner.run(pipelineId, input);
} catch (PipelineNotFoundException e) {
// Pipeline id not found — config mistake, surface to the user
} catch (ProviderAuthException e) {
// API key rejected — page the admin, keys need rotation
} catch (ProviderTimeoutException e) {
// LLM timed out after all retries — try async or reduce input
} catch (GovernorLimitException e) {
// Governor cap hit — defer to async, shrink batch, or reduce input size
} catch (SecurityDeniedException e) {
// FLS/CRUD check failed or cross-user exec access denied
} catch (FMException e) {
// Anything else FlowMason-specific
} catch (Exception e) {
// Unrelated platform error
}
| Exception | Thrown when |
|---|---|
| PipelineNotFoundException | Pipeline Id not found in CMT or custom object |
| ProviderTimeoutException | LLM provider timed out after all configured retries |
| ProviderAuthException | Provider rejected the API key (HTTP 401 / 403) |
| ValidationFailedException | PipelineValidator returned errors before execution |
| GovernorLimitException | ForEach cap, callout exhaustion, context size cap, chain depth cap |
| SecurityDeniedException | FLS/CRUD check failed or cross-user execution access denied |
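As one illustration of that recovery logic, a synchronous call can fall back to the async path when the provider times out. This is a sketch rather than a prescribed pattern; it reuses the summarize-v1 pipeline and input map from the earlier examples.
try {
    ExecutionResult result = PipelineRunner.run('summarize-v1', input);
    System.debug('Summary: ' + result.output);
} catch (ProviderTimeoutException e) {
    // Synchronous retries exhausted; defer to the Queueable path instead
    String executionId = PipelineRunner.runAsync('summarize-v1', input);
    System.debug('Timed out synchronously; re-enqueued as ' + executionId);
}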
Governor limit patterns
PipelineRunner calls GovernorMonitor between stages automatically — you don't need to worry about this for normal pipeline usage. But if you're extending the SDK with a custom long-running stage, call these guards yourself before expensive operations.
// PipelineRunner calls GovernorMonitor between stages automatically.
// Call these yourself only when extending the SDK with custom long-running operations.
if (!GovernorMonitor.canMakeCallout()) {
// 80% of the callout limit consumed — yield
return null;
}
if (GovernorMonitor.shouldYield()) {
// Combined CPU/heap/chain-depth check — defer remaining work
System.enqueueJob(new MyContinuationJob(state));
return;
}
| Method | Threshold config key (default) |
|---|---|
| canMakeCallout() | governorCalloutThreshold (0.8 = 80% of limit) |
| canMakeSoql() | governorSoqlThreshold (0.8) |
| canMakeDml() | governorDmlThreshold (0.8) |
| hasHeapRoom() | governorHeapThreshold (0.8) |
| shouldYield() | Combined CPU / heap / chain-depth check |
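To make the guard pattern concrete, here is a sketch of a custom long-running operation that checks the monitor before each unit of work. enrichContacts and EnrichmentContinuationJob are hypothetical names; only the GovernorMonitor calls come from the SDK.
public static void enrichContacts(List<Id> remaining) {
    while (!remaining.isEmpty()) {
        if (!GovernorMonitor.canMakeCallout() || GovernorMonitor.shouldYield()) {
            // Near a limit: hand the unprocessed Ids to a continuation Queueable
            System.enqueueJob(new EnrichmentContinuationJob(remaining));
            return;
        }
        Id nextId = remaining.remove(0);
        // ... perform one callout-backed enrichment step for nextId ...
    }
}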
Configuration
Every tunable in the SDK flows through FM_Config__mdt. Read with typed accessors that take a default fallback:
Integer maxTokens = FMConfig.getInteger('defaultMaxTokens', 2000);
String provider = FMConfig.getString('defaultProvider', 'edenai');
Decimal temp = FMConfig.getDecimal('defaultTemperature', 0.7);
Boolean strict = FMConfig.getBoolean('strictDependencyResolution', true);
// Override at runtime (requires FlowMason_Config_Admin custom permission):
FMConfigAdmin.upsertConfig(new Map<String, Object>{
'key' => 'defaultMaxTokens',
'value' => '1500',
'valueType' => 'number',
'category' => 'provider'
});
// In tests — no permission needed:
FMConfig.setForTest('defaultMaxTokens', '1500', 'number');
The full list of seeded keys is in the Provider Configuration guide. You can also browse them at Setup → Custom Metadata Types → FM Config → Manage Records.
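A minimal test sketch tying the two together, assuming nothing beyond the setForTest and getInteger signatures shown above (the class and method names are illustrative):
@IsTest
private class FMConfigOverrideTest {
    @IsTest
    static void overridesMaxTokensForTheTestRun() {
        // Override without the FlowMason_Config_Admin permission
        FMConfig.setForTest('defaultMaxTokens', '1500', 'number');
        // The typed accessor now returns the override, not the default fallback
        System.assertEquals(1500, FMConfig.getInteger('defaultMaxTokens', 2000));
    }
}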