FLOW MASON

Component Types

Understanding the three types of components in FlowMason: Nodes, Operators, and Control Flow.

FlowMason uses three distinct component types, each designed for specific purposes in your pipelines.

flowchart TD
    subgraph Components["18 Built-in Components"]
        direction TB
        subgraph Nodes["5 AI Nodes"]
            G["generator"]
            C["critic"]
            I["improver"]
            SE["selector"]
            SY["synthesizer"]
        end
        subgraph Operators["7 Operators"]
            HR["http-request"]
            JT["json-transform"]
            F["filter"]
            SV["schema-validate"]
            VS["variable-set"]
            L["logger"]
            LE["loop"]
        end
        subgraph Control["6 Control Flow"]
            CO["conditional"]
            FE["foreach"]
            TC["trycatch"]
            RO["router"]
            SP["subpipeline"]
            RE["return"]
        end
    end

Nodes (AI-Powered)

Nodes are components that call Large Language Models (LLMs) to do their work. Use nodes when you need:

  • Text generation or completion
  • Content analysis and extraction
  • Decision making based on context
  • Creative or reasoning tasks

Defining a Node

from flowmason_core import node, NodeInput, NodeOutput, Field

@node(
    name="summarizer",
    description="Summarize text content",
    category="reasoning",
    timeout=60
)
class SummarizerNode:
    class Input(NodeInput):
        text: str = Field(description="Text to summarize")
        max_length: int = Field(default=200, ge=50, le=1000)

    class Output(NodeOutput):
        summary: str
        word_count: int

    async def execute(self, input: Input, context) -> Output:
        response = await context.llm.generate(
            prompt=f"Summarize in {input.max_length} characters:\n{input.text}"
        )
        return self.Output(
            summary=response.text,
            word_count=len(response.text.split())
        )

Built-in Nodes

FlowMason includes 5 pre-built AI nodes:

generator

General-purpose text generation for any prompt.

{
  "id": "generate-content",
  "component": "generator",
  "config": {
    "prompt": "Write a product description for {{input.product}}",
    "max_tokens": 500,
    "temperature": 0.7
  }
}
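
The `{{input.product}}` placeholder in the prompt is filled in at run time. FlowMason's own template engine is not shown on this page, so the following is only a minimal sketch of how dotted-path placeholders of this shape could be resolved. The `render` and `lookup` helpers and the layout of the context dictionary are hypothetical.

```python
import re
from typing import Any

# Matches {{ dotted.path }} where segments may contain letters, digits, "_" and "-".
PLACEHOLDER = re.compile(r"\{\{\s*([\w.\-]+)\s*\}\}")


def lookup(path: str, context: dict[str, Any]) -> Any:
    """Walk a dotted path such as 'input.product' through nested dicts."""
    value: Any = context
    for key in path.split("."):
        value = value[key]
    return value


def render(template: str, context: dict[str, Any]) -> str:
    """Replace every {{dotted.path}} placeholder with its value from context."""
    return PLACEHOLDER.sub(lambda m: str(lookup(m.group(1), context)), template)


prompt = render(
    "Write a product description for {{input.product}}",
    {"input": {"product": "solar lantern"}},
)
print(prompt)
```

A missing key raises `KeyError` in this sketch; a real engine might instead substitute an empty string or fail the stage.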

critic

Evaluate and score content with structured feedback.

{
  "id": "review-content",
  "component": "critic",
  "config": {
    "content": "{{stages.generate.output.text}}",
    "criteria": ["clarity", "accuracy", "engagement"],
    "scoring": "1-10"
  }
}

improver

Refine content based on feedback or criteria.

{
  "id": "improve-draft",
  "component": "improver",
  "config": {
    "content": "{{stages.generate.output.text}}",
    "feedback": "{{stages.critic.output.feedback}}",
    "focus": "clarity and conciseness"
  }
}

selector

Choose the best option from multiple alternatives.

{
  "id": "select-best",
  "component": "selector",
  "config": {
    "options": "{{stages.generate-variants.output.variants}}",
    "criteria": "Most engaging and accurate",
    "return_reasoning": true
  }
}

synthesizer

Combine multiple inputs into a cohesive output.

{
  "id": "combine-sources",
  "component": "synthesizer",
  "config": {
    "sources": [
      "{{stages.research.output.findings}}",
      "{{stages.interview.output.quotes}}",
      "{{stages.data.output.statistics}}"
    ],
    "format": "executive summary"
  }
}

Operators (Deterministic)

Operators perform deterministic operations without LLM calls. Use operators for:

  • Data transformation
  • HTTP requests
  • Validation
  • Filtering and formatting

Defining an Operator

from typing import Any

import jmespath
from flowmason_core import operator, OperatorInput, OperatorOutput, Field

@operator(
    name="json-transform",
    description="Transform JSON using JMESPath",
    timeout=10
)
class JsonTransformOperator:
    class Input(OperatorInput):
        data: dict = Field(description="Input data")
        expression: str = Field(description="JMESPath expression")

    class Output(OperatorOutput):
        result: Any  # JMESPath can return any JSON value, or None when nothing matches

    async def execute(self, input: Input, context) -> Output:
        # Evaluate the expression against the input data.
        result = jmespath.search(input.expression, input.data)
        return self.Output(result=result)

Built-in Operators

FlowMason includes 7 built-in operators:

http-request

Make HTTP requests to external APIs.

{
  "id": "fetch-data",
  "component": "http-request",
  "config": {
    "url": "{{input.api_url}}",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{env.API_KEY}}"
    },
    "timeout": 30
  }
}

json-transform

Transform data using JMESPath expressions.

{
  "id": "extract-items",
  "component": "json-transform",
  "config": {
    "data": "{{stages.fetch.output.body}}",
    "expression": "data.items[?status=='active']"
  }
}
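
The expression `data.items[?status=='active']` selects the elements of `data.items` whose `status` field equals `active`. As a rough illustration, using made-up sample data, the same selection in plain Python looks like this:

```python
# Hypothetical response body; the field names mirror the expression above.
body = {
    "data": {
        "items": [
            {"id": 1, "status": "active"},
            {"id": 2, "status": "archived"},
            {"id": 3, "status": "active"},
        ]
    }
}

# Plain-Python equivalent of the JMESPath filter data.items[?status=='active'].
active_items = [
    item for item in body["data"]["items"] if item.get("status") == "active"
]
print(active_items)
```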

filter

Filter collections based on conditions.

{
  "id": "filter-results",
  "component": "filter",
  "config": {
    "items": "{{stages.fetch.output.items}}",
    "condition": "item.score > 0.8",
    "limit": 10
  }
}
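
One plausible reading of this configuration is that `condition` is evaluated once per element, with the element bound to `item`, and that `limit` caps the number of matches returned. The sketch below shows that reading in plain Python. The `filter_items` helper and the sample data are hypothetical, and whether FlowMason applies `limit` before or after filtering is an assumption.

```python
from itertools import islice

# Hypothetical input collection.
items = [
    {"name": "a", "score": 0.95},
    {"name": "b", "score": 0.40},
    {"name": "c", "score": 0.81},
]


def filter_items(items, predicate, limit):
    """Keep items that satisfy predicate, stopping once limit matches are found."""
    matches = (item for item in items if predicate(item))
    return list(islice(matches, limit))


# Mirrors "condition": "item.score > 0.8" and "limit": 10.
kept = filter_items(items, lambda item: item["score"] > 0.8, limit=10)
print([item["name"] for item in kept])
```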

schema-validate

Validate data against JSON Schema.

{
  "id": "validate-input",
  "component": "schema-validate",
  "config": {
    "data": "{{input}}",
    "schema": {
      "type": "object",
      "required": ["url", "format"],
      "properties": {
        "url": { "type": "string", "format": "uri" },
        "format": { "enum": ["json", "xml"] }
      }
    }
  }
}
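
To make the effect of this schema concrete, here is a small hand-written check covering only the keywords used above (`type`, `required`, `enum`). It is a sketch, not how FlowMason validates. A full JSON Schema validator, such as the `jsonschema` package, handles far more of the specification, and the `format` keyword is ignored here. The `validation_errors` helper is hypothetical.

```python
SCHEMA = {
    "type": "object",
    "required": ["url", "format"],
    "properties": {
        "url": {"type": "string", "format": "uri"},
        "format": {"enum": ["json", "xml"]},
    },
}


def validation_errors(data, schema):
    """Return a list of human-readable problems; an empty list means valid.

    Only checks type=object, required, type=string, and enum.
    """
    if schema.get("type") == "object" and not isinstance(data, dict):
        return ["expected an object"]
    errors = [
        f"missing required property: {key}"
        for key in schema.get("required", [])
        if key not in data
    ]
    for key, rule in schema.get("properties", {}).items():
        if key not in data:
            continue
        if rule.get("type") == "string" and not isinstance(data[key], str):
            errors.append(f"{key}: expected a string")
        if "enum" in rule and data[key] not in rule["enum"]:
            errors.append(f"{key}: must be one of {rule['enum']}")
    return errors


print(validation_errors({"url": "https://example.com/feed", "format": "json"}, SCHEMA))
print(validation_errors({"url": "https://example.com/feed", "format": "csv"}, SCHEMA))
```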

variable-set

Set or update pipeline variables.

{
  "id": "set-config",
  "component": "variable-set",
  "config": {
    "variables": {
      "processed_count": "{{stages.filter.output.count}}",
      "timestamp": "{{now()}}"
    }
  }
}

logger

Log messages for debugging and monitoring.

{
  "id": "log-progress",
  "component": "logger",
  "config": {
    "level": "info",
    "message": "Processed {{stages.transform.output.count}} items",
    "data": {
      "stage": "transform",
      "duration": "{{stages.transform.duration_ms}}"
    }
  }
}

loop

Run a stage repeatedly, up to a fixed count, with an optional break condition.

{
  "id": "retry-loop",
  "component": "loop",
  "config": {
    "count": 3,
    "stage": "attempt-fetch",
    "break_on": "{{current.output.success}}"
  }
}
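
Read literally, this configuration runs `attempt-fetch` up to 3 times and stops as soon as `break_on` evaluates to true. The sketch below spells out that control flow in plain Python. The `run_loop` helper and the stand-in stage are hypothetical, and the exact point at which FlowMason evaluates `break_on` is an assumption.

```python
def run_loop(stage, count, break_on):
    """Call stage up to count times; stop early when break_on(output) is true.

    Returns (attempts_made, last_output).
    """
    attempts = 0
    output = None
    for attempts in range(1, count + 1):
        output = stage(attempts)
        if break_on(output):
            break
    return attempts, output


# Hypothetical stage that fails on the first attempt and succeeds on the second.
def attempt_fetch(attempt):
    return {"success": attempt >= 2}


attempts, last_output = run_loop(
    attempt_fetch, count=3, break_on=lambda out: out["success"]
)
print(attempts, last_output)
```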

Control Flow

Control flow components manage pipeline execution logic. FlowMason provides 6 built-in control flow types:

flowchart LR
    subgraph Conditional
        A1[condition] -->|true| B1[then_stage]
        A1 -->|false| C1[else_stage]
    end

    subgraph ForEach
        A2[items] --> B2[stage 1]
        B2 --> C2[stage 2]
        C2 --> D2[stage N]
    end

    subgraph TryCatch
        A3[try_stage] -->|success| B3[continue]
        A3 -->|error| C3[catch_stage]
    end

Conditional

Branch to one of two stages depending on whether a condition holds:

{
  "id": "check-length",
  "component": "conditional",
  "config": {
    "condition": "{{stages.extract.output.length}} > 1000",
    "then_stage": "summarize",
    "else_stage": "format"
  }
}

ForEach

Run a stage once for each item in a collection, optionally in parallel:

{
  "id": "process-items",
  "component": "foreach",
  "config": {
    "items": "{{stages.fetch.output.items}}",
    "stage": "process-item",
    "parallel": true,
    "max_concurrency": 10
  }
}
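
The combination of `parallel` and `max_concurrency` describes bounded parallelism: items are processed concurrently, but never more than 10 at a time. A common way to get that behavior in Python is an `asyncio.Semaphore`. The sketch below is one such approach, not FlowMason's implementation, and the `foreach` and `process_item` names are hypothetical.

```python
import asyncio


async def foreach(items, stage, max_concurrency):
    """Run stage on every item, with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:
            return await stage(item)

    # gather returns results in the same order as the input items.
    return await asyncio.gather(*(run_one(item) for item in items))


async def process_item(item):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return item * 2


results = asyncio.run(foreach([1, 2, 3, 4], process_item, max_concurrency=2))
print(results)
```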

TryCatch

Run a stage and hand off to a fallback stage if it fails:

{
  "id": "safe-fetch",
  "component": "trycatch",
  "config": {
    "try_stage": "fetch-data",
    "catch_stage": "handle-error"
  }
}

Router

Dispatch to one of several stages based on the value of an expression:

{
  "id": "route-request",
  "component": "router",
  "config": {
    "expression": "{{input.type}}",
    "routes": {
      "article": "process-article",
      "video": "process-video",
      "default": "process-generic"
    }
  }
}
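
The `routes` map reads like a lookup table with a fallback entry. Assuming `default` is used whenever the expression's value matches no other key (this page does not state it), the dispatch can be sketched as follows. `resolve_route` is a hypothetical helper.

```python
ROUTES = {
    "article": "process-article",
    "video": "process-video",
    "default": "process-generic",
}


def resolve_route(value, routes):
    """Return the stage mapped to value, falling back to the "default" entry."""
    return routes.get(value, routes["default"])


print(resolve_route("video", ROUTES))
print(resolve_route("podcast", ROUTES))
```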

SubPipeline

Run a nested pipeline as a stage:

{
  "id": "nested-process",
  "component": "subpipeline",
  "config": {
    "pipeline": "pipelines/child.pipeline.json",
    "input": { "data": "{{stages.prepare.output}}" }
  }
}

Return

Early exit from pipeline execution:

{
  "id": "early-exit",
  "component": "return",
  "config": {
    "output": "{{stages.cached.output}}"
  }
}

Custom Control Flow

Define custom control flow with the @control_flow decorator:

from flowmason_core import control_flow, ControlFlowInput, ControlFlowOutput

@control_flow(
    name="retry-with-backoff",
    description="Retry a stage with exponential backoff"
)
class RetryWithBackoffControl:
    class Input(ControlFlowInput):
        stage: str
        max_retries: int = 3
        base_delay: float = 1.0

    class Output(ControlFlowOutput):
        attempts: int
        succeeded: bool

    async def execute(self, input: Input, context) -> Output:
        # Custom retry logic with backoff
        ...
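
The body of `execute` is left open above. Independently of FlowMason, exponential backoff itself can be sketched with plain `asyncio`: wait `base_delay`, then twice that, then four times that, and so on between attempts. In this sketch `retry_with_backoff` and `flaky_stage` are hypothetical, and treating `max_retries` as the number of retries after the first attempt is an assumption.

```python
import asyncio


async def retry_with_backoff(run_stage, max_retries=3, base_delay=1.0):
    """Retry run_stage, doubling the wait after each failure.

    Returns (attempts, succeeded). The delay before retry n is base_delay * 2**n.
    """
    attempts = 0
    for attempt in range(max_retries + 1):  # one initial try plus max_retries retries
        attempts += 1
        try:
            await run_stage()
            return attempts, True
        except Exception:
            if attempt < max_retries:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return attempts, False


# Hypothetical stage that fails twice before succeeding.
calls = {"count": 0}


async def flaky_stage():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")


attempts, succeeded = asyncio.run(retry_with_backoff(flaky_stage, base_delay=0.01))
print(attempts, succeeded)
```

Production retry logic often adds random jitter to each delay so that many clients do not retry in lockstep; that is omitted here for brevity.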

Choosing the Right Component

Use Case                    Component Type
Generate or analyze text    Node
Transform data              Operator
Make API calls              Operator
Branch execution            Control Flow
Loop over items             Control Flow
Handle errors               Control Flow