Run Workflows
Integrate Cloudflare Workflows with Agents for durable, multi-step background processing while Agents handle real-time communication.
Extend AgentWorkflow for typed access to the originating Agent:
import { AgentWorkflow } from "agents/workflows";export class ProcessingWorkflow extends AgentWorkflow { async run(event, step) { const params = event.payload;
const result = await step.do("process-data", async () => { return processData(params.data); });
// Non-durable: progress reporting (may repeat on retry) await this.reportProgress({ step: "process", status: "complete", percent: 0.5, });
// Broadcast to connected WebSocket clients this.broadcastToClients({ type: "update", taskId: params.taskId });
await step.do("save-results", async () => { // Call Agent methods via RPC await this.agent.saveResult(params.taskId, result); });
// Durable: idempotent, won't repeat on retry await step.reportComplete(result); return result; }}import { AgentWorkflow } from "agents/workflows";import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";import type { MyAgent } from "./agent";
type TaskParams = { taskId: string; data: string };
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> { async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) { const params = event.payload;
const result = await step.do("process-data", async () => { return processData(params.data); });
// Non-durable: progress reporting (may repeat on retry) await this.reportProgress({ step: "process", status: "complete", percent: 0.5, });
// Broadcast to connected WebSocket clients this.broadcastToClients({ type: "update", taskId: params.taskId });
await step.do("save-results", async () => { // Call Agent methods via RPC await this.agent.saveResult(params.taskId, result); });
// Durable: idempotent, won't repeat on retry await step.reportComplete(result); return result; }}Use runWorkflow() to start and track workflows:
import { Agent } from "agents";
export class MyAgent extends Agent { async startTask(taskId, data) { const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", { taskId, data, }); return { instanceId }; }
async onWorkflowProgress(workflowName, instanceId, progress) { this.broadcast(JSON.stringify({ type: "workflow-progress", progress })); }
async onWorkflowComplete(workflowName, instanceId, result) { console.log(`Workflow completed:`, result); }
async saveResult(taskId, result) { this .sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`; }}import { Agent } from "agents";
export class MyAgent extends Agent { async startTask(taskId: string, data: string) { const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", { taskId, data, }); return { instanceId }; }
async onWorkflowProgress( workflowName: string, instanceId: string, progress: unknown, ) { this.broadcast(JSON.stringify({ type: "workflow-progress", progress })); }
async onWorkflowComplete( workflowName: string, instanceId: string, result?: unknown, ) { console.log(`Workflow completed:`, result); }
async saveResult(taskId: string, result: unknown) { this .sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`; }}{ "name": "my-app", "main": "src/index.ts", "compatibility_date": "2025-02-11", "durable_objects": { "bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }], }, "workflows": [ { "name": "processing-workflow", "binding": "PROCESSING_WORKFLOW", "class_name": "ProcessingWorkflow", }, ], "migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }],}name = "my-app"main = "src/index.ts"compatibility_date = "2025-02-11"
[[durable_objects.bindings]]name = "MY_AGENT"class_name = "MyAgent"
[[workflows]]name = "processing-workflow"binding = "PROCESSING_WORKFLOW"class_name = "ProcessingWorkflow"
[[migrations]]tag = "v1"new_sqlite_classes = [ "MyAgent" ]Base class for Workflows that integrate with Agents.
| Parameter | Description |
|---|---|
| AgentType | The Agent class type for typed RPC |
| Params | Parameters passed to the workflow |
| ProgressType | Type for progress reporting (defaults to DefaultProgress) |
| Env | Environment type (defaults to Cloudflare.Env) |
| Property | Type | Description |
|---|---|---|
| agent | Stub | Typed stub for calling Agent methods |
| instanceId | string | The workflow instance ID |
| workflowName | string | The workflow binding name |
| env | Env | Environment bindings |
These methods may repeat on retry. Use for lightweight, frequent updates.
Report progress to the Agent. Triggers onWorkflowProgress callback.
await this.reportProgress({ step: "processing", status: "running", percent: 0.5,});await this.reportProgress({ step: "processing", status: "running", percent: 0.5,});Broadcast a message to all WebSocket clients connected to the Agent.
this.broadcastToClients({ type: "update", data: result });this.broadcastToClients({ type: "update", data: result });Wait for an approval event. Throws WorkflowRejectedError if rejected.
const approval = await this.waitForApproval(step, { timeout: "7 days",});const approval = await this.waitForApproval<{ approvedBy: string }>(step, { timeout: "7 days",});These methods are idempotent and will not repeat on retry. Use for state changes that must persist.
| Method | Description |
|---|---|
| step.reportComplete(result?) | Report successful completion |
| step.reportError(error) | Report an error |
| step.sendEvent(event) | Send a custom event to the Agent |
| step.updateAgentState(state) | Replace Agent state (broadcasts to clients) |
| step.mergeAgentState(partial) | Merge into Agent state (broadcasts to clients) |
| step.resetAgentState() | Reset Agent state to initialState |
type DefaultProgress = { step?: string; status?: "pending" | "running" | "complete" | "error"; message?: string; percent?: number; [key: string]: unknown;};Methods available on the Agent class for Workflow management.
Start a workflow instance and track it in the Agent database.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| workflowName | string | Workflow binding name from env |
| params | object | Parameters to pass to the workflow |
| options.id | string | Custom workflow ID (auto-generated if not provided) |
| options.metadata | object | Metadata stored for querying (not passed to workflow) |
| options.agentBinding | string | Agent binding name (auto-detected if not provided) |
Returns: Promise<string> - Workflow instance ID
const instanceId = await this.runWorkflow( "MY_WORKFLOW", { taskId: "123" }, { metadata: { userId: "user-456", priority: "high" }, },);const instanceId = await this.runWorkflow( "MY_WORKFLOW", { taskId: "123" }, { metadata: { userId: "user-456", priority: "high" }, },);Send an event to a running workflow.
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, { type: "custom-event", payload: { action: "proceed" },});await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, { type: "custom-event", payload: { action: "proceed" },});Get the status of a workflow and update the tracking record.
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);// { status: 'running', output: null, error: null }const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);// { status: 'running', output: null, error: null }Get a tracked workflow by ID.
const workflow = this.getWorkflow(instanceId);// { instanceId, workflowName, status, metadata, error, createdAt, ... }const workflow = this.getWorkflow(instanceId);// { instanceId, workflowName, status, metadata, error, createdAt, ... }Query tracked workflows with cursor-based pagination. Returns a WorkflowPage with workflows, total count, and cursor for the next page.
// Get running workflows (default limit is 50, max is 100)const { workflows, total } = this.getWorkflows({ status: "running" });
// Filter by metadataconst { workflows: userWorkflows } = this.getWorkflows({ metadata: { userId: "user-456" },});
// Pagination with cursorconst page1 = this.getWorkflows({ status: ["complete", "errored"], limit: 20, orderBy: "desc",});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursorif (page1.nextCursor) { const page2 = this.getWorkflows({ status: ["complete", "errored"], limit: 20, orderBy: "desc", cursor: page1.nextCursor, });}// Get running workflows (default limit is 50, max is 100)const { workflows, total } = this.getWorkflows({ status: "running" });
// Filter by metadataconst { workflows: userWorkflows } = this.getWorkflows({ metadata: { userId: "user-456" },});
// Pagination with cursorconst page1 = this.getWorkflows({ status: ["complete", "errored"], limit: 20, orderBy: "desc",});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursorif (page1.nextCursor) { const page2 = this.getWorkflows({ status: ["complete", "errored"], limit: 20, orderBy: "desc", cursor: page1.nextCursor, });}The WorkflowPage type:
type WorkflowPage = { workflows: WorkflowInfo[]; total: number; // Total matching workflows nextCursor: string | null; // null when no more pages};Delete a single workflow instance tracking record. Returns true if deleted, false if not found.
Delete workflow instance tracking records matching criteria.
// Delete completed workflow instances older than 7 daysthis.deleteWorkflows({ status: "complete", createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),});
// Delete all errored and terminated workflowsthis.deleteWorkflows({ status: ["errored", "terminated"],});// Delete completed workflow instances older than 7 daysthis.deleteWorkflows({ status: "complete", createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),});
// Delete all errored and terminated workflowsthis.deleteWorkflows({ status: ["errored", "terminated"],});Terminate a running workflow immediately. Sets status to "terminated".
await this.terminateWorkflow(instanceId);await this.terminateWorkflow(instanceId);Pause a running workflow. The workflow can be resumed later with resumeWorkflow().
await this.pauseWorkflow(instanceId);await this.pauseWorkflow(instanceId);Resume a paused workflow.
await this.resumeWorkflow(instanceId);await this.resumeWorkflow(instanceId);Restart a workflow instance from the beginning with the same ID.
// Reset tracking (default) - clears timestamps and error fieldsawait this.restartWorkflow(instanceId);
// Preserve original timestampsawait this.restartWorkflow(instanceId, { resetTracking: false });// Reset tracking (default) - clears timestamps and error fieldsawait this.restartWorkflow(instanceId);
// Preserve original timestampsawait this.restartWorkflow(instanceId, { resetTracking: false });Approve a waiting workflow. Use with waitForApproval() in the workflow.
await this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId },});await this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId },});Reject a waiting workflow. Causes waitForApproval() to throw WorkflowRejectedError.
await this.rejectWorkflow(instanceId, { reason: "Request denied" });await this.rejectWorkflow(instanceId, { reason: "Request denied" });Migrate tracked workflows after renaming a workflow binding.
class MyAgent extends Agent { async onStart() { this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW"); }}class MyAgent extends Agent { async onStart() { this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW"); }}Override these methods in your Agent to handle workflow events:
| Callback | Parameters | Description |
|---|---|---|
| onWorkflowProgress | workflowName, instanceId, progress | Called when workflow reports progress |
| onWorkflowComplete | workflowName, instanceId, result? | Called when workflow completes |
| onWorkflowError | workflowName, instanceId, error | Called when workflow errors |
| onWorkflowEvent | workflowName, instanceId, event | Called when workflow sends an event |
| onWorkflowCallback | callback: WorkflowCallback | Called for all callback types |
class MyAgent extends Agent { async onWorkflowProgress(workflowName, instanceId, progress) { this.broadcast( JSON.stringify({ type: "progress", workflowName, instanceId, progress }), ); }
async onWorkflowComplete(workflowName, instanceId, result) { console.log(`${workflowName}/${instanceId} completed`); }
async onWorkflowError(workflowName, instanceId, error) { console.error(`${workflowName}/${instanceId} failed:`, error); }}class MyAgent extends Agent { async onWorkflowProgress( workflowName: string, instanceId: string, progress: unknown, ) { this.broadcast( JSON.stringify({ type: "progress", workflowName, instanceId, progress }), ); }
async onWorkflowComplete( workflowName: string, instanceId: string, result?: unknown, ) { console.log(`${workflowName}/${instanceId} completed`); }
async onWorkflowError( workflowName: string, instanceId: string, error: string, ) { console.error(`${workflowName}/${instanceId} failed:`, error); }}Workflows started with runWorkflow() are automatically tracked in the Agent's internal database. You can query, filter, and manage workflows using the methods described above (getWorkflow(), getWorkflows(), deleteWorkflow(), etc.).
| Status | Description |
|---|---|
| queued | Waiting to start |
| running | Currently executing |
| paused | Paused by user |
| waiting | Waiting for event |
| complete | Finished successfully |
| errored | Failed with error |
| terminated | Manually terminated |
Use the metadata option in runWorkflow() to store queryable information (like user IDs or task types) that you can filter on later with getWorkflows().
import { AgentWorkflow } from "agents/workflows";export class ApprovalWorkflow extends AgentWorkflow { async run(event, step) { const request = await step.do("prepare", async () => { return { ...event.payload, preparedAt: Date.now() }; });
await this.reportProgress({ step: "approval", status: "pending", message: "Awaiting approval", });
// Throws WorkflowRejectedError if rejected const approval = await this.waitForApproval(step, { timeout: "7 days", });
console.log("Approved by:", approval?.approvedBy);
const result = await step.do("execute", async () => { return executeRequest(request); });
await step.reportComplete(result); return result; }}
class MyAgent extends Agent { async handleApproval(instanceId, userId) { await this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId }, }); }
async handleRejection(instanceId, reason) { await this.rejectWorkflow(instanceId, { reason }); }}import { AgentWorkflow } from "agents/workflows";import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> { async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) { const request = await step.do("prepare", async () => { return { ...event.payload, preparedAt: Date.now() }; });
await this.reportProgress({ step: "approval", status: "pending", message: "Awaiting approval", });
// Throws WorkflowRejectedError if rejected const approval = await this.waitForApproval<{ approvedBy: string }>(step, { timeout: "7 days", });
console.log("Approved by:", approval?.approvedBy);
const result = await step.do("execute", async () => { return executeRequest(request); });
await step.reportComplete(result); return result; }}
class MyAgent extends Agent { async handleApproval(instanceId: string, userId: string) { await this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId }, }); }
async handleRejection(instanceId: string, reason: string) { await this.rejectWorkflow(instanceId, { reason }); }}import { AgentWorkflow } from "agents/workflows";export class ResilientWorkflow extends AgentWorkflow { async run(event, step) { const result = await step.do( "call-api", { retries: { limit: 5, delay: "10 seconds", backoff: "exponential" }, timeout: "5 minutes", }, async () => { const response = await fetch("https://api.example.com/process", { method: "POST", body: JSON.stringify(event.payload), }); if (!response.ok) throw new Error(`API error: ${response.status}`); return response.json(); }, );
await step.reportComplete(result); return result; }}import { AgentWorkflow } from "agents/workflows";import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class ResilientWorkflow extends AgentWorkflow<MyAgent, TaskParams> { async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) { const result = await step.do( "call-api", { retries: { limit: 5, delay: "10 seconds", backoff: "exponential" }, timeout: "5 minutes", }, async () => { const response = await fetch("https://api.example.com/process", { method: "POST", body: JSON.stringify(event.payload), }); if (!response.ok) throw new Error(`API error: ${response.status}`); return response.json(); }, );
await step.reportComplete(result); return result; }}Workflows can update Agent state durably via step, which automatically broadcasts to all connected clients:
import { AgentWorkflow } from "agents/workflows";export class StatefulWorkflow extends AgentWorkflow { async run(event, step) { // Replace entire state (durable, broadcasts to clients) await step.updateAgentState({ currentTask: { id: event.payload.taskId, status: "processing", startedAt: Date.now(), }, });
const result = await step.do("process", async () => processTask(event.payload), );
// Merge partial state (durable, keeps existing fields) await step.mergeAgentState({ currentTask: { status: "complete", result, completedAt: Date.now() }, });
await step.reportComplete(result); return result; }}import { AgentWorkflow } from "agents/workflows";import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class StatefulWorkflow extends AgentWorkflow<MyAgent, TaskParams> { async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) { // Replace entire state (durable, broadcasts to clients) await step.updateAgentState({ currentTask: { id: event.payload.taskId, status: "processing", startedAt: Date.now(), }, });
const result = await step.do("process", async () => processTask(event.payload), );
// Merge partial state (durable, keeps existing fields) await step.mergeAgentState({ currentTask: { status: "complete", result, completedAt: Date.now() }, });
await step.reportComplete(result); return result; }}Define custom progress types for domain-specific reporting:
import { AgentWorkflow } from "agents/workflows";// Custom progress type for data pipeline// Workflow with custom progress type (3rd type parameter)export class ETLWorkflow extends AgentWorkflow { async run(event, step) { await this.reportProgress({ stage: "extract", recordsProcessed: 0, totalRecords: 1000, currentTable: "users", });
// ... processing }}
// Agent receives typed progressclass MyAgent extends Agent { async onWorkflowProgress(workflowName, instanceId, progress) { const p = progress; console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`); }}import { AgentWorkflow } from "agents/workflows";import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
// Custom progress type for data pipelinetype PipelineProgress = { stage: "extract" | "transform" | "load"; recordsProcessed: number; totalRecords: number; currentTable?: string;};
// Workflow with custom progress type (3rd type parameter)export class ETLWorkflow extends AgentWorkflow< MyAgent, ETLParams, PipelineProgress> { async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) { await this.reportProgress({ stage: "extract", recordsProcessed: 0, totalRecords: 1000, currentTable: "users", });
// ... processing }}
// Agent receives typed progressclass MyAgent extends Agent { async onWorkflowProgress( workflowName: string, instanceId: string, progress: unknown, ) { const p = progress as PipelineProgress; console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`); }}The internal cf_agents_workflows table can grow unbounded, so implement a retention policy:
class MyAgent extends Agent { // Option 1: Delete on completion async onWorkflowComplete(workflowName, instanceId, result) { // Process result first, then delete this.deleteWorkflow(instanceId); }
// Option 2: Scheduled cleanup (keep recent history) async cleanupOldWorkflows() { this.deleteWorkflows({ status: ["complete", "errored"], createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), }); }
// Option 3: Keep all history for compliance/auditing // Don't call deleteWorkflows() - query historical data as needed}class MyAgent extends Agent { // Option 1: Delete on completion async onWorkflowComplete( workflowName: string, instanceId: string, result?: unknown, ) { // Process result first, then delete this.deleteWorkflow(instanceId); }
// Option 2: Scheduled cleanup (keep recent history) async cleanupOldWorkflows() { this.deleteWorkflows({ status: ["complete", "errored"], createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), }); }
// Option 3: Keep all history for compliance/auditing // Don't call deleteWorkflows() - query historical data as needed}// Direct RPC call (typed)await this.agent.updateTaskStatus(taskId, "processing");const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)await this.reportProgress({ step: "process", percent: 0.5 });this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)await step.reportComplete(result);await step.reportError("Something went wrong");await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)await step.updateAgentState({ status: "processing" });await step.mergeAgentState({ progress: 0.5 });// Direct RPC call (typed)await this.agent.updateTaskStatus(taskId, "processing");const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)await this.reportProgress({ step: "process", percent: 0.5 });this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)await step.reportComplete(result);await step.reportError("Something went wrong");await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)await step.updateAgentState({ status: "processing" });await step.mergeAgentState({ progress: 0.5 });// Send event to waiting workflowawait this.sendWorkflowEvent("MY_WORKFLOW", instanceId, { type: "custom-event", payload: { action: "proceed" },});
// Approve/reject workflows using convenience methodsawait this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId },});
await this.rejectWorkflow(instanceId, { reason: "Request denied" });// Send event to waiting workflowawait this.sendWorkflowEvent("MY_WORKFLOW", instanceId, { type: "custom-event", payload: { action: "proceed" },});
// Approve/reject workflows using convenience methodsawait this.approveWorkflow(instanceId, { reason: "Approved by admin", metadata: { approvedBy: userId },});
await this.rejectWorkflow(instanceId, { reason: "Request denied" });- Keep workflows focused — One workflow per logical task
- Use meaningful step names — Helps with debugging and observability
- Report progress regularly — Keeps users informed
- Handle errors gracefully — Use reportError() before throwing
- Clean up completed workflows — Implement a retention policy for the tracking table
- Handle workflow binding renames — Use migrateWorkflowBinding() when renaming workflow bindings in wrangler.jsonc
| Constraint | Limit |
|---|---|
| Maximum steps | 1,024 per workflow |
| State size | 10 MB per workflow |
| Event wait time | 1 year maximum |
| Step execution time | 30 minutes per step |
Workflows cannot open WebSocket connections directly. Use broadcastToClients() to communicate with connected clients through the Agent.