FLOW MASON

Real-time Collaboration

Edit pipelines together with live cursors, presence awareness, chat, and synchronized changes.

Beta Feature - This feature is currently under heavy testing. For early access, contact [email protected].

FlowMason Studio supports real-time collaborative editing of pipelines with multiple users, similar to Google Docs or Figma.

Overview

Real-time Collaboration provides:

  • Live Cursors - See other users’ cursor positions in real-time
  • Presence Awareness - Know who’s online and what they’re editing
  • Synchronized Editing - Changes propagate instantly to all participants
  • Conflict Resolution - Automatic handling of concurrent edits
  • Element Locking - Prevent conflicts on specific elements
  • Chat & Comments - In-context communication
  • Undo/Redo - Per-user undo history
  • Activity Log - Track all session changes

Quick Start

Create a Session

POST /api/v1/collaboration/sessions
Content-Type: application/json

{
  "pipeline_id": "my-pipeline-id",
  "max_participants": 10,
  "auto_save": true
}

Response:

{
  "session": {
    "id": "session_abc123",
    "pipeline_id": "my-pipeline-id",
    "is_active": true,
    "participants": [
      {
        "user_id": "user_owner",
        "username": "Pipeline Owner",
        "role": "owner",
        "status": "online",
        "color": "#ef4444"
      }
    ]
  },
  "join_url": "/collaborate/session_abc123",
  "invite_code": "ABC12345"
}

Join a Session

POST /api/v1/collaboration/sessions/join
Content-Type: application/json

{
  "invite_code": "ABC12345",
  "username": "Collaborator"
}

Response:

{
  "session": {...},
  "user": {
    "user_id": "user_123",
    "username": "Collaborator",
    "role": "editor",
    "color": "#3b82f6"
  },
  "token": "ws_token_..."
}

WebSocket Connection

Connect for real-time updates:

ws://localhost:8999/api/v1/collaboration/ws/session_abc123

Message Protocol

Send Cursor Update:

{
  "type": "cursor",
  "x": 250,
  "y": 150,
  "selected_stage": "generator_1"
}

Send Edit:

{
  "type": "edit",
  "operation": "update_stage",
  "target_id": "generator_1",
  "data": {"config": {"prompt": "New prompt"}},
  "base_version": 5
}

Send Chat:

{
  "type": "chat",
  "content": "Let's add a filter stage here",
  "mentions": ["user_123"]
}

Events Received

User Joined:

{
  "type": "user_joined",
  "user_id": "user_456",
  "data": {
    "user": {
      "username": "New User",
      "color": "#22c55e"
    }
  }
}

Cursor Move:

{
  "type": "cursor_move",
  "user_id": "user_456",
  "data": {
    "cursor": {"x": 300, "y": 200},
    "selected_stage": "filter_1"
  }
}

Edit Applied:

{
  "type": "edit",
  "user_id": "user_456",
  "data": {
    "change": {
      "operation": "update_stage",
      "target_id": "generator_1",
      "version": 6
    }
  }
}

Presence

Get Online Users

GET /api/v1/collaboration/sessions/{session_id}/users
[
  {
    "user_id": "user_owner",
    "username": "Pipeline Owner",
    "role": "owner",
    "status": "online",
    "cursor": {
      "position": {"x": 100, "y": 200},
      "selected_stage": "generator_1"
    },
    "color": "#ef4444"
  }
]

Edit Operations

OperationDescription
add_stageAdd a new stage
remove_stageRemove a stage
update_stageUpdate stage config
move_stageChange stage position
add_connectionAdd a dependency
remove_connectionRemove a dependency
update_pipelineUpdate metadata

Send an Edit

POST /api/v1/collaboration/sessions/{session_id}/edits
Content-Type: application/json

{
  "operation": "add_stage",
  "data": {
    "stage": {
      "id": "new_stage_1",
      "component_type": "filter",
      "config": {}
    }
  },
  "base_version": 5
}

Conflict Handling

If two users edit the same element:

{
  "success": false,
  "conflict": {
    "change_a": {...},
    "change_b": {...},
    "conflict_type": "concurrent_edit"
  }
}

Locking

Prevent conflicts by locking elements:

Acquire Lock

POST /api/v1/collaboration/sessions/{session_id}/locks
Content-Type: application/json

{
  "target_id": "generator_1",
  "target_type": "stage",
  "duration": 300,
  "reason": "Editing configuration"
}

Release Lock

DELETE /api/v1/collaboration/sessions/{session_id}/locks/generator_1

Chat & Comments

Send Chat Message

POST /api/v1/collaboration/sessions/{session_id}/chat
Content-Type: application/json

{
  "content": "Should we add error handling here?",
  "mentions": ["user_owner"]
}

Add Comment to Stage

POST /api/v1/collaboration/sessions/{session_id}/comments
Content-Type: application/json

{
  "target_type": "stage",
  "target_id": "generator_1",
  "content": "This prompt needs to be more specific"
}

Reply to Comment

POST /api/v1/collaboration/sessions/{session_id}/comments/{id}/replies
Content-Type: application/json

{
  "content": "Good point, I'll update it"
}

Resolve Comment

POST /api/v1/collaboration/sessions/{session_id}/comments/{id}/resolve

Invitations

POST /api/v1/collaboration/sessions/{session_id}/invites
Content-Type: application/json

{
  "email": "[email protected]",
  "role": "editor",
  "message": "Join me to work on this pipeline!"
}

Activity Log

GET /api/v1/collaboration/sessions/{session_id}/activity?limit=50
[
  {
    "activity_type": "session_created",
    "description": "Pipeline Owner created the session",
    "timestamp": "2024-01-15T10:00:00Z"
  },
  {
    "activity_type": "user_joined",
    "description": "Collaborator joined",
    "timestamp": "2024-01-15T10:05:00Z"
  },
  {
    "activity_type": "stage_updated",
    "description": "Collaborator updated generator_1",
    "timestamp": "2024-01-15T10:10:00Z"
  }
]

User Roles

RoleCapabilities
ownerFull access, manage roles, end session
editorEdit pipeline, chat, comment
viewerRead-only, chat and comment

React Integration

function CollaborativeEditor({ sessionId }) {
  const ws = useRef<WebSocket>();
  const [users, setUsers] = useState([]);
  const [cursors, setCursors] = useState({});

  useEffect(() => {
    ws.current = new WebSocket(
      `ws://localhost:8999/api/v1/collaboration/ws/${sessionId}`
    );

    ws.current.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      switch (msg.type) {
        case 'user_joined':
          setUsers(prev => [...prev, msg.data.user]);
          break;
        case 'cursor_move':
          setCursors(prev => ({
            ...prev,
            [msg.user_id]: msg.data.cursor
          }));
          break;
        case 'edit':
          applyEdit(msg.data.change);
          break;
      }
    };

    return () => ws.current?.close();
  }, [sessionId]);

  return (
    <div className="collaborative-editor">
      {/* User avatars */}
      <div className="user-list">
        {users.map(user => (
          <div
            key={user.user_id}
            style={{ borderColor: user.color }}
          >
            {user.username[0]}
          </div>
        ))}
      </div>

      {/* Canvas with cursors */}
      <div className="canvas">
        {Object.entries(cursors).map(([userId, cursor]) => (
          <div
            key={userId}
            className="cursor"
            style={{
              left: cursor.position.x,
              top: cursor.position.y,
              backgroundColor: cursor.color
            }}
          />
        ))}
      </div>
    </div>
  );
}

Best Practices

  1. Use locking for complex edits - Prevent conflicts on stages you’re actively editing
  2. Communicate via chat - Coordinate changes with team members
  3. Use comments for async feedback - Leave notes for offline collaborators
  4. Review activity log - Understand what changed and when
  5. Save frequently - Auto-save is enabled by default