Skip to content
Cloudflare Docs

Run Workflows

Integrate Cloudflare Workflows with Agents for durable, multi-step background processing while Agents handle real-time communication.

Quick start

1. Define a Workflow

Extend AgentWorkflow for typed access to the originating Agent:

JavaScript
import { AgentWorkflow } from "agents/workflows";
export class ProcessingWorkflow extends AgentWorkflow {
async run(event, step) {
const params = event.payload;
const result = await step.do("process-data", async () => {
return processData(params.data);
});
// Non-durable: progress reporting (may repeat on retry)
await this.reportProgress({
step: "process",
status: "complete",
percent: 0.5,
});
// Broadcast to connected WebSocket clients
this.broadcastToClients({ type: "update", taskId: params.taskId });
await step.do("save-results", async () => {
// Call Agent methods via RPC
await this.agent.saveResult(params.taskId, result);
});
// Durable: idempotent, won't repeat on retry
await step.reportComplete(result);
return result;
}
}

2. Start a Workflow from an Agent

Use runWorkflow() to start and track workflows:

JavaScript
import { Agent } from "agents";
export class MyAgent extends Agent {
async startTask(taskId, data) {
const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
taskId,
data,
});
return { instanceId };
}
async onWorkflowProgress(workflowName, instanceId, progress) {
this.broadcast(JSON.stringify({ type: "workflow-progress", progress }));
}
async onWorkflowComplete(workflowName, instanceId, result) {
console.log(`Workflow completed:`, result);
}
async saveResult(taskId, result) {
this
.sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
}
}

3. Configure Wrangler

{
"name": "my-app",
"main": "src/index.ts",
"compatibility_date": "2025-02-11",
"durable_objects": {
"bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }],
},
"workflows": [
{
"name": "processing-workflow",
"binding": "PROCESSING_WORKFLOW",
"class_name": "ProcessingWorkflow",
},
],
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }],
}

AgentWorkflow class

Base class for Workflows that integrate with Agents.

Type parameters

ParameterDescription
AgentTypeThe Agent class type for typed RPC
ParamsParameters passed to the workflow
ProgressTypeType for progress reporting (defaults to DefaultProgress)
EnvEnvironment type (defaults to Cloudflare.Env)

Properties

PropertyTypeDescription
agentStubTyped stub for calling Agent methods
instanceIdstringThe workflow instance ID
workflowNamestringThe workflow binding name
envEnvEnvironment bindings

Instance methods (non-durable)

These methods may repeat on retry. Use for lightweight, frequent updates.

reportProgress(progress)

Report progress to the Agent. Triggers onWorkflowProgress callback.

JavaScript
await this.reportProgress({
step: "processing",
status: "running",
percent: 0.5,
});

broadcastToClients(message)

Broadcast a message to all WebSocket clients connected to the Agent.

JavaScript
this.broadcastToClients({ type: "update", data: result });

waitForApproval(step, options?)

Wait for an approval event. Throws WorkflowRejectedError if rejected.

JavaScript
const approval = await this.waitForApproval(step, {
timeout: "7 days",
});

Step methods (durable)

These methods are idempotent and will not repeat on retry. Use for state changes that must persist.

MethodDescription
step.reportComplete(result?)Report successful completion
step.reportError(error)Report an error
step.sendEvent(event)Send a custom event to the Agent
step.updateAgentState(state)Replace Agent state (broadcasts to clients)
step.mergeAgentState(partial)Merge into Agent state (broadcasts to clients)
step.resetAgentState()Reset Agent state to initialState

DefaultProgress type

TypeScript
type DefaultProgress = {
step?: string;
status?: "pending" | "running" | "complete" | "error";
message?: string;
percent?: number;
[key: string]: unknown;
};

Agent workflow methods

Methods available on the Agent class for Workflow management.

runWorkflow(workflowName, params, options?)

Start a workflow instance and track it in the Agent database.

Parameters:

ParameterTypeDescription
workflowNamestringWorkflow binding name from env
paramsobjectParameters to pass to the workflow
options.idstringCustom workflow ID (auto-generated if not provided)
options.metadataobjectMetadata stored for querying (not passed to workflow)
options.agentBindingstringAgent binding name (auto-detected if not provided)

Returns: Promise<string> - Workflow instance ID

JavaScript
const instanceId = await this.runWorkflow(
"MY_WORKFLOW",
{ taskId: "123" },
{
metadata: { userId: "user-456", priority: "high" },
},
);

sendWorkflowEvent(workflowName, instanceId, event)

Send an event to a running workflow.

JavaScript
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});

getWorkflowStatus(workflowName, instanceId)

Get the status of a workflow and update the tracking record.

JavaScript
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// { status: 'running', output: null, error: null }

getWorkflow(instanceId)

Get a tracked workflow by ID.

JavaScript
const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }

getWorkflows(criteria?)

Query tracked workflows with cursor-based pagination. Returns a WorkflowPage with workflows, total count, and cursor for the next page.

JavaScript
// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });
// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
metadata: { userId: "user-456" },
});
// Pagination with cursor
const page1 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursor
if (page1.nextCursor) {
const page2 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
cursor: page1.nextCursor,
});
}

The WorkflowPage type:

TypeScript
type WorkflowPage = {
workflows: WorkflowInfo[];
total: number; // Total matching workflows
nextCursor: string | null; // null when no more pages
};

deleteWorkflow(instanceId)

Delete a single workflow instance tracking record. Returns true if deleted, false if not found.

deleteWorkflows(criteria?)

Delete workflow instance tracking records matching criteria.

JavaScript
// Delete completed workflow instances older than 7 days
this.deleteWorkflows({
status: "complete",
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
// Delete all errored and terminated workflows
this.deleteWorkflows({
status: ["errored", "terminated"],
});

terminateWorkflow(instanceId)

Terminate a running workflow immediately. Sets status to "terminated".

JavaScript
await this.terminateWorkflow(instanceId);

pauseWorkflow(instanceId)

Pause a running workflow. The workflow can be resumed later with resumeWorkflow().

JavaScript
await this.pauseWorkflow(instanceId);

resumeWorkflow(instanceId)

Resume a paused workflow.

JavaScript
await this.resumeWorkflow(instanceId);

restartWorkflow(instanceId, options?)

Restart a workflow instance from the beginning with the same ID.

JavaScript
// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);
// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });

approveWorkflow(instanceId, options?)

Approve a waiting workflow. Use with waitForApproval() in the workflow.

JavaScript
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});

rejectWorkflow(instanceId, options?)

Reject a waiting workflow. Causes waitForApproval() to throw WorkflowRejectedError.

JavaScript
await this.rejectWorkflow(instanceId, { reason: "Request denied" });

migrateWorkflowBinding(oldName, newName)

Migrate tracked workflows after renaming a workflow binding.

JavaScript
class MyAgent extends Agent {
async onStart() {
this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW");
}
}

Lifecycle callbacks

Override these methods in your Agent to handle workflow events:

CallbackParametersDescription
onWorkflowProgressworkflowName, instanceId, progressCalled when workflow reports progress
onWorkflowCompleteworkflowName, instanceId, result?Called when workflow completes
onWorkflowErrorworkflowName, instanceId, errorCalled when workflow errors
onWorkflowEventworkflowName, instanceId, eventCalled when workflow sends an event
onWorkflowCallbackcallback: WorkflowCallbackCalled for all callback types
JavaScript
class MyAgent extends Agent {
async onWorkflowProgress(workflowName, instanceId, progress) {
this.broadcast(
JSON.stringify({ type: "progress", workflowName, instanceId, progress }),
);
}
async onWorkflowComplete(workflowName, instanceId, result) {
console.log(`${workflowName}/${instanceId} completed`);
}
async onWorkflowError(workflowName, instanceId, error) {
console.error(`${workflowName}/${instanceId} failed:`, error);
}
}

Workflow tracking

Workflows started with runWorkflow() are automatically tracked in the Agent's internal database. You can query, filter, and manage workflows using the methods described above (getWorkflow(), getWorkflows(), deleteWorkflow(), etc.).

Status values

StatusDescription
queuedWaiting to start
runningCurrently executing
pausedPaused by user
waitingWaiting for event
completeFinished successfully
erroredFailed with error
terminatedManually terminated

Use the metadata option in runWorkflow() to store queryable information (like user IDs or task types) that you can filter on later with getWorkflows().

Examples

Human-in-the-loop approval

JavaScript
import { AgentWorkflow } from "agents/workflows";
export class ApprovalWorkflow extends AgentWorkflow {
async run(event, step) {
const request = await step.do("prepare", async () => {
return { ...event.payload, preparedAt: Date.now() };
});
await this.reportProgress({
step: "approval",
status: "pending",
message: "Awaiting approval",
});
// Throws WorkflowRejectedError if rejected
const approval = await this.waitForApproval(step, {
timeout: "7 days",
});
console.log("Approved by:", approval?.approvedBy);
const result = await step.do("execute", async () => {
return executeRequest(request);
});
await step.reportComplete(result);
return result;
}
}
class MyAgent extends Agent {
async handleApproval(instanceId, userId) {
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
}
async handleRejection(instanceId, reason) {
await this.rejectWorkflow(instanceId, { reason });
}
}

Retry with backoff

JavaScript
import { AgentWorkflow } from "agents/workflows";
export class ResilientWorkflow extends AgentWorkflow {
async run(event, step) {
const result = await step.do(
"call-api",
{
retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },
timeout: "5 minutes",
},
async () => {
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(event.payload),
});
if (!response.ok) throw new Error(`API error: ${response.status}`);
return response.json();
},
);
await step.reportComplete(result);
return result;
}
}

State synchronization

Workflows can update Agent state durably via step, which automatically broadcasts to all connected clients:

JavaScript
import { AgentWorkflow } from "agents/workflows";
export class StatefulWorkflow extends AgentWorkflow {
async run(event, step) {
// Replace entire state (durable, broadcasts to clients)
await step.updateAgentState({
currentTask: {
id: event.payload.taskId,
status: "processing",
startedAt: Date.now(),
},
});
const result = await step.do("process", async () =>
processTask(event.payload),
);
// Merge partial state (durable, keeps existing fields)
await step.mergeAgentState({
currentTask: { status: "complete", result, completedAt: Date.now() },
});
await step.reportComplete(result);
return result;
}
}

Custom progress types

Define custom progress types for domain-specific reporting:

JavaScript
import { AgentWorkflow } from "agents/workflows";
// Custom progress type for data pipeline
// Workflow with custom progress type (3rd type parameter)
export class ETLWorkflow extends AgentWorkflow {
async run(event, step) {
await this.reportProgress({
stage: "extract",
recordsProcessed: 0,
totalRecords: 1000,
currentTable: "users",
});
// ... processing
}
}
// Agent receives typed progress
class MyAgent extends Agent {
async onWorkflowProgress(workflowName, instanceId, progress) {
const p = progress;
console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
}
}

Cleanup strategy

The internal cf_agents_workflows table can grow unbounded, so implement a retention policy:

JavaScript
class MyAgent extends Agent {
// Option 1: Delete on completion
async onWorkflowComplete(workflowName, instanceId, result) {
// Process result first, then delete
this.deleteWorkflow(instanceId);
}
// Option 2: Scheduled cleanup (keep recent history)
async cleanupOldWorkflows() {
this.deleteWorkflows({
status: ["complete", "errored"],
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
}
// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed
}

Bidirectional communication

Workflow to Agent

JavaScript
// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({ step: "process", percent: 0.5 });
this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });

Agent to Workflow

JavaScript
// Send event to waiting workflow
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});
// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
await this.rejectWorkflow(instanceId, { reason: "Request denied" });

Best practices

  1. Keep workflows focused — One workflow per logical task
  2. Use meaningful step names — Helps with debugging and observability
  3. Report progress regularly — Keeps users informed
  4. Handle errors gracefully — Use reportError() before throwing
  5. Clean up completed workflows — Implement a retention policy for the tracking table
  6. Handle workflow binding renames — Use migrateWorkflowBinding() when renaming workflow bindings in wrangler.jsonc

Limitations

ConstraintLimit
Maximum steps1,024 per workflow
State size10 MB per workflow
Event wait time1 year maximum
Step execution time30 minutes per step

Workflows cannot open WebSocket connections directly. Use broadcastToClients() to communicate with connected clients through the Agent.