SDK - Custom Components
Build custom nodes, operators, and control flow components for FlowMason
FlowMason SDK
Extend FlowMason with custom components tailored to your specific workflows.
Extensibility Without Core Access
FlowMason's core is closed source, but the SDK provides everything you need to build powerful custom components. Create nodes, operators, and control flow components that integrate seamlessly with the FlowMason ecosystem.
Component Types
FlowMason has three component types, each with a specific purpose:
Nodes
AI-powered components that use LLM providers. Use for text generation, analysis, reasoning, and intelligent processing.
@node decorator Operators
Deterministic utility components for data transformation, API calls, validation, and integration tasks.
@operator decorator Control Flow
Components that manage execution flow: conditionals, loops, error handling, and routing.
@control_flow decorator Installation
Install the FlowMason SDK:
pip install flowmason Creating Custom Nodes
Nodes are AI-powered components that have access to LLM providers through the execution context. They're perfect for tasks requiring natural language understanding, generation, or reasoning.
Basic Node Structure
from flowmason_core.core.types import NodeInput, NodeOutput, Field
from flowmason_core.core.decorators import node
@node(
name="summarizer", # Unique identifier (snake_case)
category="reasoning", # For grouping in UI
description="Summarize text content",
icon="file-text", # Lucide icon name
color="#8B5CF6", # Violet for AI nodes
version="1.0.0",
tags=["summarization", "text", "llm"],
recommended_providers={
"anthropic": {
"model": "claude-3-5-sonnet-20241022",
"temperature": 0.3,
},
"openai": {
"model": "gpt-4o",
"temperature": 0.3,
},
},
default_provider="anthropic",
)
class SummarizerNode:
"""Summarize text content using AI."""
class Input(NodeInput):
text: str = Field(
description="The text to summarize",
examples=["A long article about climate change..."]
)
max_length: int = Field(
default=200,
ge=50,
le=1000,
description="Maximum summary length in words"
)
style: str = Field(
default="concise",
description="Summary style: 'concise', 'detailed', or 'bullet'"
)
class Output(NodeOutput):
summary: str = Field(description="The generated summary")
word_count: int = Field(description="Number of words in summary")
class Config:
requires_llm: bool = True
timeout_seconds: int = 60
max_retries: int = 3
async def execute(self, input: Input, context) -> Output:
"""Execute the summarization."""
# Access the LLM through context
llm = context.llm
prompt = f"""Summarize the following text in a {input.style} style.
Keep it under {input.max_length} words.
Text to summarize:
{input.text}
Summary:"""
response = await llm.generate_async(
prompt=prompt,
system_prompt="You are a professional summarizer.",
temperature=0.3,
)
summary = response.content.strip()
word_count = len(summary.split())
return self.Output(
summary=summary,
word_count=word_count
) Key Node Features
- Input class - Inherits from
NodeInput, defines typed inputs with validation - Output class - Inherits from
NodeOutput, defines the return schema - execute method - Async method that receives input and context
- context.llm - Access to configured LLM provider for AI operations
- context.logger - Logging utilities
- context.run_id - Current pipeline run identifier
Creating Custom Operators
Operators are deterministic components that don't use AI. They're ideal for data transformation, API integration, validation, and any operation where the same input always produces the same output.
Basic Operator Structure
from typing import Dict, Any, Optional, List
from flowmason_core.core.types import OperatorInput, OperatorOutput, Field
from flowmason_core.core.decorators import operator
@operator(
name="json_transformer",
category="transformers",
description="Transform JSON data using JSONPath expressions",
icon="code",
color="#3B82F6", # Blue for operators
version="1.0.0",
tags=["json", "transform", "data"],
)
class JsonTransformerOperator:
"""Transform JSON data structures."""
class Input(OperatorInput):
data: Dict[str, Any] = Field(
description="Input JSON data to transform"
)
mapping: Dict[str, str] = Field(
description="Field mapping: {output_key: input_path}"
)
defaults: Optional[Dict[str, Any]] = Field(
default=None,
description="Default values for missing fields"
)
class Output(OperatorOutput):
result: Dict[str, Any] = Field(
description="Transformed data"
)
fields_mapped: int = Field(
description="Number of fields successfully mapped"
)
class Config:
deterministic: bool = True # Same input = same output
timeout_seconds: int = 30
async def execute(self, input: Input, context) -> Output:
"""Execute the transformation."""
result = {}
fields_mapped = 0
for output_key, input_path in input.mapping.items():
value = self._get_nested_value(input.data, input_path)
if value is not None:
result[output_key] = value
fields_mapped += 1
elif input.defaults and output_key in input.defaults:
result[output_key] = input.defaults[output_key]
return self.Output(
result=result,
fields_mapped=fields_mapped
)
def _get_nested_value(self, data: dict, path: str):
"""Get value from nested dict using dot notation."""
keys = path.split('.')
value = data
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return None
return value HTTP Request Operator Example
from typing import Dict, Any, Optional, Literal
from flowmason_core.core.types import OperatorInput, OperatorOutput, Field
from flowmason_core.core.decorators import operator
@operator(
name="api_client",
category="connectors",
description="Make HTTP requests to external APIs",
icon="globe",
color="#3B82F6",
tags=["http", "api", "rest", "integration"],
)
class ApiClientOperator:
"""Make HTTP requests to APIs."""
class Input(OperatorInput):
url: str = Field(description="API endpoint URL")
method: Literal["GET", "POST", "PUT", "DELETE"] = Field(
default="GET"
)
headers: Optional[Dict[str, str]] = Field(default=None)
body: Optional[Dict[str, Any]] = Field(default=None)
timeout: int = Field(default=30, ge=1, le=300)
class Output(OperatorOutput):
status_code: int
body: Any
success: bool
async def execute(self, input: Input, context) -> Output:
import httpx
async with httpx.AsyncClient(timeout=input.timeout) as client:
response = await client.request(
method=input.method,
url=input.url,
headers=input.headers,
json=input.body,
)
try:
body = response.json()
except:
body = response.text
return self.Output(
status_code=response.status_code,
body=body,
success=response.is_success,
) Creating Control Flow Components
Control flow components manage pipeline execution flow. They return directives that tell the executor which stages to execute, skip, or loop over.
Conditional Component Example
from typing import List, Optional, Any
from flowmason_core.core.types import (
ControlFlowInput, ControlFlowOutput,
ControlFlowDirective, ControlFlowType, Field
)
from flowmason_core.core.decorators import control_flow
@control_flow(
name="conditional",
description="Execute branches based on a condition",
control_flow_type="conditional",
icon="git-branch",
color="#EC4899", # Pink for control flow
tags=["branching", "if-else", "conditional"],
)
class ConditionalComponent:
"""Branch execution based on conditions."""
class Input(ControlFlowInput):
condition: bool = Field(
description="The condition to evaluate"
)
true_branch: List[str] = Field(
description="Stage IDs to execute if true"
)
false_branch: List[str] = Field(
default_factory=list,
description="Stage IDs to execute if false"
)
class Output(ControlFlowOutput):
branch_taken: str = Field(
description="Which branch was taken"
)
directive: ControlFlowDirective
async def execute(self, input: Input, context) -> Output:
if input.condition:
# Execute true branch, skip false branch
return self.Output(
branch_taken="true",
directive=ControlFlowDirective(
directive_type=ControlFlowType.CONDITIONAL,
branch_taken="true",
execute_stages=input.true_branch,
skip_stages=input.false_branch,
)
)
else:
# Execute false branch, skip true branch
return self.Output(
branch_taken="false",
directive=ControlFlowDirective(
directive_type=ControlFlowType.CONDITIONAL,
branch_taken="false",
execute_stages=input.false_branch,
skip_stages=input.true_branch,
)
) Field Configuration
The Field() function provides rich configuration for input/output fields:
from flowmason_core.core.types import NodeInput, Field
class Input(NodeInput):
# Required field with description
text: str = Field(
description="Text to process",
examples=["Hello world", "Sample text"]
)
# Optional field with default
language: str = Field(
default="en",
description="Language code"
)
# Numeric constraints
temperature: float = Field(
default=0.7,
ge=0.0, # Greater than or equal
le=2.0, # Less than or equal
description="Sampling temperature"
)
max_tokens: int = Field(
default=1000,
gt=0, # Greater than
lt=100000, # Less than
)
# String constraints
name: str = Field(
min_length=1,
max_length=100,
pattern=r"^[a-zA-Z]+$", # Regex pattern
)
# List with factory default
tags: List[str] = Field(
default_factory=list,
description="Optional tags"
) Execution Context
The context parameter in execute() provides access to runtime utilities:
| Property | Available In | Description |
|---|---|---|
context.llm | Nodes only | LLM provider for AI operations |
context.logger | All | Logging interface |
context.run_id | All | Current pipeline run ID |
context.stage_id | All | Current stage identifier |
context.variables | All | Pipeline variables dict |
context.secrets | All | Secure secrets access |
Packaging Components
Package your custom components as .fmpkg files for distribution:
Package Structure
my-components/
├── package.json # Package metadata
├── nodes/
│ ├── __init__.py
│ └── my_node.py
├── operators/
│ ├── __init__.py
│ └── my_operator.py
└── requirements.txt # Python dependencies package.json
{
"name": "my-components",
"version": "1.0.0",
"description": "Custom FlowMason components",
"author": "Your Name",
"components": {
"nodes": ["nodes.my_node.MyNode"],
"operators": ["operators.my_operator.MyOperator"]
}
} Build Package
fm package build my-components/
# Creates: my-components-1.0.0.fmpkg
fm package install my-components-1.0.0.fmpkg Best Practices
Do
- • Use descriptive names and descriptions
- • Add examples to Field definitions
- • Include validation constraints
- • Handle errors gracefully
- • Use context.logger for debugging
- • Keep nodes focused on single tasks
Don't
- • Use sync operations in async execute
- • Store state between executions
- • Access files outside working directory
- • Ignore timeout configurations
- • Skip input validation
- • Use global variables
Testing Components
FlowMason provides testing utilities for component development:
import pytest
from flowmason_core.testing import MockContext, run_component
from my_components.nodes.summarizer import SummarizerNode
@pytest.mark.asyncio
async def test_summarizer_node():
# Create mock context with mock LLM
context = MockContext(
llm_response="This is a summary of the text."
)
# Create input
input_data = SummarizerNode.Input(
text="A very long article about technology...",
max_length=100,
style="concise"
)
# Run the component
node = SummarizerNode()
output = await node.execute(input_data, context)
# Assertions
assert output.summary == "This is a summary of the text."
assert output.word_count > 0 Ready to Build?
Start building your custom components and extend FlowMason to fit your specific needs.