SDK - Custom Components

Build custom nodes, operators, and control flow components for FlowMason

FlowMason SDK

Extend FlowMason with custom components tailored to your specific workflows.

Extensibility Without Core Access

FlowMason's core is closed source, but the SDK provides everything you need to build powerful custom components. Create nodes, operators, and control flow components that integrate seamlessly with the FlowMason ecosystem.

Component Types

FlowMason has three component types, each with a specific purpose:

Nodes

AI-powered components that use LLM providers. Use for text generation, analysis, reasoning, and intelligent processing.

@node decorator

Operators

Deterministic utility components for data transformation, API calls, validation, and integration tasks.

@operator decorator

Control Flow

Components that manage execution flow: conditionals, loops, error handling, and routing.

@control_flow decorator

Installation

Install the FlowMason SDK:

pip install flowmason

Creating Custom Nodes

Nodes are AI-powered components that have access to LLM providers through the execution context. They're perfect for tasks requiring natural language understanding, generation, or reasoning.

Basic Node Structure

from flowmason_core.core.types import NodeInput, NodeOutput, Field
from flowmason_core.core.decorators import node

@node(
    name="summarizer",           # Unique identifier (snake_case)
    category="reasoning",        # For grouping in UI
    description="Summarize text content",
    icon="file-text",           # Lucide icon name
    color="#8B5CF6",            # Violet for AI nodes
    version="1.0.0",
    tags=["summarization", "text", "llm"],
    recommended_providers={
        "anthropic": {
            "model": "claude-3-5-sonnet-20241022",
            "temperature": 0.3,
        },
        "openai": {
            "model": "gpt-4o",
            "temperature": 0.3,
        },
    },
    default_provider="anthropic",
)
class SummarizerNode:
    """Summarize text content using AI."""

    class Input(NodeInput):
        text: str = Field(
            description="The text to summarize",
            examples=["A long article about climate change..."]
        )
        max_length: int = Field(
            default=200,
            ge=50,
            le=1000,
            description="Maximum summary length in words"
        )
        style: str = Field(
            default="concise",
            description="Summary style: 'concise', 'detailed', or 'bullet'"
        )

    class Output(NodeOutput):
        summary: str = Field(description="The generated summary")
        word_count: int = Field(description="Number of words in summary")

    class Config:
        requires_llm: bool = True
        timeout_seconds: int = 60
        max_retries: int = 3

    async def execute(self, input: Input, context) -> Output:
        """Execute the summarization."""

        # Access the LLM through context
        llm = context.llm

        prompt = f"""Summarize the following text in a {input.style} style.
Keep it under {input.max_length} words.

Text to summarize:
{input.text}

Summary:"""

        response = await llm.generate_async(
            prompt=prompt,
            system_prompt="You are a professional summarizer.",
            temperature=0.3,
        )

        summary = response.content.strip()
        word_count = len(summary.split())

        return self.Output(
            summary=summary,
            word_count=word_count
        )

Key Node Features

  • Input class - Inherits from NodeInput, defines typed inputs with validation
  • Output class - Inherits from NodeOutput, defines the return schema
  • execute method - Async method that receives input and context
  • context.llm - Access to configured LLM provider for AI operations
  • context.logger - Logging utilities
  • context.run_id - Current pipeline run identifier
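
The features listed above can be tried out without a FlowMason install. The sketch below is only an illustration: FakeResponse, FakeLLM, and FakeContext are hypothetical stand-ins, not SDK classes, and they model only the generate_async call, the response's content attribute, and the three context properties named in the list.

```python
import asyncio
import logging
from dataclasses import dataclass, field


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an LLM response; only `.content` is modeled."""
    content: str


@dataclass
class FakeLLM:
    """Hypothetical stand-in for context.llm that returns a canned reply."""
    reply: str
    calls: list = field(default_factory=list)

    async def generate_async(self, prompt, system_prompt=None, temperature=None):
        # Record each call so a test can inspect what was sent.
        self.calls.append(
            {"prompt": prompt, "system_prompt": system_prompt, "temperature": temperature}
        )
        return FakeResponse(content=self.reply)


@dataclass
class FakeContext:
    """Hypothetical stand-in exposing llm, run_id, and logger."""
    llm: FakeLLM
    run_id: str = "run-local"
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("sketch"))


async def summarize(text: str, max_length: int, context: FakeContext) -> dict:
    """Follow the same steps as SummarizerNode.execute, using plain values."""
    prompt = f"Summarize in under {max_length} words:\n{text}"
    response = await context.llm.generate_async(prompt=prompt, temperature=0.3)
    summary = response.content.strip()
    word_count = len(summary.split())
    context.logger.info("run %s produced %d words", context.run_id, word_count)
    return {"summary": summary, "word_count": word_count}


ctx = FakeContext(llm=FakeLLM(reply="  A short summary.  "))
result = asyncio.run(summarize("Some long text", 50, ctx))
print(result)  # {'summary': 'A short summary.', 'word_count': 3}
```

Recording the calls on the fake makes it possible to assert on the prompt that a node builds, which is often where node bugs hide.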

Creating Custom Operators

Operators are deterministic components that don't use AI. They're ideal for data transformation, API integration, validation, and any operation where the same input always produces the same output.

Basic Operator Structure

from typing import Dict, Any, Optional
from flowmason_core.core.types import OperatorInput, OperatorOutput, Field
from flowmason_core.core.decorators import operator

@operator(
    name="json_transformer",
    category="transformers",
    description="Transform JSON data using JSONPath expressions",
    icon="code",
    color="#3B82F6",     # Blue for operators
    version="1.0.0",
    tags=["json", "transform", "data"],
)
class JsonTransformerOperator:
    """Transform JSON data structures."""

    class Input(OperatorInput):
        data: Dict[str, Any] = Field(
            description="Input JSON data to transform"
        )
        mapping: Dict[str, str] = Field(
            description="Field mapping: {output_key: input_path}"
        )
        defaults: Optional[Dict[str, Any]] = Field(
            default=None,
            description="Default values for missing fields"
        )

    class Output(OperatorOutput):
        result: Dict[str, Any] = Field(
            description="Transformed data"
        )
        fields_mapped: int = Field(
            description="Number of fields successfully mapped"
        )

    class Config:
        deterministic: bool = True    # Same input = same output
        timeout_seconds: int = 30

    async def execute(self, input: Input, context) -> Output:
        """Execute the transformation."""
        result = {}
        fields_mapped = 0

        for output_key, input_path in input.mapping.items():
            value = self._get_nested_value(input.data, input_path)

            if value is not None:
                result[output_key] = value
                fields_mapped += 1
            elif input.defaults and output_key in input.defaults:
                result[output_key] = input.defaults[output_key]

        return self.Output(
            result=result,
            fields_mapped=fields_mapped
        )

    def _get_nested_value(self, data: dict, path: str):
        """Get value from nested dict using dot notation."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return None
        return value
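
To make the mapping behaviour concrete, here is the same lookup and mapping logic lifted into plain functions so it runs without the SDK. The function names transform and get_nested_value and the sample record are illustrative only; the logic mirrors the operator above.

```python
from typing import Any, Dict, Optional, Tuple


def get_nested_value(data: Dict[str, Any], path: str) -> Any:
    """Walk a nested dict along a dot-separated path; return None if a key is missing."""
    value: Any = data
    for key in path.split("."):
        if isinstance(value, dict) and key in value:
            value = value[key]
        else:
            return None
    return value


def transform(
    data: Dict[str, Any],
    mapping: Dict[str, str],
    defaults: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], int]:
    """Apply {output_key: input_path} and fall back to defaults for missing paths."""
    result: Dict[str, Any] = {}
    fields_mapped = 0
    for output_key, input_path in mapping.items():
        value = get_nested_value(data, input_path)
        if value is not None:
            result[output_key] = value
            fields_mapped += 1
        elif defaults and output_key in defaults:
            # Defaults fill the gap but do not count as a mapped field.
            result[output_key] = defaults[output_key]
    return result, fields_mapped


record = {"user": {"name": "Ada", "address": {"city": "London"}}}
mapping = {"name": "user.name", "city": "user.address.city", "phone": "user.phone"}
result, fields_mapped = transform(record, mapping, defaults={"phone": "unknown"})
print(result)         # {'name': 'Ada', 'city': 'London', 'phone': 'unknown'}
print(fields_mapped)  # 2
```

Two consequences of this design are worth knowing: a key whose stored value is None is treated the same as a missing key, and values taken from defaults are not included in fields_mapped.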

HTTP Request Operator Example

from typing import Dict, Any, Optional, Literal
from flowmason_core.core.types import OperatorInput, OperatorOutput, Field
from flowmason_core.core.decorators import operator

@operator(
    name="api_client",
    category="connectors",
    description="Make HTTP requests to external APIs",
    icon="globe",
    color="#3B82F6",
    tags=["http", "api", "rest", "integration"],
)
class ApiClientOperator:
    """Make HTTP requests to APIs."""

    class Input(OperatorInput):
        url: str = Field(description="API endpoint URL")
        method: Literal["GET", "POST", "PUT", "DELETE"] = Field(
            default="GET"
        )
        headers: Optional[Dict[str, str]] = Field(default=None)
        body: Optional[Dict[str, Any]] = Field(default=None)
        timeout: int = Field(default=30, ge=1, le=300)

    class Output(OperatorOutput):
        status_code: int
        body: Any
        success: bool

    async def execute(self, input: Input, context) -> Output:
        import httpx

        async with httpx.AsyncClient(timeout=input.timeout) as client:
            response = await client.request(
                method=input.method,
                url=input.url,
                headers=input.headers,
                json=input.body,
            )

            try:
                body = response.json()
            except ValueError:
                # Body is not valid JSON; fall back to the raw text
                body = response.text

            return self.Output(
                status_code=response.status_code,
                body=body,
                success=response.is_success,
            )
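
The JSON-or-text fallback in execute can be checked in isolation. The helper below is a minimal standard-library sketch of the same idea; parse_body is a hypothetical name and is not part of FlowMason or httpx.

```python
import json
from typing import Any


def parse_body(raw: str) -> Any:
    """Return decoded JSON when the payload is valid JSON, otherwise the raw text."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Catch only the decode error so unrelated bugs are not silently hidden.
        return raw


print(parse_body('{"ok": true}'))      # {'ok': True}
print(parse_body("<html>503</html>"))  # <html>503</html>
```

Catching a specific exception rather than everything keeps cancellation and programming errors visible, which matters inside an async execute method.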

Creating Control Flow Components

Control flow components manage pipeline execution flow. They return directives that tell the executor which stages to execute, skip, or loop over.

Conditional Component Example

from typing import List
from flowmason_core.core.types import (
    ControlFlowInput, ControlFlowOutput,
    ControlFlowDirective, ControlFlowType, Field
)
from flowmason_core.core.decorators import control_flow

@control_flow(
    name="conditional",
    description="Execute branches based on a condition",
    control_flow_type="conditional",
    icon="git-branch",
    color="#EC4899",    # Pink for control flow
    tags=["branching", "if-else", "conditional"],
)
class ConditionalComponent:
    """Branch execution based on conditions."""

    class Input(ControlFlowInput):
        condition: bool = Field(
            description="The condition to evaluate"
        )
        true_branch: List[str] = Field(
            description="Stage IDs to execute if true"
        )
        false_branch: List[str] = Field(
            default_factory=list,
            description="Stage IDs to execute if false"
        )

    class Output(ControlFlowOutput):
        branch_taken: str = Field(
            description="Which branch was taken"
        )
        directive: ControlFlowDirective

    async def execute(self, input: Input, context) -> Output:
        if input.condition:
            # Execute true branch, skip false branch
            return self.Output(
                branch_taken="true",
                directive=ControlFlowDirective(
                    directive_type=ControlFlowType.CONDITIONAL,
                    branch_taken="true",
                    execute_stages=input.true_branch,
                    skip_stages=input.false_branch,
                )
            )
        else:
            # Execute false branch, skip true branch
            return self.Output(
                branch_taken="false",
                directive=ControlFlowDirective(
                    directive_type=ControlFlowType.CONDITIONAL,
                    branch_taken="false",
                    execute_stages=input.false_branch,
                    skip_stages=input.true_branch,
                )
            )
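
How the executor consumes a directive is not documented here, and the core is closed source. As one plausible reading, the sketch below shows an executor dropping the skipped stages from its plan. Directive and plan_stages are hypothetical stand-ins written for this illustration, and the stage names are invented.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Directive:
    """Hypothetical stand-in for ControlFlowDirective; only two fields are modeled."""
    execute_stages: List[str] = field(default_factory=list)
    skip_stages: List[str] = field(default_factory=list)


def plan_stages(pipeline: List[str], directive: Directive) -> List[str]:
    """Return the stages that remain runnable, in pipeline order, after applying skips."""
    skipped = set(directive.skip_stages)
    return [stage for stage in pipeline if stage not in skipped]


pipeline = ["classify", "escalate", "auto_reply", "log_result"]
directive = Directive(execute_stages=["escalate"], skip_stages=["auto_reply"])
print(plan_stages(pipeline, directive))  # ['classify', 'escalate', 'log_result']
```

Under this reading, stages that appear in neither list (such as log_result) still run, so a conditional only needs to name the stages that belong to its branches.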

Field Configuration

The Field() function provides rich configuration for input/output fields:

from typing import List

from flowmason_core.core.types import NodeInput, Field

class Input(NodeInput):
    # Required field with description
    text: str = Field(
        description="Text to process",
        examples=["Hello world", "Sample text"]
    )

    # Optional field with default
    language: str = Field(
        default="en",
        description="Language code"
    )

    # Numeric constraints
    temperature: float = Field(
        default=0.7,
        ge=0.0,      # Greater than or equal
        le=2.0,      # Less than or equal
        description="Sampling temperature"
    )

    max_tokens: int = Field(
        default=1000,
        gt=0,        # Greater than
        lt=100000,   # Less than
    )

    # String constraints
    name: str = Field(
        min_length=1,
        max_length=100,
        pattern=r"^[a-zA-Z]+$",   # Regex pattern
    )

    # List with factory default
    tags: List[str] = Field(
        default_factory=list,
        description="Optional tags"
    )
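
The constraint names above match those used by Pydantic, so it seems reasonable to assume they carry the same meaning; that is an assumption, not something this page confirms. The plain-Python sketch below spells out what each bound would accept or reject under that assumption. check_number and check_string are hypothetical helpers, not SDK functions.

```python
import re
from typing import Optional


def check_number(value, *, ge=None, gt=None, le=None, lt=None) -> bool:
    """Return True when value satisfies every bound that was supplied."""
    if ge is not None and not value >= ge:
        return False
    if gt is not None and not value > gt:
        return False
    if le is not None and not value <= le:
        return False
    if lt is not None and not value < lt:
        return False
    return True


def check_string(
    value: str,
    *,
    min_length: Optional[int] = None,
    max_length: Optional[int] = None,
    pattern: Optional[str] = None,
) -> bool:
    """Return True when the string meets the length bounds and matches the pattern."""
    if min_length is not None and len(value) < min_length:
        return False
    if max_length is not None and len(value) > max_length:
        return False
    if pattern is not None and re.search(pattern, value) is None:
        return False
    return True


print(check_number(0.7, ge=0.0, le=2.0))                   # True
print(check_number(0, gt=0, lt=100000))                    # False
print(check_string("Alice42", pattern=r"^[a-zA-Z]+$"))     # False
```

Note the difference between ge/le (inclusive) and gt/lt (exclusive): with gt=0, a max_tokens value of 0 is rejected.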

Execution Context

The context parameter in execute() provides access to runtime utilities:

Property            Available In    Description
context.llm         Nodes only      LLM provider for AI operations
context.logger      All             Logging interface
context.run_id      All             Current pipeline run ID
context.stage_id    All             Current stage identifier
context.variables   All             Pipeline variables dict
context.secrets     All             Secure secrets access

Packaging Components

Package your custom components as .fmpkg files for distribution:

Package Structure

my-components/
├── package.json          # Package metadata
├── nodes/
│   ├── __init__.py
│   └── my_node.py
├── operators/
│   ├── __init__.py
│   └── my_operator.py
└── requirements.txt      # Python dependencies

package.json

{
  "name": "my-components",
  "version": "1.0.0",
  "description": "Custom FlowMason components",
  "author": "Your Name",
  "components": {
    "nodes": ["nodes.my_node.MyNode"],
    "operators": ["operators.my_operator.MyOperator"]
  }
}
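
A quick pre-build check can catch manifest entries that point at files that do not exist. The sketch below assumes each entry has the form module.path.ClassName, resolved relative to the package root, as the example manifest suggests; the actual resolution rules used by the fm tool are not documented here. missing_component_files is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def missing_component_files(package_dir: Path) -> List[str]:
    """List component entries in package.json whose module file does not exist."""
    manifest = json.loads((package_dir / "package.json").read_text(encoding="utf-8"))
    missing = []
    for entries in manifest.get("components", {}).values():
        for dotted in entries:
            # "nodes.my_node.MyNode" -> module path "nodes.my_node", class "MyNode"
            module_path, _, _class_name = dotted.rpartition(".")
            module_file = package_dir.joinpath(*module_path.split(".")).with_suffix(".py")
            if not module_file.is_file():
                missing.append(dotted)
    return missing


# Demo: build a throwaway package that is missing its operator module.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nodes").mkdir()
    (root / "nodes" / "my_node.py").write_text("class MyNode: ...\n", encoding="utf-8")
    manifest = {
        "name": "my-components",
        "version": "1.0.0",
        "components": {
            "nodes": ["nodes.my_node.MyNode"],
            "operators": ["operators.my_operator.MyOperator"],
        },
    }
    (root / "package.json").write_text(json.dumps(manifest), encoding="utf-8")
    missing = missing_component_files(root)

print(missing)  # ['operators.my_operator.MyOperator']
```

The check only confirms that the module file exists; it does not import the module or verify that the named class is defined in it.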

Build Package

# Build the package
fm package build my-components/
# Creates: my-components-1.0.0.fmpkg

# Install the built package
fm package install my-components-1.0.0.fmpkg

Best Practices

Do

  • Use descriptive names and descriptions
  • Add examples to Field definitions
  • Include validation constraints
  • Handle errors gracefully
  • Use context.logger for debugging
  • Keep nodes focused on single tasks

Don't

  • Use sync operations in async execute
  • Store state between executions
  • Access files outside working directory
  • Ignore timeout configurations
  • Skip input validation
  • Use global variables
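
The first item in the list above deserves an illustration, because a blocking call inside an async execute stalls every other task on the event loop. One common remedy, sketched here with the standard library only, is to hand the blocking call to a worker thread with asyncio.to_thread (Python 3.9+). blocking_checksum and heartbeat are invented for the demonstration and are unrelated to the SDK.

```python
import asyncio
import time


def blocking_checksum(data: bytes) -> int:
    """Simulate a slow synchronous call, such as a legacy client library."""
    time.sleep(0.2)
    return sum(data) % 256


async def heartbeat(ticks: list) -> None:
    """Append a tick every 50 ms to show the event loop keeps running."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())


async def main():
    ticks: list = []
    # The blocking function runs in a worker thread while heartbeat runs on the loop.
    checksum, _ = await asyncio.gather(
        asyncio.to_thread(blocking_checksum, b"flowmason"),
        heartbeat(ticks),
    )
    return checksum, len(ticks)


checksum, tick_count = asyncio.run(main())
print(checksum, tick_count)
```

Calling blocking_checksum directly inside main would still return the same checksum, but the heartbeat could not tick until the sleep finished. Where an async-native library exists (as httpx does for HTTP), using it is usually the simpler choice.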

Testing Components

FlowMason provides testing utilities for component development:

import pytest
from flowmason_core.testing import MockContext

from my_components.nodes.summarizer import SummarizerNode

@pytest.mark.asyncio
async def test_summarizer_node():
    # Create mock context with mock LLM
    context = MockContext(
        llm_response="This is a summary of the text."
    )

    # Create input
    input_data = SummarizerNode.Input(
        text="A very long article about technology...",
        max_length=100,
        style="concise"
    )

    # Run the component
    node = SummarizerNode()
    output = await node.execute(input_data, context)

    # Assertions
    assert output.summary == "This is a summary of the text."
    assert output.word_count > 0

Ready to Build?

Start building your custom components and extend FlowMason to fit your specific needs.