Component Types
Understanding the three component types in FlowMason: Nodes, Operators, and Control Flow.
FlowMason uses three distinct component types, each designed for specific purposes in your pipelines.
flowchart TD
subgraph Components["18 Built-in Components"]
direction TB
subgraph Nodes["5 AI Nodes"]
G["generator"]
C["critic"]
I["improver"]
SE["selector"]
SY["synthesizer"]
end
subgraph Operators["7 Operators"]
HR["http-request"]
JT["json-transform"]
F["filter"]
SV["schema-validate"]
VS["variable-set"]
L["logger"]
LE["loop"]
end
subgraph Control["6 Control Flow"]
CO["conditional"]
FE["foreach"]
TC["trycatch"]
RO["router"]
SP["subpipeline"]
RE["return"]
end
end
Nodes (AI-Powered)
Nodes are components that call a Large Language Model (LLM) to do their work. Use nodes when you need:
- Text generation or completion
- Content analysis and extraction
- Decision making based on context
- Creative or reasoning tasks
Defining a Node
from flowmason_core import node, NodeInput, NodeOutput, Field

@node(
    name="summarizer",
    description="Summarize text content",
    category="reasoning",
    timeout=60
)
class SummarizerNode:
    class Input(NodeInput):
        text: str = Field(description="Text to summarize")
        max_length: int = Field(default=200, ge=50, le=1000)

    class Output(NodeOutput):
        summary: str
        word_count: int

    async def execute(self, input: Input, context) -> Output:
        response = await context.llm.generate(
            prompt=f"Summarize in {input.max_length} characters:\n{input.text}"
        )
        return self.Output(
            summary=response.text,
            word_count=len(response.text.split())
        )
Built-in Nodes
FlowMason includes 5 pre-built AI nodes:
generator
General-purpose text generation for any prompt.
{
"id": "generate-content",
"component": "generator",
"config": {
"prompt": "Write a product description for {{input.product}}",
"max_tokens": 500,
"temperature": 0.7
}
}
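The `{{input.product}}` placeholder in the prompt is filled in from the pipeline input before the prompt is sent. FlowMason's actual template engine is not shown here; the following is only a minimal sketch of how such dotted-path placeholders could be resolved. The names `PLACEHOLDER`, `lookup`, and `render` are hypothetical and not part of FlowMason.

```python
import re
from typing import Any

# Hypothetical resolver, not FlowMason's implementation.
# Matches {{ dotted.path }} placeholders; hyphens are allowed because
# stage ids such as "generate-variants" contain them.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.-]+)\s*\}\}")


def lookup(path: str, scope: dict[str, Any]) -> Any:
    """Walk a dotted path such as 'input.product' through nested dicts."""
    value: Any = scope
    for key in path.split("."):
        value = value[key]  # raises KeyError if the path does not exist
    return value


def render(template: str, scope: dict[str, Any]) -> str:
    """Replace every placeholder in the template with its value from scope."""
    return PLACEHOLDER.sub(lambda m: str(lookup(m.group(1), scope)), template)


scope = {"input": {"product": "trail running shoes"}}
prompt = render("Write a product description for {{input.product}}", scope)
print(prompt)
```

Function-style placeholders such as `{{now()}}` do not match this pattern and are left untouched by the sketch.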
critic
Evaluate and score content with structured feedback.
{
"id": "review-content",
"component": "critic",
"config": {
"content": "{{stages.generate.output.text}}",
"criteria": ["clarity", "accuracy", "engagement"],
"scoring": "1-10"
}
}
improver
Refine content based on feedback or criteria.
{
"id": "improve-draft",
"component": "improver",
"config": {
"content": "{{stages.generate.output.text}}",
"feedback": "{{stages.critic.output.feedback}}",
"focus": "clarity and conciseness"
}
}
selector
Choose the best option from multiple alternatives.
{
"id": "select-best",
"component": "selector",
"config": {
"options": "{{stages.generate-variants.output.variants}}",
"criteria": "Most engaging and accurate",
"return_reasoning": true
}
}
synthesizer
Combine multiple inputs into a cohesive output.
{
"id": "combine-sources",
"component": "synthesizer",
"config": {
"sources": [
"{{stages.research.output.findings}}",
"{{stages.interview.output.quotes}}",
"{{stages.data.output.statistics}}"
],
"format": "executive summary"
}
}
Operators (Deterministic)
Operators perform deterministic operations without LLM calls. Use operators for:
- Data transformation
- HTTP requests
- Validation
- Filtering and formatting
Defining an Operator
from typing import Any

from flowmason_core import operator, OperatorInput, OperatorOutput, Field

@operator(
    name="json-transform",
    description="Transform JSON using JMESPath",
    timeout=10
)
class JsonTransformOperator:
    class Input(OperatorInput):
        data: dict = Field(description="Input data")
        expression: str = Field(description="JMESPath expression")

    class Output(OperatorOutput):
        result: Any

    async def execute(self, input: Input, context) -> Output:
        import jmespath
        # Evaluate the JMESPath expression against the input data
        result = jmespath.search(input.expression, input.data)
        return self.Output(result=result)
Built-in Operators
FlowMason includes 7 built-in operators:
http-request
Make HTTP requests to external APIs.
{
"id": "fetch-data",
"component": "http-request",
"config": {
"url": "{{input.api_url}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{env.API_KEY}}"
},
"timeout": 30
}
}
json-transform
Transform data using JMESPath expressions.
{
"id": "extract-items",
"component": "json-transform",
"config": {
"data": "{{stages.fetch.output.body}}",
"expression": "data.items[?status=='active']"
}
}
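To make the expression above concrete, here is a plain-Python reading of what `data.items[?status=='active']` selects: the entries of `data.items` whose `status` equals `"active"`. This is an illustrative equivalent, not how the operator is implemented; the function name and sample data are made up, and unlike JMESPath (which yields null when `data.items` is missing) this sketch returns an empty list.

```python
from typing import Any


def select_active_items(document: dict[str, Any]) -> list[dict[str, Any]]:
    """Plain-Python reading of the expression data.items[?status=='active']."""
    items = document.get("data", {}).get("items", [])
    return [
        item
        for item in items
        if isinstance(item, dict) and item.get("status") == "active"
    ]


# Made-up response body for illustration
body = {
    "data": {
        "items": [
            {"id": 1, "status": "active"},
            {"id": 2, "status": "archived"},
            {"id": 3, "status": "active"},
        ]
    }
}
active = select_active_items(body)
print([item["id"] for item in active])
```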
filter
Filter collections based on conditions.
{
"id": "filter-results",
"component": "filter",
"config": {
"items": "{{stages.fetch.output.items}}",
"condition": "item.score > 0.8",
"limit": 10
}
}
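The effect of `condition` and `limit` can be sketched in plain Python. This sketch assumes that `limit` caps the number of matching items rather than the number of items inspected; the source does not say which. The condition is passed as a callable instead of being parsed from a string, and `filter_items` is a hypothetical helper, not a FlowMason function.

```python
from typing import Any, Callable, Iterable, Optional


def filter_items(
    items: Iterable[Any],
    condition: Callable[[Any], bool],
    limit: Optional[int] = None,
) -> list[Any]:
    """Keep items that satisfy condition, stopping once limit matches are kept."""
    kept: list[Any] = []
    for item in items:
        if limit is not None and len(kept) >= limit:
            break  # assumed semantics: limit applies to matches
        if condition(item):
            kept.append(item)
    return kept


# Made-up items for illustration
items = [
    {"name": "a", "score": 0.95},
    {"name": "b", "score": 0.50},
    {"name": "c", "score": 0.81},
    {"name": "d", "score": 0.90},
]
top = filter_items(items, lambda item: item["score"] > 0.8, limit=2)
print([item["name"] for item in top])
```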
schema-validate
Validate data against JSON Schema.
{
"id": "validate-input",
"component": "schema-validate",
"config": {
"data": "{{input}}",
"schema": {
"type": "object",
"required": ["url", "format"],
"properties": {
"url": { "type": "string", "format": "uri" },
"format": { "enum": ["json", "xml"] }
}
}
}
}
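To show what the schema above accepts and rejects, the following hand-rolled check mirrors its `required`, `type`, and `enum` rules. It is only an illustration of those three rules, not a JSON Schema implementation, and it does not check the `uri` format; `check_input` is a hypothetical name.

```python
from typing import Any


def check_input(data: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the checks passed."""
    errors: list[str] = []
    # "required": ["url", "format"]
    for field in ("url", "format"):
        if field not in data:
            errors.append(f"missing required field: {field}")
    # "url": {"type": "string"} (the "uri" format is not checked here)
    if "url" in data and not isinstance(data["url"], str):
        errors.append("url must be a string")
    # "format": {"enum": ["json", "xml"]}
    if "format" in data and data["format"] not in ("json", "xml"):
        errors.append("format must be one of: json, xml")
    return errors


print(check_input({"url": "https://example.com/feed", "format": "json"}))
print(check_input({"format": "csv"}))
```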
variable-set
Set or update pipeline variables.
{
"id": "set-config",
"component": "variable-set",
"config": {
"variables": {
"processed_count": "{{stages.filter.output.count}}",
"timestamp": "{{now()}}"
}
}
}
logger
Log messages for debugging and monitoring.
{
"id": "log-progress",
"component": "logger",
"config": {
"level": "info",
"message": "Processed {{stages.transform.output.count}} items",
"data": {
"stage": "transform",
"duration": "{{stages.transform.duration_ms}}"
}
}
}
loop
Run a stage repeatedly, up to a fixed count, with an optional early-exit condition.
{
"id": "retry-loop",
"component": "loop",
"config": {
"count": 3,
"stage": "attempt-fetch",
"break_on": "{{current.output.success}}"
}
}
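The configuration above reads as "run `attempt-fetch` up to 3 times and stop as soon as its output reports success". A minimal sketch of that behaviour follows, assuming `break_on` is evaluated after each iteration against the stage's latest output. `run_loop` and `attempt_fetch` are hypothetical stand-ins, not FlowMason APIs.

```python
import asyncio
from typing import Any, Awaitable, Callable

Stage = Callable[[int], Awaitable[dict[str, Any]]]


async def run_loop(stage: Stage, count: int) -> tuple[int, dict[str, Any]]:
    """Run stage up to count times, stopping once its output reports success."""
    iterations = 0
    output: dict[str, Any] = {}
    while iterations < count:
        iterations += 1
        output = await stage(iterations)
        if output.get("success"):  # plays the role of break_on
            break
    return iterations, output


async def attempt_fetch(attempt: int) -> dict[str, Any]:
    # Stand-in stage: fails on the first attempt, succeeds on the second.
    return {"success": attempt >= 2, "attempt": attempt}


iterations, last_output = asyncio.run(run_loop(attempt_fetch, count=3))
print(iterations, last_output)
```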
Control Flow
Control flow components manage pipeline execution logic. FlowMason provides 6 built-in control flow types:
flowchart LR
subgraph Conditional
A1[condition] -->|true| B1[then_stage]
A1 -->|false| C1[else_stage]
end
subgraph ForEach
A2[items] --> B2[stage 1]
B2 --> C2[stage 2]
C2 --> D2[stage N]
end
subgraph TryCatch
A3[try_stage] -->|success| B3[continue]
A3 -->|error| C3[catch_stage]
end
Conditional
Branch to one stage when a condition is true and to another when it is false:
{
"id": "check-length",
"component": "conditional",
"config": {
"condition": "{{stages.extract.output.length}} > 1000",
"then_stage": "summarize",
"else_stage": "format"
}
}
ForEach
Run a stage once for each item in a collection, optionally in parallel:
{
"id": "process-items",
"component": "foreach",
"config": {
"items": "{{stages.fetch.output.items}}",
"stage": "process-item",
"parallel": true,
"max_concurrency": 10
}
}
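With `parallel` enabled, `max_concurrency` bounds how many items are processed at once. One common way to get that behaviour in Python is an `asyncio.Semaphore`. The sketch below illustrates the idea under that assumption and is not FlowMason's scheduler; `foreach_parallel` and `process_item` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def foreach_parallel(
    items: Iterable[Any],
    stage: Callable[[Any], Awaitable[Any]],
    max_concurrency: int,
) -> list[Any]:
    """Run stage for every item, with at most max_concurrency running at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: Any) -> Any:
        async with semaphore:  # waits while max_concurrency stages are running
            return await stage(item)

    # gather returns results in the same order as the input items
    return list(await asyncio.gather(*(run_one(item) for item in items)))


# Track how many stages run at the same time, for demonstration only
stats = {"in_flight": 0, "peak": 0}


async def process_item(item: int) -> int:
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # stand-in for real work
    stats["in_flight"] -= 1
    return item * 2


results = asyncio.run(foreach_parallel(range(25), process_item, max_concurrency=10))
print(len(results), stats["peak"])
```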
TryCatch
Run a stage and hand off to a fallback stage if it fails:
{
"id": "safe-fetch",
"component": "trycatch",
"config": {
"try_stage": "fetch-data",
"catch_stage": "handle-error"
}
}
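In Python terms, this configuration corresponds to wrapping one stage in `try`/`except` and passing the error to a second stage. The sketch assumes the catch stage receives the raised error, which the source does not specify; `try_catch`, `fetch_data`, and `handle_error` are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def try_catch(
    try_stage: Callable[[], Awaitable[Any]],
    catch_stage: Callable[[Exception], Awaitable[Any]],
) -> Any:
    """Return try_stage's output, or catch_stage's output if try_stage raises."""
    try:
        return await try_stage()
    except Exception as error:
        return await catch_stage(error)


async def fetch_data() -> dict[str, Any]:
    # Stand-in stage that always fails
    raise ConnectionError("upstream unavailable")


async def handle_error(error: Exception) -> dict[str, Any]:
    return {"ok": False, "error": str(error)}


result = asyncio.run(try_catch(fetch_data, handle_error))
print(result)
```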
Router
Dispatch to a stage based on the value of an expression, with a default route:
{
"id": "route-request",
"component": "router",
"config": {
"expression": "{{input.type}}",
"routes": {
"article": "process-article",
"video": "process-video",
"default": "process-generic"
}
}
}
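The routing decision amounts to a dictionary lookup with a fallback. The sketch below assumes the `default` key is used whenever the expression's value has no matching route; `pick_route` is a hypothetical helper, not a FlowMason function.

```python
from typing import Optional

routes = {
    "article": "process-article",
    "video": "process-video",
    "default": "process-generic",
}


def pick_route(value: str, routes: dict[str, str]) -> Optional[str]:
    """Return the stage for value, falling back to the 'default' route if any."""
    return routes.get(value, routes.get("default"))


print(pick_route("video", routes))
print(pick_route("podcast", routes))
```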
SubPipeline
Run a nested pipeline as a stage:
{
"id": "nested-process",
"component": "subpipeline",
"config": {
"pipeline": "pipelines/child.pipeline.json",
"input": { "data": "{{stages.prepare.output}}" }
}
}
Return
Early exit from pipeline execution:
{
"id": "early-exit",
"component": "return",
"config": {
"output": "{{stages.cached.output}}"
}
}
Custom Control Flow
Define custom control flow with the @control_flow decorator:
from flowmason_core import control_flow, ControlFlowInput, ControlFlowOutput

@control_flow(
    name="retry-with-backoff",
    description="Retry a stage with exponential backoff"
)
class RetryWithBackoffControl:
    class Input(ControlFlowInput):
        stage: str
        max_retries: int = 3
        base_delay: float = 1.0

    class Output(ControlFlowOutput):
        attempts: int
        succeeded: bool

    async def execute(self, input: Input, context) -> Output:
        # Custom retry logic with backoff
        ...
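The body of `execute` is left open above. As a standalone illustration of exponential backoff, independent of the FlowMason context API (which is not shown here), the following sketch retries an awaitable and doubles the delay after each failure. It assumes `max_retries` counts retries after the first attempt, so up to `max_retries + 1` calls are made. `retry_with_backoff`, `flaky_stage`, and the injectable `sleep` parameter are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def retry_with_backoff(
    run_stage: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> tuple[int, bool, Any]:
    """Call run_stage until it succeeds or retries run out.

    Returns (attempts, succeeded, result).
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return attempts, True, await run_stage()
        except Exception:
            if attempts > max_retries:
                return attempts, False, None
            # Delay doubles on each retry: base_delay, 2x, 4x, ...
            await sleep(base_delay * 2 ** (attempts - 1))


# Record delays instead of actually sleeping, for demonstration only
delays: list[float] = []


async def fake_sleep(seconds: float) -> None:
    delays.append(seconds)


calls = {"n": 0}


async def flaky_stage() -> str:
    # Stand-in stage: fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


attempts, succeeded, result = asyncio.run(
    retry_with_backoff(flaky_stage, max_retries=3, base_delay=1.0, sleep=fake_sleep)
)
print(attempts, succeeded, result, delays)
```

Passing the sleep function in as a parameter keeps the sketch fast to run and easy to test; a real component would more likely use `asyncio.sleep` directly and often adds jitter to the delay.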
Choosing the Right Component
| Use Case | Component Type |
|---|---|
| Generate or analyze text | Node |
| Transform data | Operator |
| Make API calls | Operator |
| Branch execution | Control Flow |
| Loop over items | Control Flow |
| Handle errors | Control Flow |