# Pipelines
Learn how to define and structure AI pipelines in FlowMason.
Pipelines are the core building blocks of FlowMason. They define a directed acyclic graph (DAG) of stages that process data through your AI workflow.
## Pipeline Structure

A pipeline is defined in a `.pipeline.json` file:

```json
{
  "name": "content-summarizer",
  "version": "1.0.0",
  "description": "Fetch and summarize web content",
  "input": {
    "type": "object",
    "properties": {
      "url": { "type": "string", "description": "URL to summarize" }
    },
    "required": ["url"]
  },
  "output": {
    "type": "object",
    "properties": {
      "summary": { "type": "string" }
    }
  },
  "stages": [
    {
      "id": "fetch",
      "component": "http-request",
      "config": {
        "url": "{{input.url}}",
        "method": "GET"
      }
    },
    {
      "id": "extract",
      "component": "json-transform",
      "depends_on": ["fetch"],
      "config": {
        "expression": "body.content"
      }
    },
    {
      "id": "summarize",
      "component": "generator",
      "depends_on": ["extract"],
      "config": {
        "prompt": "Summarize this content:\n{{stages.extract.output}}"
      }
    }
  ]
}
```
## Stages

Each stage represents a single step in your pipeline:

```jsonc
{
  "id": "stage-id",              // Unique identifier
  "component": "component-name", // Component to execute
  "depends_on": ["other-stage"], // Dependencies (optional)
  "config": {                    // Component configuration
    "key": "value"
  }
}
```
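Conceptually, each stage entry maps onto a small record type. The following Python sketch is illustrative only (the `Stage` class and `parse_stage` helper are hypothetical, not part of FlowMason); it shows the stage fields and their defaults:

```python
from dataclasses import dataclass, field

@dataclass
class Stage:
    """In-memory model of one pipeline stage (hypothetical)."""
    id: str
    component: str
    config: dict
    depends_on: list[str] = field(default_factory=list)  # optional in JSON

def parse_stage(raw: dict) -> Stage:
    """Build a Stage from one entry of the 'stages' array."""
    return Stage(
        id=raw["id"],
        component=raw["component"],
        config=raw.get("config", {}),
        depends_on=raw.get("depends_on", []),
    )
```

With this model, omitting `depends_on` simply yields an empty dependency list.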
## Dependencies

Use `depends_on` to define execution order:

```json
{
  "stages": [
    { "id": "a", "component": "..." },
    { "id": "b", "component": "...", "depends_on": ["a"] },
    { "id": "c", "component": "...", "depends_on": ["a"] },
    { "id": "d", "component": "...", "depends_on": ["b", "c"] }
  ]
}
```

This creates the execution flow:

```
  a
 / \
b   c
 \ /
  d
```

Stages whose dependencies have all completed run in parallel when possible; here, `b` and `c` execute concurrently once `a` finishes.
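The scheduling implied by `depends_on` can be reproduced with Python's standard-library `graphlib`. This is a sketch of the ordering a runner could derive, not FlowMason's actual scheduler:

```python
from graphlib import TopologicalSorter

# Map each stage id to its depends_on list (the example DAG above).
stages = {
    "a": [],
    "b": ["a"],
    "c": ["a"],
    "d": ["b", "c"],
}

ts = TopologicalSorter(stages)
ts.prepare()
waves = []
while ts.is_active():
    ready = list(ts.get_ready())   # stages whose dependencies are done
    waves.append(sorted(ready))    # everything in one wave can run in parallel
    ts.done(*ready)

# waves == [["a"], ["b", "c"], ["d"]]
```

Each "wave" contains stages that are mutually independent and can be dispatched concurrently.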
## Template Expressions

Reference data using template expressions:

### Input Values

```
"{{input.fieldName}}"
```

### Stage Outputs

```
"{{stages.stageId.output}}"
"{{stages.stageId.output.propertyName}}"
```

### Examples

```json
{
  "id": "summarize",
  "component": "generator",
  "config": {
    "prompt": "Summarize in {{input.max_length}} words:\n{{stages.fetch.output.text}}"
  }
}
```
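To make the substitution rules concrete, here is a minimal, hypothetical resolver in Python. FlowMason's real template engine may differ (in escaping, error handling, and supported syntax); this only illustrates dotted-path lookup against inputs and stage outputs:

```python
import re

def resolve(template: str, context: dict) -> str:
    """Replace {{dotted.path}} placeholders with values from context."""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).strip().split("."):
            value = value[part]  # walk one key at a time
        return str(value)
    return re.sub(r"\{\{\s*(.+?)\s*\}\}", lookup, template)

context = {
    "input": {"max_length": 100},
    "stages": {"fetch": {"output": {"text": "Hello world"}}},
}
prompt = resolve(
    "Summarize in {{input.max_length}} words:\n{{stages.fetch.output.text}}",
    context,
)
# prompt == "Summarize in 100 words:\nHello world"
```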
## Input/Output Schemas

Define schemas using JSON Schema:

```json
{
  "input": {
    "type": "object",
    "properties": {
      "url": {
        "type": "string",
        "format": "uri",
        "description": "The URL to fetch"
      },
      "options": {
        "type": "object",
        "properties": {
          "maxLength": { "type": "integer", "default": 500 }
        }
      }
    },
    "required": ["url"]
  }
}
```
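To show what schema validation buys you, here is a deliberately simplified checker covering only the `required` and top-level `type` keywords. A real runner would delegate to a full JSON Schema validator library; this sketch just demonstrates the kind of errors a schema catches before a pipeline runs:

```python
def check_input(data: dict, schema: dict) -> list[str]:
    """Return a list of validation errors (empty means valid).
    Handles only 'required' and top-level 'type' -- a toy subset."""
    errors = []
    for key in schema.get("required", []):
        if key not in data:
            errors.append(f"missing required field: {key}")
    types = {"string": str, "integer": int, "object": dict}
    for key, spec in schema.get("properties", {}).items():
        if key in data and spec.get("type") in types:
            if not isinstance(data[key], types[spec["type"]]):
                errors.append(f"{key}: expected {spec['type']}")
    return errors

schema = {
    "type": "object",
    "properties": {"url": {"type": "string", "format": "uri"}},
    "required": ["url"],
}
ok = check_input({"url": "https://example.com"}, schema)   # []
bad = check_input({}, schema)                              # one error
```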
## Running Pipelines

### CLI

```bash
# Run with inline input
fm run pipeline.json --input '{"url": "https://example.com"}'

# Run with an input file
fm run pipeline.json --input-file input.json

# Run in debug mode
fm run pipeline.json --debug
```
### VSCode

- Press F5 to run the current pipeline
- Use the Run button in the editor toolbar
- Right-click on a `.pipeline.json` file
### API

```bash
curl -X POST http://localhost:8999/api/v1/runs \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline": "pipelines/content-summarizer.pipeline.json",
    "input": {"url": "https://example.com"}
  }'
```
## Best Practices

- **Keep stages focused**: Each stage should do one thing well
- **Use meaningful IDs**: Stage IDs should describe their purpose
- **Validate inputs**: Use JSON Schema to validate pipeline inputs
- **Handle errors**: Use TryCatch for stages that might fail
- **Test thoroughly**: Write tests for your pipelines