FLOW MASON
Level: advanced | Duration: 35 min

Working with Components

Create custom nodes and operators to extend FlowMason's capabilities for your specific needs.

In this final tutorial, you’ll build two kinds of custom component, an AI-powered node and a deterministic operator, and then test and package them.

What You’ll Learn

Understanding Component Types

Type          Purpose                  Uses LLM
Node          AI-powered operations    Yes
Operator      Deterministic utilities  No
Control Flow  Execution logic          No

Part 1: Creating a Custom Node

Let’s create a keyword extractor node that uses AI to analyze text.

Step 1: Create the Node File

Create components/nodes/keyword_extractor.py:

from flowmason_core import node, NodeInput, NodeOutput, Field
from typing import Optional

@node(
    name="keyword-extractor",
    description="Extract keywords from text using AI",
    category="analysis",
    timeout=30
)
class KeywordExtractorNode:
    """AI-powered keyword extraction component."""

    class Input(NodeInput):
        text: str = Field(
            description="Text to analyze for keywords"
        )
        max_keywords: int = Field(
            default=10,
            ge=1,
            le=50,
            description="Maximum number of keywords to extract"
        )
        include_scores: bool = Field(
            default=False,
            description="Include relevance scores"
        )

    class Output(NodeOutput):
        keywords: list[str]
        scores: Optional[dict[str, float]] = None

    async def execute(self, input: Input, context) -> Output:
        prompt = f"""Extract the {input.max_keywords} most important keywords from this text.

Text: {input.text}

Return as JSON: {{"keywords": ["word1", "word2"], "scores": {{"word1": 0.95, "word2": 0.87}}}}"""

        response = await context.llm.generate(prompt=prompt)
        data = response.parse_json()

        return self.Output(
            keywords=data["keywords"][:input.max_keywords],
            scores=data.get("scores") if input.include_scores else None
        )
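
The execute method above trusts the model to return well-formed JSON with a "keywords" list. As a minimal sketch of stricter handling, the function below validates a raw JSON reply before the output is built. It assumes the reply is available as a plain string; parse_keyword_response is a hypothetical helper, not part of flowmason_core, and how response.parse_json() behaves on malformed input is not covered in this tutorial.

```python
import json
from typing import Optional


def parse_keyword_response(
    raw: str, max_keywords: int, include_scores: bool
) -> tuple[list[str], Optional[dict[str, float]]]:
    """Validate a raw JSON reply (hypothetical helper, not a FlowMason API)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict):
        raise ValueError("Expected a JSON object at the top level")

    keywords = data.get("keywords")
    if not isinstance(keywords, list):
        raise ValueError("Expected 'keywords' to be a list")

    # Keep only non-empty strings, then enforce the requested limit.
    cleaned = [k.strip() for k in keywords if isinstance(k, str) and k.strip()]
    cleaned = cleaned[:max_keywords]

    scores = None
    raw_scores = data.get("scores")
    if include_scores and isinstance(raw_scores, dict):
        # Drop scores for keywords that were filtered out or are not numeric.
        scores = {
            k: float(v)
            for k, v in raw_scores.items()
            if k in cleaned
            and isinstance(v, (int, float))
            and not isinstance(v, bool)
        }
    return cleaned, scores


reply = (
    '{"keywords": ["ai", "", 42, "learning", "data"],'
    ' "scores": {"ai": 0.95, "data": "high"}}'
)
keywords, scores = parse_keyword_response(reply, max_keywords=2, include_scores=True)
print(keywords)  # ['ai', 'learning']
print(scores)    # {'ai': 0.95}
```

Raising ValueError on a malformed reply keeps the failure visible to the pipeline instead of silently returning an empty keyword list.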

Step 2: Register the Component

No explicit registration call is needed. The component is registered automatically when its file matches a path pattern in your flowmason.json:

{
  "components": {
    "nodes": ["components/nodes/**/*.py"]
  }
}
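
To check which files a pattern like this picks up, the sketch below expands it with Python's standard glob module against a throwaway project tree. This only illustrates common recursive-glob semantics; FlowMason's own pattern matching may differ in details, and summarizer.py and README.md are made-up files used for the demonstration.

```python
import glob
import os
import tempfile

PATTERN = "components/nodes/**/*.py"  # the pattern from flowmason.json


def matching_files(project_root: str, pattern: str = PATTERN) -> list[str]:
    """Return project-relative paths matched by the pattern, sorted."""
    full_pattern = os.path.join(project_root, pattern)
    # recursive=True lets "**" match zero or more nested directories.
    matches = glob.glob(full_pattern, recursive=True)
    return sorted(
        os.path.relpath(m, project_root).replace(os.sep, "/") for m in matches
    )


def demo() -> list[str]:
    """Create a temporary project tree and list what the pattern matches."""
    with tempfile.TemporaryDirectory() as root:
        for rel in (
            "components/nodes/keyword_extractor.py",
            "components/nodes/text/summarizer.py",   # hypothetical nested node
            "components/nodes/README.md",            # not a .py file
            "components/operators/text_cleaner.py",  # outside components/nodes
        ):
            path = os.path.join(root, rel)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            open(path, "w").close()
        return matching_files(root)


print(demo())
# ['components/nodes/keyword_extractor.py', 'components/nodes/text/summarizer.py']
```

Under these semantics a file in components/operators/ is not matched by the nodes pattern, so operators would presumably need their own entry; the exact key for that is not shown in this tutorial.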

Step 3: Use in a Pipeline

{
  "id": "extract-keywords",
  "component": "keyword-extractor",
  "config": {
    "text": "{{stages.fetch.output.content}}",
    "max_keywords": 5,
    "include_scores": true
  }
}
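
The "text" value in this stage is a template that points at the output of an earlier stage. As a rough mental model only, the sketch below resolves such a reference against a dictionary of stage outputs. The resolve and lookup functions and the shape of scope are assumptions made for illustration; FlowMason's actual template engine is not described in this tutorial and may support more syntax.

```python
import re
from typing import Any

# Matches {{ dotted.path }} with optional inner whitespace.
TEMPLATE = re.compile(r"\{\{\s*([\w.\-]+)\s*\}\}")


def lookup(path: str, scope: dict[str, Any]) -> Any:
    """Walk a dotted path such as 'stages.fetch.output.content'."""
    current: Any = scope
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            raise KeyError(f"Cannot resolve '{path}' (missing '{part}')")
        current = current[part]
    return current


def resolve(value: Any, scope: dict[str, Any]) -> Any:
    """Resolve templates in a config value, recursing into dicts and lists."""
    if isinstance(value, dict):
        return {k: resolve(v, scope) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, scope) for v in value]
    if not isinstance(value, str):
        return value  # numbers, booleans and None pass through unchanged
    whole = TEMPLATE.fullmatch(value)
    if whole:
        # A value that is exactly one template keeps the referenced type.
        return lookup(whole.group(1), scope)
    # Templates embedded in longer strings are substituted as text.
    return TEMPLATE.sub(lambda m: str(lookup(m.group(1), scope)), value)


scope = {"stages": {"fetch": {"output": {"content": "FlowMason pipelines are JSON."}}}}
config = {
    "text": "{{stages.fetch.output.content}}",
    "max_keywords": 5,
    "include_scores": True,
}
print(resolve(config, scope))
# {'text': 'FlowMason pipelines are JSON.', 'max_keywords': 5, 'include_scores': True}
```

A reference to a stage that has not produced output raises KeyError here, which is one reason a stage that reads from fetch has to run after it.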

Part 2: Creating a Custom Operator

Operators handle deterministic work that needs no LLM: the same input always produces the same output.

Step 1: Create the Operator File

Create components/operators/text_cleaner.py:

from flowmason_core import operator, OperatorInput, OperatorOutput, Field
import re
from typing import Optional

@operator(
    name="text-cleaner",
    description="Clean and normalize text content",
    category="text",
    timeout=10
)
class TextCleanerOperator:
    """Deterministic text cleaning utility."""

    class Input(OperatorInput):
        text: str = Field(description="Raw text to clean")
        remove_html: bool = Field(default=True)
        remove_urls: bool = Field(default=True)
        normalize_whitespace: bool = Field(default=True)
        max_length: Optional[int] = Field(
            default=None,
            description="Truncate to max length"
        )

    class Output(OperatorOutput):
        cleaned_text: str
        original_length: int
        final_length: int

    async def execute(self, input: Input, context) -> Output:
        result = input.text
        original_length = len(result)

        # Remove HTML tags
        if input.remove_html:
            result = re.sub(r'<[^>]+>', '', result)

        # Remove URLs
        if input.remove_urls:
            result = re.sub(r'https?://\S+', '', result)

        # Normalize whitespace
        if input.normalize_whitespace:
            result = ' '.join(result.split())

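        # Trim surrounding whitespace first so the reported length matches the text
        result = result.strip()

        # Truncate if needed; the ellipsis is appended on top of max_length
        if input.max_length and len(result) > input.max_length:
            result = result[:input.max_length] + '...'

        return self.Output(
            cleaned_text=result,
            original_length=original_length,
            final_length=len(result)
        )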

Step 2: Use in a Pipeline

{
  "id": "clean-content",
  "component": "text-cleaner",
  "depends_on": ["fetch"],
  "config": {
    "text": "{{stages.fetch.output.body}}",
    "remove_html": true,
    "max_length": 5000
  }
}
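
To see what the cleaning steps do to concrete input, the sketch below re-implements them as a plain function that runs without FlowMason. clean_text is a hypothetical stand-in for the operator's execute method, written only for experimenting with the regular expressions and edge cases.

```python
import re
from typing import Optional


def clean_text(
    text: str,
    remove_html: bool = True,
    remove_urls: bool = True,
    normalize_whitespace: bool = True,
    max_length: Optional[int] = None,
) -> str:
    """Plain-function sketch of the text-cleaner steps (not a FlowMason API)."""
    result = text
    if remove_html:
        # Removes anything that looks like <...>, including stray comparisons.
        result = re.sub(r"<[^>]+>", "", result)
    if remove_urls:
        result = re.sub(r"https?://\S+", "", result)
    if normalize_whitespace:
        # Collapses runs of spaces, tabs and newlines into single spaces.
        result = " ".join(result.split())
    result = result.strip()
    if max_length and len(result) > max_length:
        # The ellipsis is appended after the cut, so the result can be
        # up to three characters longer than max_length.
        result = result[:max_length] + "..."
    return result


raw = "<p>Read   the docs at https://example.com/docs today.</p>\n"
print(clean_text(raw))                # Read the docs at today.
print(clean_text(raw, max_length=8))  # Read the...
print(clean_text("a < b and c > d"))  # a d
```

The last call shows an edge case worth a unit test: the tag pattern also swallows text between a literal "<" and a later ">", so prose containing comparisons loses content when remove_html is on.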

Part 3: Built-in Components Reference

AI Nodes

Component    Purpose
generator    General text generation
critic       Evaluate and score content
improver     Refine based on feedback
selector     Choose from options
synthesizer  Combine multiple sources

Operators

Component        Purpose
http-request     Make HTTP calls
json-transform   Transform with JMESPath
filter           Filter collections
schema-validate  Validate data
variable-set     Set variables
logger           Log messages

Control Flow

Component    Purpose
conditional  If/else branching
router       Switch/case routing
foreach      Loop over collections
trycatch     Error handling
subpipeline  Nested pipelines
return       Early exit

Part 4: Testing Your Components

Create a test file tests/test_keyword_extractor.py:

import pytest
from components.nodes.keyword_extractor import KeywordExtractorNode

@pytest.mark.asyncio
async def test_extracts_keywords():
    node = KeywordExtractorNode()

    # Mock context with LLM
    class MockLLM:
        async def generate(self, prompt):
            class Response:
                def parse_json(self):
                    return {
                        "keywords": ["ai", "machine", "learning"],
                        "scores": {"ai": 0.95, "machine": 0.85}
                    }
            return Response()

    class MockContext:
        llm = MockLLM()

    input = node.Input(
        text="AI and machine learning are transforming...",
        max_keywords=3,
        include_scores=True
    )

    output = await node.execute(input, MockContext())

    assert len(output.keywords) == 3
    assert "ai" in output.keywords
    assert output.scores is not None
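
As an alternative to hand-written mock classes, the standard library's unittest.mock.AsyncMock can stand in for context.llm and also records the prompt it was awaited with. The sketch below assumes a component only touches context.llm.generate(prompt=...) and response.parse_json(), as the keyword extractor does; make_mock_context is a hypothetical helper, and the direct generate call stands in for node.execute so the example runs without FlowMason installed.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


def make_mock_context(payload: dict) -> SimpleNamespace:
    """Build a stand-in context whose llm.generate() resolves to `payload`."""
    response = SimpleNamespace(parse_json=lambda: payload)
    llm = SimpleNamespace(generate=AsyncMock(return_value=response))
    return SimpleNamespace(llm=llm)


async def demo() -> tuple[dict, str]:
    context = make_mock_context({"keywords": ["ai", "ml"]})
    # Stand-in for node.execute(): call the LLM the same way the node does.
    response = await context.llm.generate(
        prompt="Extract the 2 most important keywords"
    )
    # AsyncMock records how it was awaited, so the prompt can be inspected.
    context.llm.generate.assert_awaited_once()
    sent_prompt = context.llm.generate.await_args.kwargs["prompt"]
    return response.parse_json(), sent_prompt


data, prompt = asyncio.run(demo())
print(data)    # {'keywords': ['ai', 'ml']}
print(prompt)  # Extract the 2 most important keywords
```

Inspecting the recorded prompt makes it possible to assert that input values such as max_keywords actually reach the model.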

Part 5: Packaging Components

Share your components as a package:

fm pack --name my-components --version 1.0.0

This creates my-components-1.0.0.fmpkg containing:

Others can install it:

fm install my-components-1.0.0.fmpkg

Best Practices

For Nodes

  1. Clear prompts - Write explicit, structured prompts
  2. Parse carefully - Validate AI responses
  3. Handle failures - AI can return unexpected formats
  4. Set timeouts - AI calls can be slow
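
As a sketch of the last two points, the helper below wraps a single slow call in a timeout and retries it once using asyncio.wait_for from the standard library. The timeout argument on the @node decorator presumably limits the component as a whole; how FlowMason enforces it is not described here, so this pattern is an assumption about what you might add inside execute. call_with_timeout and flaky_llm are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def call_with_timeout(
    make_call: Callable[[], Awaitable[T]],
    timeout_s: float,
    attempts: int = 2,
) -> T:
    """Await make_call(), retrying when an attempt exceeds timeout_s."""
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            # wait_for cancels the pending call once the timeout elapses.
            return await asyncio.wait_for(make_call(), timeout=timeout_s)
        except asyncio.TimeoutError as exc:
            last_error = exc
    raise TimeoutError(
        f"No reply within {timeout_s}s after {attempts} attempts"
    ) from last_error


async def demo() -> str:
    calls = 0

    async def flaky_llm() -> str:
        nonlocal calls
        calls += 1
        # The first call hangs past the timeout; the second returns quickly.
        await asyncio.sleep(0.5 if calls == 1 else 0.01)
        return f"reply on attempt {calls}"

    return await call_with_timeout(flaky_llm, timeout_s=0.1)


print(asyncio.run(demo()))  # reply on attempt 2
```

Passing a factory (make_call) rather than an already-created coroutine matters because a coroutine object can only be awaited once, so each retry needs a fresh call.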

For Operators

  1. Keep deterministic - Same input = same output
  2. Validate inputs - Use Field constraints
  3. Handle edge cases - Empty strings, null values
  4. Log appropriately - Help with debugging

General

  1. Document well - Add descriptions to fields
  2. Use types - Leverage Python type hints
  3. Test thoroughly - Unit test your components
  4. Version carefully - Semantic versioning matters

Congratulations!

You’ve completed the FlowMason tutorial series! You now know how to:

Next Steps

Thank you for learning FlowMason!