Skip to main content
Workflows are DAG-based automation pipelines that execute a fixed sequence of blocks (LLM calls, code execution, conditions, agent runs). Unlike agents that decide what to do dynamically, workflows follow a predetermined graph. They can be executed directly, streamed, or triggered externally via webhooks.

Common Patterns

Create a workflow, define its graph with PUT /api/workflows/{id}/graph, then execute. For external triggers, deploy the workflow and arm it to get a webhook URL. Webhooks are single-use — re-arm after each trigger. Use the streaming endpoint for real-time block execution updates.

CRUD

GET /api/workflows

List workflows.
limit
integer
Max results
offset
integer
Pagination offset
requests.get(f"{BASE_URL}/api/workflows", headers=headers)

POST /api/workflows

Create a workflow.
{ "name": "My Workflow" }
requests.post(f"{BASE_URL}/api/workflows", headers=headers, json={"name": "My Workflow"})

GET /api/workflows/

Get workflow with blocks and edges.
id
string
required
Workflow ID
requests.get(f"{BASE_URL}/api/workflows/{wf_id}", headers=headers)

PATCH /api/workflows/

Update workflow metadata.
id
string
required
Workflow ID
requests.patch(f"{BASE_URL}/api/workflows/{wf_id}", headers=headers, json={"name": "Renamed"})

DELETE /api/workflows/

Delete a workflow.
id
string
required
Workflow ID
requests.delete(f"{BASE_URL}/api/workflows/{wf_id}", headers=headers)

Graph

PUT /api/workflows//graph

Save the complete graph — blocks and edges.
id
string
required
Workflow ID
{
  "blocks": [
    { "id": "start", "type": "starter", "config": {}, "position": {"x": 0, "y": 0} },
    { "id": "out", "type": "response", "config": {}, "position": {"x": 300, "y": 0} }
  ],
  "edges": [
    { "source": "start", "target": "out" }
  ]
}
requests.put(f"{BASE_URL}/api/workflows/{wf_id}/graph", headers=headers, json={"blocks": [...], "edges": [...]})

Deploy

POST /api/workflows//deploy

Deploy the workflow (enables webhook triggering).
id
string
required
Workflow ID
requests.post(f"{BASE_URL}/api/workflows/{wf_id}/deploy", headers=headers)

POST /api/workflows//undeploy

Undeploy the workflow.
id
string
required
Workflow ID
requests.post(f"{BASE_URL}/api/workflows/{wf_id}/undeploy", headers=headers)

POST /api/workflows//arm

Arm the workflow’s webhook for a single external trigger. Opens a 10-minute window during which the webhook accepts exactly one call; after it fires or expires, re-arm.
id
string
required
Workflow ID
The response is {"ok": true, "armed_until": "<iso8601>"}not a webhook id/secret. The webhook_id and webhook_secret live on the webhook block’s config in the saved graph; fetch them with GET /api/workflows/{id} and read from the block whose type == "webhook".
response = requests.post(f"{BASE_URL}/api/workflows/{wf_id}/arm", headers=headers)
print(response.json())  # {"ok": true, "armed_until": "2026-01-01T00:00:00Z"}

Execution

POST /api/workflows//execute

Execute the workflow synchronously.
id
string
required
Workflow ID
variables
object
Map of input variable names → values. Defaults to {} when omitted.
variables is the canonical key. The route also accepts input as a legacy alias (data.get("variables", data.get("input", {}))) — prefer variables in new code.
{ "variables": { "text": "..." } }
requests.post(f"{BASE_URL}/api/workflows/{wf_id}/execute", headers=headers, json={"variables": {"text": "..."}})

POST /api/workflows//execute/stream

Execute with streaming SSE. Same body shape as /execute — pass variables (canonical) or input (legacy alias).
id
string
required
Workflow ID
variables
object
Map of input variable names → values. Defaults to {} when omitted.
requests.post(f"{BASE_URL}/api/workflows/{wf_id}/execute/stream", headers=headers, json={"variables": {"text": "..."}}, stream=True)

GET /api/workflows//executions

List execution history.
id
string
required
Workflow ID
requests.get(f"{BASE_URL}/api/workflows/{wf_id}/executions", headers=headers)

GET /api/workflows//executions//logs

Get per-block execution logs.
id
string
required
Workflow ID
eid
string
required
Execution ID
requests.get(f"{BASE_URL}/api/workflows/{wf_id}/executions/{exec_id}/logs", headers=headers)

Error Responses

Workflow routes return one of two body shapes:
  • Plain {"error": "<message>"} — list/create/patch/delete and the deploy/undeploy/arm endpoints.
  • Structured {"error": "<message>", "error_code": "<UPPER_SNAKE_CODE>"} (via the shared error_response helper) — GET /workflows/{id}, PUT /workflows/{id}/graph, and the synchronous /execute endpoint.
Streaming endpoints (/execute/stream) return HTTP 200 and emit errors as SSE data: {"type": "error", ...} events; only the timeout event includes error_code (EXECUTION_TIMEOUT).
Statuserror_codeDescription
400Missing required field (e.g. name, no fields to update, no JSON body)
400VALIDATION_ERRORGraph save: each block needs id/type, each edge needs source/target, block types must be in the engine registry, and edges must reference blocks in the same graph
404WORKFLOW_NOT_FOUNDNo workflow exists with the given ID (GET /workflows/{id}, PUT /graph, POST /execute)
404No workflow or execution exists with the given ID (other endpoints)
504EXECUTION_TIMEOUTSynchronous /execute exceeded its timeout
500EXECUTION_FAILEDSynchronous /execute raised an error