FLOW MASON

Pipelines

Learn how to define and structure AI pipelines in FlowMason.

Pipelines are the core building blocks of FlowMason. They define a directed acyclic graph (DAG) of stages that process data through your AI workflow.

Pipeline Structure

A pipeline is defined in a .pipeline.json file:

{
  "name": "content-summarizer",
  "version": "1.0.0",
  "description": "Fetch and summarize web content",
  "input": {
    "type": "object",
    "properties": {
      "url": { "type": "string", "description": "URL to summarize" }
    },
    "required": ["url"]
  },
  "output": {
    "type": "object",
    "properties": {
      "summary": { "type": "string" }
    }
  },
  "stages": [
    {
      "id": "fetch",
      "component": "http-request",
      "config": {
        "url": "{{input.url}}",
        "method": "GET"
      }
    },
    {
      "id": "extract",
      "component": "json-transform",
      "depends_on": ["fetch"],
      "config": {
        "expression": "body.content"
      }
    },
    {
      "id": "summarize",
      "component": "generator",
      "depends_on": ["extract"],
      "config": {
        "prompt": "Summarize this content:\n{{stages.extract.output}}"
      }
    }
  ]
}
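The structure above implies a few rules that can be checked before running anything: stage IDs are unique, every stage names a component, and every depends_on entry points at a stage that exists. The following is a minimal sketch of such a check in Python. The function name check_pipeline is hypothetical and not part of FlowMason; it only illustrates the shape of the file.

```python
def check_pipeline(pipeline: dict) -> list[str]:
    """Return a list of structural problems; an empty list means none were found.

    Hypothetical helper for illustration only, not a FlowMason API.
    """
    problems = []
    seen = set()
    stages = pipeline.get("stages", [])

    # First pass: every stage needs a unique id and a component.
    for stage in stages:
        stage_id = stage.get("id")
        if not stage_id:
            problems.append("stage is missing an id")
            continue
        if stage_id in seen:
            problems.append(f"duplicate stage id: {stage_id}")
        seen.add(stage_id)
        if "component" not in stage:
            problems.append(f"stage {stage_id} has no component")

    # Second pass: every dependency must refer to a known stage id.
    for stage in stages:
        for dep in stage.get("depends_on", []):
            if dep not in seen:
                problems.append(f"stage {stage.get('id')} depends on unknown stage {dep}")
    return problems


# A trimmed-down version of the content-summarizer pipeline shown above.
pipeline = {
    "name": "content-summarizer",
    "stages": [
        {"id": "fetch", "component": "http-request"},
        {"id": "extract", "component": "json-transform", "depends_on": ["fetch"]},
        {"id": "summarize", "component": "generator", "depends_on": ["extract"]},
    ],
}
print(check_pipeline(pipeline))
```

To check a file on disk, load it with json.load and pass the resulting dictionary to the same function.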

Stages

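Each stage represents a single step in your pipeline. The // comments below are annotations for the reader; standard JSON does not allow comments, so remove them if your pipeline file must be strict JSON:

{
  "id": "stage-id",               // Unique identifier
  "component": "component-name",  // Component to execute
  "depends_on": ["other-stage"],  // Dependencies (optional)
  "config": {                     // Component configuration
    "key": "value"
  }
}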

Dependencies

Use depends_on to define execution order:

{
  "stages": [
    { "id": "a", "component": "..." },
    { "id": "b", "component": "...", "depends_on": ["a"] },
    { "id": "c", "component": "...", "depends_on": ["a"] },
    { "id": "d", "component": "...", "depends_on": ["b", "c"] }
  ]
}

This creates the execution flow:

    a
   / \
  b   c
   \ /
    d

Stages that do not depend on each other (here, b and c once a has completed) run in parallel when possible.
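To see how depends_on translates into an execution order, the sketch below groups the four stages above into "waves", where every stage in a wave has all of its dependencies satisfied by earlier waves. It uses graphlib.TopologicalSorter from the Python standard library (3.9+). This is an illustration of the DAG idea, not FlowMason's actual scheduler, and execution_waves is a hypothetical name.

```python
from graphlib import TopologicalSorter


def execution_waves(stages):
    """Group stage ids into waves that could run in parallel.

    Illustrative only; FlowMason's real scheduling may differ.
    """
    # Map each stage id to the set of stage ids it depends on.
    graph = {stage["id"]: set(stage.get("depends_on", [])) for stage in stages}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic

    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # stages whose dependencies are all done
        waves.append(ready)
        sorter.done(*ready)
    return waves


stages = [
    {"id": "a", "component": "..."},
    {"id": "b", "component": "...", "depends_on": ["a"]},
    {"id": "c", "component": "...", "depends_on": ["a"]},
    {"id": "d", "component": "...", "depends_on": ["b", "c"]},
]
print(execution_waves(stages))  # [['a'], ['b', 'c'], ['d']]
```

The middle wave contains both b and c, which matches the diamond shape in the diagram: neither depends on the other, so they are candidates for parallel execution.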

Template Expressions

Reference pipeline input and the outputs of earlier stages inside config values using {{ }} template expressions:

Input Values

"{{input.fieldName}}"

Stage Outputs

"{{stages.stageId.output}}"
"{{stages.stageId.output.propertyName}}"

Examples

{
  "id": "summarize",
  "component": "generator",
  "depends_on": ["fetch"],
  "config": {
    "prompt": "Summarize in {{input.max_length}} words:\n{{stages.fetch.output.text}}"
  }
}
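The substitution itself can be pictured as a dotted-path lookup into a context that holds the pipeline input and each stage's output. The sketch below is a simplified stand-in written for illustration; the render function and the layout of the context dictionary are assumptions, and FlowMason's own template engine may support more syntax or behave differently on missing values.

```python
import re

# Matches {{ dotted.path }} with optional whitespace inside the braces.
_TEMPLATE = re.compile(r"\{\{\s*([A-Za-z0-9_.\-]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace each {{path}} with the value found at that path in context.

    Hypothetical helper, not FlowMason's implementation.
    """
    def lookup(match):
        value = context
        for key in match.group(1).split("."):
            value = value[key]  # raises KeyError if the path does not exist
        return str(value)

    return _TEMPLATE.sub(lookup, template)


# Assumed context layout: "input" holds the pipeline input,
# "stages" maps each stage id to a dict with its "output".
context = {
    "input": {"max_length": 50},
    "stages": {"fetch": {"output": {"text": "FlowMason pipelines are DAGs."}}},
}
prompt = render(
    "Summarize in {{input.max_length}} words:\n{{stages.fetch.output.text}}",
    context,
)
print(prompt)
```

Raising KeyError on an unknown path is a design choice of this sketch: it surfaces a typo in a stage ID immediately instead of silently producing an empty string.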

Input/Output Schemas

Define schemas using JSON Schema:

{
  "input": {
    "type": "object",
    "properties": {
      "url": {
        "type": "string",
        "format": "uri",
        "description": "The URL to fetch"
      },
      "options": {
        "type": "object",
        "properties": {
          "maxLength": { "type": "integer", "default": 500 }
        }
      }
    },
    "required": ["url"]
  }
}
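To make the effect of required and type concrete, here is a deliberately small checker that covers only those two keywords, including nested objects. It is a sketch, not a JSON Schema implementation: keywords such as format are ignored, and default is not applied (in JSON Schema, default is an annotation rather than a validation rule). The function name check_input is hypothetical; for real validation, a complete JSON Schema validator is the better choice.

```python
# Python types accepted for each JSON Schema type name handled by this sketch.
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def check_input(schema: dict, data: dict, path: str = "input") -> list[str]:
    """Check only 'required' and 'type'; everything else in the schema is ignored."""
    errors = []
    for name in schema.get("required", []):
        if name not in data:
            errors.append(f"{path}.{name} is required")

    for name, spec in schema.get("properties", {}).items():
        if name not in data:
            continue
        value = data[name]
        expected = JSON_TYPES.get(spec.get("type"))
        # bool is a subclass of int in Python, so reject it for numeric types.
        bool_as_number = isinstance(value, bool) and spec.get("type") in ("integer", "number")
        if expected and (not isinstance(value, expected) or bool_as_number):
            errors.append(f"{path}.{name} should be of type {spec['type']}")
        elif spec.get("type") == "object":
            errors.extend(check_input(spec, value, f"{path}.{name}"))
    return errors


# The input schema from the example above.
schema = {
    "type": "object",
    "properties": {
        "url": {"type": "string", "format": "uri", "description": "The URL to fetch"},
        "options": {
            "type": "object",
            "properties": {"maxLength": {"type": "integer", "default": 500}},
        },
    },
    "required": ["url"],
}
print(check_input(schema, {"url": "https://example.com"}))
print(check_input(schema, {"options": {"maxLength": "long"}}))
```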

Running Pipelines

CLI

# Run with inline input
fm run pipeline.json --input '{"url": "https://example.com"}'

# Run with input file
fm run pipeline.json --input-file input.json

# Run in debug mode
fm run pipeline.json --debug
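When the inline input is generated by a script, quoting the JSON by hand is easy to get wrong. The sketch below builds the same fm run command from a Python dictionary, using only the --input flag shown above. The helper name fm_run_command is hypothetical; json.dumps and shlex.join are standard-library calls.

```python
import json
import shlex


def fm_run_command(pipeline_file: str, pipeline_input: dict) -> list[str]:
    """Build the argument list for `fm run` with inline JSON input."""
    return ["fm", "run", pipeline_file, "--input", json.dumps(pipeline_input)]


argv = fm_run_command("pipeline.json", {"url": "https://example.com"})

# shlex.join produces a shell-safe string for display or logging.
print(shlex.join(argv))
# fm run pipeline.json --input '{"url": "https://example.com"}'
```

Passing argv as a list to subprocess.run avoids shell quoting altogether, assuming the fm executable is on your PATH.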

VSCode

  • Press F5 to run the current pipeline
  • Use the Run button in the editor toolbar
  • Right-click on a .pipeline.json file

API

curl -X POST http://localhost:8999/api/v1/runs \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline": "pipelines/content-summarizer.pipeline.json",
    "input": {"url": "https://example.com"}
  }'
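The same request can be built from Python with the standard library. The sketch below mirrors the curl call exactly (URL, method, header, and body) and assumes nothing about the response format, which this page does not describe. The helper name build_run_request is hypothetical.

```python
import json
import urllib.request


def build_run_request(pipeline_path, pipeline_input, base_url="http://localhost:8999"):
    """Build a POST request equivalent to the curl example above."""
    body = json.dumps({"pipeline": pipeline_path, "input": pipeline_input}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/v1/runs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_request(
    "pipelines/content-summarizer.pipeline.json",
    {"url": "https://example.com"},
)
print(request.get_method(), request.full_url)

# To send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the example usable without a running server and makes the payload easy to inspect.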

Best Practices

  1. Keep stages focused - Each stage should do one thing well
  2. Use meaningful IDs - Stage IDs should describe their purpose
  3. Validate inputs - Use JSON Schema to validate pipeline inputs
  4. Handle errors - Use TryCatch for stages that might fail
  5. Test thoroughly - Write tests for your pipelines