FLOW MASON

Operators

Learn about deterministic operators for data transformation, HTTP calls, and utility tasks in FlowMason.

An Operator is a deterministic, non-AI component for data transformation and utility tasks. Operators do not require an LLM; they perform predictable operations such as HTTP calls, JSON parsing, filtering, and validation.

Characteristics

| Property | Value |
|---|---|
| Decorator | @operator |
| Requires LLM | No |
| Default timeout | 30 seconds |
| Deterministic | Yes (same input = same output) |

Built-in Operators

Core Operators

| Operator | Description | Use Case |
|---|---|---|
| http-request | Make HTTP calls | API integration |
| json-transform | JMESPath queries | Data extraction |
| filter | Filter collections | Data filtering |
| loop | Iterate over items | Batch processing |
| schema-validate | JSON Schema validation | Input validation |
| variable-set | Set context variables | State management |
| logger | Emit logs | Debugging |

HTTP Request

Make HTTP calls to external APIs:

{
  "id": "fetch-data",
  "component": "http-request",
  "config": {
    "url": "https://api.example.com/data",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{secrets.API_KEY}}"
    },
    "timeout": 30
  }
}

POST with Body

{
  "id": "create-record",
  "component": "http-request",
  "config": {
    "url": "https://api.example.com/records",
    "method": "POST",
    "headers": {
      "Content-Type": "application/json"
    },
    "body": {
      "name": "{{input.name}}",
      "email": "{{input.email}}"
    }
  }
}
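
To make the two configurations above more concrete, here is a minimal sketch of roughly what they amount to in plain Python, using only the standard library. `build_request` is a hypothetical helper, not part of FlowMason, and FlowMason's own implementation may differ. It assumes that placeholders such as {{input.name}} have already been resolved to plain values, and it only constructs the request without sending it.

```python
import json
import urllib.request


def build_request(config: dict) -> urllib.request.Request:
    """Translate an http-request style config into a urllib Request.

    Hypothetical helper for illustration only. Placeholders are assumed
    to be resolved before this function is called.
    """
    body = config.get("body")
    # A body is serialized as JSON; no body means no payload.
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(
        url=config["url"],
        data=data,
        headers=config.get("headers", {}),
        method=config.get("method", "GET"),
    )


request = build_request({
    "url": "https://api.example.com/records",
    "method": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": {"name": "Ada", "email": "ada@example.com"},
})
# Sending would look like:
#   urllib.request.urlopen(request, timeout=30)
# where the timeout mirrors the "timeout" field of the config.
```

Building the request separately from sending it keeps the translation step easy to inspect and test without network access.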

JSON Transform

Transform data using JMESPath expressions:

{
  "id": "extract-users",
  "component": "json-transform",
  "config": {
    "data": "{{stages.fetch.output.body}}",
    "expression": "users[?active==`true`].{id: id, name: name}"
  }
}

Common Expressions

| Expression | Description |
|---|---|
| items[0] | First item |
| items[-1] | Last item |
| items[*].name | All names |
| items[?status=='active'] | Filter by status |
| {total: length(items)} | Count items |
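
For readers less familiar with JMESPath, the expressions above can be read as the following plain-Python equivalents. This is only an illustration on made-up sample data, not how the json-transform operator is implemented. One difference to keep in mind: JMESPath yields null for a missing key or an out-of-range index, whereas plain Python indexing raises an exception.

```python
# Sample data, invented for this illustration.
data = {
    "items": [
        {"name": "a", "status": "active"},
        {"name": "b", "status": "archived"},
        {"name": "c", "status": "active"},
    ],
    "users": [
        {"id": 1, "name": "Ada", "active": True, "role": "admin"},
        {"id": 2, "name": "Bob", "active": False, "role": "user"},
    ],
}
items = data["items"]

first = items[0]                                    # items[0]
last = items[-1]                                    # items[-1]
names = [item["name"] for item in items]            # items[*].name
active_items = [
    item for item in items if item.get("status") == "active"
]                                                   # items[?status=='active']
summary = {"total": len(items)}                     # {total: length(items)}

# users[?active==`true`].{id: id, name: name}
# Filter first, then project each match down to the two requested keys.
active_users = [
    {"id": user.get("id"), "name": user.get("name")}
    for user in data["users"]
    if user.get("active") is True
]
```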

Filter

Filter collections based on conditions:

{
  "id": "filter-valid",
  "component": "filter",
  "config": {
    "items": "{{input.records}}",
    "field": "score",
    "operator": "gt",
    "value": 0.8
  }
}

Filter Operators

| Operator | Description |
|---|---|
| eq | Equal to |
| ne | Not equal to |
| gt | Greater than |
| lt | Less than |
| gte | Greater than or equal |
| lte | Less than or equal |
| contains | String contains |
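
The semantics of these operators can be sketched in a few lines of Python. The operator names come from the table above; the `COMPARATORS` mapping and the `filter_items` function are hypothetical, and details such as how the built-in filter treats items that lack the field are assumptions made for this sketch.

```python
from typing import Any, Callable, Dict, List

# Hypothetical mapping from operator names to comparisons.
COMPARATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda a, b: a == b,
    "ne": lambda a, b: a != b,
    "gt": lambda a, b: a > b,
    "lt": lambda a, b: a < b,
    "gte": lambda a, b: a >= b,
    "lte": lambda a, b: a <= b,
    "contains": lambda a, b: isinstance(a, str) and str(b) in a,
}


def filter_items(items: List[dict], field: str, operator: str, value: Any) -> List[dict]:
    """Keep the items whose `field` satisfies `operator` against `value`."""
    try:
        compare = COMPARATORS[operator]
    except KeyError:
        raise ValueError(f"Unknown filter operator: {operator!r}") from None
    # Assumption: items missing the field are skipped, not compared to None.
    return [item for item in items if field in item and compare(item[field], value)]


records = [{"id": 1, "score": 0.95}, {"id": 2, "score": 0.4}, {"id": 3}]
high_scores = filter_items(records, field="score", operator="gt", value=0.8)
```

A lookup table keeps each comparison in one place, so supporting another operator means adding one entry rather than another branch.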

Schema Validate

Validate data against JSON Schema:

{
  "id": "validate-input",
  "component": "schema-validate",
  "config": {
    "data": "{{input}}",
    "schema": {
      "type": "object",
      "properties": {
        "email": { "type": "string", "format": "email" },
        "age": { "type": "integer", "minimum": 0 }
      },
      "required": ["email"]
    },
    "strict": true
  }
}
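
To show what the schema above accepts and rejects, here is a deliberately small, hand-rolled validator covering only the keywords used in the example (`type`, `properties`, `required`, `minimum`). It is a sketch for illustration, not a full JSON Schema implementation and not the schema-validate operator's code. The `format` keyword is ignored here; many validators treat it as an annotation unless format checking is enabled, and this page does not say how FlowMason handles it.

```python
from typing import Any, List

# JSON Schema type names mapped to Python types (only the subset used above).
TYPE_MAP = {"object": dict, "string": str, "integer": int}


def validate(data: Any, schema: dict, path: str = "$") -> List[str]:
    """Return a list of error messages; an empty list means the data is valid."""
    errors: List[str] = []
    expected = schema.get("type")
    if expected in TYPE_MAP:
        # bool is a subclass of int in Python, so exclude it for "integer".
        ok = isinstance(data, TYPE_MAP[expected]) and not (
            expected == "integer" and isinstance(data, bool)
        )
        if not ok:
            return [f"{path}: expected {expected}"]
    if "minimum" in schema and isinstance(data, (int, float)) and data < schema["minimum"]:
        errors.append(f"{path}: {data} is less than minimum {schema['minimum']}")
    if isinstance(data, dict):
        for key in schema.get("required", []):
            if key not in data:
                errors.append(f"{path}: missing required property '{key}'")
        # Recurse into each declared property that is present in the data.
        for key, subschema in schema.get("properties", {}).items():
            if key in data:
                errors.extend(validate(data[key], subschema, f"{path}.{key}"))
    return errors


schema = {
    "type": "object",
    "properties": {
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0},
    },
    "required": ["email"],
}

valid_errors = validate({"email": "ada@example.com", "age": 36}, schema)
invalid_errors = validate({"age": -1}, schema)
```

With this schema, the first call reports no errors, while the second reports both the missing `email` and the negative `age`.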

Variable Set

Set context variables for later use:

{
  "id": "set-config",
  "component": "variable-set",
  "config": {
    "variables": {
      "max_retries": 3,
      "timeout": "{{input.timeout || 30}}"
    }
  }
}

Access these variables in later stages with {{context.variable_name}}, for example {{context.max_retries}}.
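
The placeholder syntax used throughout this page can be illustrated with a small resolver. Everything in this sketch is an assumption: `lookup` and `resolve` are hypothetical names, and the `||` fallback is treated as "use this JSON literal when the path is missing", which may not match FlowMason's actual template semantics.

```python
import json
import re
from typing import Any

PLACEHOLDER = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def lookup(path: str, scope: dict) -> Any:
    """Follow a dotted path such as 'input.timeout'; return None if absent."""
    current: Any = scope
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def resolve(template: str, scope: dict) -> Any:
    """Resolve a string that consists of a single {{ ... }} placeholder."""
    match = PLACEHOLDER.fullmatch(template)
    if match is None:
        return template  # plain string, nothing to resolve
    path, _, fallback = (part.strip() for part in match.group(1).partition("||"))
    value = lookup(path, scope)
    if value is None and fallback:
        return json.loads(fallback)  # assumed: fallback is a JSON literal
    return value


# Mimic the variable-set stage: resolve each variable into the context.
scope = {"input": {}, "context": {}}
variables = {"max_retries": 3, "timeout": "{{input.timeout || 30}}"}
scope["context"] = {
    name: resolve(value, scope) if isinstance(value, str) else value
    for name, value in variables.items()
}

# A later stage reads the stored value back from the context.
timeout = resolve("{{context.timeout}}", scope)
```

Because `input.timeout` is absent in this run, the fallback applies and the later lookup of `context.timeout` yields 30.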

Logger

Emit logs for debugging:

{
  "id": "log-result",
  "component": "logger",
  "config": {
    "level": "info",
    "message": "Processed {{len(input.items)}} items",
    "data": {
      "count": "{{len(input.items)}}",
      "status": "{{stages.process.output.status}}"
    }
  }
}
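
As a rough analogy, the configuration above maps onto Python's standard `logging` module as sketched below. The `emit` function and the `LEVELS` table are hypothetical, placeholders are assumed to be resolved already, and attaching the structured `data` through `extra` is a choice made for this sketch rather than a description of the logger operator.

```python
import logging

# Assumed level names; the page only shows "info".
LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
}


def emit(config: dict, logger: logging.Logger) -> None:
    """Emit one log record for a logger-style config."""
    # Unknown level names fall back to INFO.
    level = LEVELS.get(str(config.get("level", "info")).lower(), logging.INFO)
    # Structured data travels with the record as the attribute `data`.
    logger.log(level, config["message"], extra={"data": config.get("data", {})})


logging.basicConfig(level=logging.INFO)
emit(
    {
        "level": "info",
        "message": "Processed 3 items",
        "data": {"count": 3, "status": "ok"},
    },
    logging.getLogger("flowmason.sketch"),
)
```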

Creating Custom Operators

Define custom operators in Python:

from flowmason_core.core import operator, Field
from flowmason_core.core.types import OperatorInput, OperatorOutput
from typing import Any, List

@operator(
    name="filter-items",
    description="Filter a list based on a condition",
    category="transform",
    timeout=30
)
class FilterItemsOperator:
    class Input(OperatorInput):
        items: List[dict] = Field(description="List of items")
        field: str = Field(description="Field to check")
        value: Any = Field(description="Value to match")
        operator: str = Field(default="eq")

    class Output(OperatorOutput):
        filtered: List[dict]
        count: int

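    async def execute(self, input: Input, context) -> Output:
        # Keep only the items whose field satisfies the comparison.
        filtered = [
            item for item in input.items
            if self._matches(item.get(input.field), input.value, input.operator)
        ]
        return self.Output(filtered=filtered, count=len(filtered))

    @staticmethod
    def _matches(actual, expected, op: str) -> bool:
        # Minimal comparison helper; extend with more operators as needed.
        if op == "eq":
            return actual == expected
        if op == "ne":
            return actual != expected
        raise ValueError(f"Unsupported operator: {op}")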

Operators vs Nodes

| Aspect | Operator | Node |
|---|---|---|
| LLM Required | No | Yes |
| Deterministic | Yes | No |
| Default Timeout | 30s | 60s |
| Retries | Not applicable | 3 by default |
| Use Case | Data transformation | AI tasks |

Best Practices

  1. Keep it simple: each operator should do one thing well.
  2. Handle errors: network calls can fail, so catch and report failures.
  3. Validate inputs: use Field constraints.
  4. Return useful metadata: include timing, counts, and similar details.
  5. Use async: prefer it for I/O operations such as HTTP calls.
  6. Consider timeouts: set limits appropriate to the task.
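
Several of these practices (error handling, async I/O, timeouts, and useful metadata) can be combined in one small pattern. The sketch below uses only `asyncio` from the standard library; `run_with_limits` and `slow_call` are hypothetical names, and nothing here depends on FlowMason's API.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def run_with_limits(
    task: Callable[[], Awaitable[Any]], timeout: float = 30.0
) -> Dict[str, Any]:
    """Run an async task with a timeout and report timing metadata."""
    started = time.perf_counter()
    try:
        result = await asyncio.wait_for(task(), timeout=timeout)
        status, error = "ok", None
    except asyncio.TimeoutError:
        result, status, error = None, "timeout", f"exceeded {timeout}s"
    except Exception as exc:  # report the failure instead of crashing the caller
        result, status, error = None, "error", str(exc)
    return {
        "result": result,
        "status": status,
        "error": error,
        "elapsed_seconds": time.perf_counter() - started,
    }


async def slow_call() -> str:
    await asyncio.sleep(0.2)  # stands in for a slow HTTP call
    return "done"


# The call takes longer than the limit, so the outcome reports a timeout.
outcome = asyncio.run(run_with_limits(slow_call, timeout=0.05))
```

Returning a status and elapsed time alongside the result gives downstream stages something to branch on and gives logs something to record.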