Adding Workflows
How to add Temporal workflows and activities with determinism constraints.
Temporal workflows provide durable, fault-tolerant execution for long-running operations. This guide covers how to add new workflows, activities, and schedules to the worker.
Workflow vs Activity
| Concern | Workflows | Activities |
|---|---|---|
| Purpose | Orchestration logic | Side effects |
| Determinism | Required | Not required |
| I/O | Forbidden | Allowed |
| Replay safe | Yes | N/A |
| Location | apps/worker/src/workflows.ts | apps/worker/src/activities.ts |
Adding an Activity
Activities are normal async functions registered with the worker. They handle all side effects: database queries, API calls, file I/O. All activities are exported as methods on a single activities object:
// apps/worker/src/activities.ts
import { createLogger } from '@tx-agent-kit/logging'
import { domainEventsRepository } from '@tx-agent-kit/db'
import { Effect } from 'effect'
// NOTE: `runEffect` (executes an Effect, returning a Promise) and `toJsonRecord`
// (coerces an event payload into a plain JSON record) are local helpers assumed
// to be defined elsewhere in this file.
const logger = createLogger('tx-agent-kit-worker-activities')
export const activities = {
fetchUnprocessedEvents: async (batchSize: number): Promise<ReadonlyArray<SerializedDomainEvent>> => {
const events = await runEffect(
domainEventsRepository.fetchUnprocessed(batchSize)
)
// Serialize dates and payload for Temporal's JSON transport
return events.map((event) => ({
id: event.id,
eventType: event.eventType,
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
payload: toJsonRecord(event.payload),
correlationId: event.correlationId,
sequenceNumber: event.sequenceNumber,
status: event.status,
occurredAt: event.occurredAt.toISOString(),
processingAt: event.processingAt ? event.processingAt.toISOString() : null,
publishedAt: event.publishedAt ? event.publishedAt.toISOString() : null,
failedAt: event.failedAt ? event.failedAt.toISOString() : null,
failureReason: event.failureReason,
createdAt: event.createdAt.toISOString()
}))
},
markEventsPublished: async (eventIds: ReadonlyArray<string>): Promise<void> => {
await runEffect(domainEventsRepository.markPublished(eventIds))
},
markEventFailed: async (eventId: string, reason: string): Promise<void> => {
await runEffect(domainEventsRepository.markFailed(eventId, reason))
},
sendOrganizationWelcomeEmail: async (payload: {
organizationName: string
ownerUserId: string
ownerEmail: string
}): Promise<void> => {
// Side-effect: send email via external API
}
}

Activities should be idempotent when possible. If an activity is retried due to a worker crash, it should produce the same result or detect the duplicate.
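As an illustration, here is a minimal in-memory sketch of a duplicate-detecting activity. The `store` map and `markEventPublishedIdempotent` name are hypothetical stand-ins for the real repository:

```typescript
// Hypothetical in-memory store standing in for the database table.
type EventStatus = 'pending' | 'processing' | 'published'
const store = new Map<string, EventStatus>([['evt-1', 'processing']])

// Idempotent activity: a retry after a worker crash observes the terminal
// state and becomes a no-op instead of applying the effect twice.
async function markEventPublishedIdempotent(eventId: string): Promise<void> {
  if (store.get(eventId) === 'published') {
    return // duplicate retry detected -- treat as success
  }
  store.set(eventId, 'published')
}

void markEventPublishedIdempotent('evt-1')
void markEventPublishedIdempotent('evt-1') // retried call is a harmless no-op
console.log(store.get('evt-1')) // published
```

The real guard lives in SQL (`markPublished` only touches `processing` rows), but the shape is the same: check for the terminal state before applying the side effect.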
The SerializedDomainEvent interface defines the wire format that activities return and workflows consume:
// apps/worker/src/activities.ts
export interface SerializedDomainEvent {
id: string
eventType: string
aggregateType: string
aggregateId: string
payload: Record<string, unknown>
correlationId: string | null
sequenceNumber: number
status: string
occurredAt: string
processingAt: string | null
publishedAt: string | null
failedAt: string | null
failureReason: string | null
createdAt: string
}

Adding a Workflow
Workflows orchestrate activities and must be fully deterministic. Use proxyActivities to create typed activity stubs with a shared retry policy:
// apps/worker/src/workflows.ts
import { WorkflowExecutionAlreadyStartedError } from '@temporalio/common'
import {
ParentClosePolicy,
startChild,
proxyActivities
} from '@temporalio/workflow'
import type { activities, SerializedDomainEvent } from './activities.js'
const {
fetchUnprocessedEvents,
markEventsPublished,
markEventFailed,
sendOrganizationWelcomeEmail
} = proxyActivities<typeof activities>({
startToCloseTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
initialInterval: '1 second'
}
})

All activity stubs share the same `startToCloseTimeout` and retry config. Individual activities can override these by creating a separate `proxyActivities` call with different options.
The Outbox Poller Pattern
The primary workflow pattern in the codebase is the outbox poller. A scheduled workflow fetches unprocessed domain events, validates each event's payload, dispatches a child workflow per event type, and marks dispatched events as published:
// apps/worker/src/workflows.ts
export async function outboxPollerWorkflow(batchSize: number): Promise<void> {
const events = await fetchUnprocessedEvents(batchSize)
if (events.length === 0) {
return
}
const dispatched: string[] = []
for (const event of events) {
try {
switch (event.eventType) {
case 'organization.created': {
// Validate payload before dispatching
const hasValidPayload =
typeof event.payload.organizationName === 'string'
&& typeof event.payload.ownerUserId === 'string'
&& typeof event.payload.ownerEmail === 'string'
if (!hasValidPayload) {
await markEventFailed(event.id, `Invalid organization.created payload for event ${event.id}`)
break
}
await startChild(organizationCreatedWorkflow, {
workflowId: `organization-created-${event.id}`,
args: [event],
parentClosePolicy: ParentClosePolicy.ABANDON,
workflowIdReusePolicy: 'REJECT_DUPLICATE',
workflowRunTimeout: '5 minutes'
})
dispatched.push(event.id)
break
}
default:
await markEventFailed(event.id, `Unknown event type '${event.eventType}'`)
break
}
} catch (err: unknown) {
if (err instanceof WorkflowExecutionAlreadyStartedError) {
// Already dispatched — treat as success
dispatched.push(event.id)
} else {
const message = err instanceof Error ? err.message : String(err)
await markEventFailed(event.id, `Failed to dispatch: ${message}`)
}
}
}
if (dispatched.length > 0) {
await markEventsPublished(dispatched)
}
}

Key design decisions in the outbox poller:

- `startChild` dispatches each event as an independent child workflow. The `ParentClosePolicy.ABANDON` option keeps the child running even if the poller completes.
- `workflowIdReusePolicy: 'REJECT_DUPLICATE'` provides idempotency. If the same event ID has already been dispatched, Temporal throws `WorkflowExecutionAlreadyStartedError`, which the poller catches and treats as success.
- Payload validation happens before dispatch. Invalid payloads are marked as failed immediately, preventing child workflow failures.
- Unknown event types are marked as failed rather than silently dropped.
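The duplicate-handling behavior can be modelled in a few lines of plain TypeScript. The `startChildOnce` function and started-set below are a toy stand-in for Temporal's workflow ID registry, not its API:

```typescript
// Toy stand-in for Temporal's workflow ID registry under REJECT_DUPLICATE:
// a second start with the same workflow ID throws.
class AlreadyStartedError extends Error {}
const started = new Set<string>()

function startChildOnce(workflowId: string): void {
  if (started.has(workflowId)) throw new AlreadyStartedError(workflowId)
  started.add(workflowId)
}

const dispatched: string[] = []
for (const eventId of ['e1', 'e2', 'e1']) { // 'e1' is delivered twice
  try {
    startChildOnce(`organization-created-${eventId}`)
    dispatched.push(eventId)
  } catch (err) {
    if (!(err instanceof AlreadyStartedError)) throw err
    dispatched.push(eventId) // already dispatched -- treat as success
  }
}
console.log(dispatched.length) // 3
```

Both deliveries of `e1` end up in `dispatched`, which is safe because `markEventsPublished` is itself idempotent.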
Domain Event Workflows
Event-driven child workflows receive a SerializedDomainEvent payload. The workflow should validate the payload fields it needs before processing:
// apps/worker/src/workflows.ts
export async function organizationCreatedWorkflow(
event: SerializedDomainEvent
): Promise<void> {
const organizationName = typeof event.payload.organizationName === 'string'
? event.payload.organizationName
: undefined
const ownerUserId = typeof event.payload.ownerUserId === 'string'
? event.payload.ownerUserId
: undefined
const ownerEmail = typeof event.payload.ownerEmail === 'string'
? event.payload.ownerEmail
: undefined
if (!organizationName || !ownerUserId) {
throw new Error(
`Invalid organization.created payload for event ${event.id}: missing organizationName or ownerUserId`
)
}
await sendOrganizationWelcomeEmail({
organizationName,
ownerUserId,
ownerEmail: ownerEmail ?? ''
})
}

When adding a new domain event type:

- Register the event type string in `packages/contracts/src/literals.ts` (`domainEventTypes`).
- Add a typed `*EventPayload` interface in the domain's `domain/*-events.ts`.
- Add a matching `*EventPayloadSchema` in `packages/temporal-client/src/types/domain-event.ts`.
- Add a `case` branch in `outboxPollerWorkflow` to dispatch the new child workflow.
- Implement the child workflow function that validates and processes the event.
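The validation step can often be written as a plain type guard. A sketch using a hypothetical `invoice.created` event type (the payload shape and function name are illustrative, not part of the codebase):

```typescript
// Hypothetical payload shape for an invoice.created domain event.
type InvoiceCreatedPayload = {
  invoiceId: string
  amountCents: number
}

// Narrow the untyped outbox payload before dispatching the child workflow.
function isInvoiceCreatedPayload(
  payload: Record<string, unknown>
): payload is InvoiceCreatedPayload {
  return typeof payload.invoiceId === 'string'
    && typeof payload.amountCents === 'number'
}

console.log(isInvoiceCreatedPayload({ invoiceId: 'inv-1', amountCents: 4200 })) // true
console.log(isInvoiceCreatedPayload({ invoiceId: 'inv-1' })) // false
```

A named guard keeps the `case` branch in `outboxPollerWorkflow` short and gives the child workflow a typed payload to work with.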
Maintenance Workflows
The worker also includes maintenance workflows for outbox hygiene:
export async function resetStuckEventsWorkflow(
stuckThresholdMinutes: number
): Promise<ReadonlyArray<string>> {
return resetStuckProcessingEvents(stuckThresholdMinutes)
}
export async function prunePublishedEventsWorkflow(
retentionDays: number
): Promise<number> {
return prunePublishedEvents(retentionDays)
}

These are thin wrappers around activities that run on a schedule to keep the outbox table healthy. (The `resetStuckProcessingEvents` and `prunePublishedEvents` stubs are created with `proxyActivities`, the same as the stubs shown earlier.)
Determinism Rules
The following are strictly forbidden inside workflow code:
// FORBIDDEN in workflows:
Date.now() // Use workflow.now()
new Date() // Use workflow.now()
Math.random() // Use deterministic alternatives
setTimeout(fn, ms) // Use workflow.sleep(ms)
setInterval(fn, ms) // Not applicable
clearTimeout(id) // Not applicable
clearInterval(id) // Not applicable
import { db } from '...' // No infrastructure imports
import { fetch } from '...' // No I/O imports

These constraints exist because Temporal replays workflow code from event history. Any non-deterministic operation produces different results on replay, causing the workflow to fail.
Safe Alternatives
| Forbidden | Use Instead |
|---|---|
| `Date.now()` | Temporal's `workflow.now()` |
| `Math.random()` | Pass random values as activity results |
| `setTimeout` | `workflow.sleep('5s')` |
| Side effects | Move to activities |
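Why passing random values through activity results works can be seen in a toy replay model. This illustrates the recording principle only; it is not the Temporal SDK's actual mechanism:

```typescript
// Toy replay model: activity results are recorded into history on first
// execution and read back on replay, so the workflow sees identical inputs.
const history: number[] = []

function generateRandomActivity(): number {
  return Math.random() // the side effect lives in the activity
}

function runWorkflow(): number {
  // First run records the activity result; replay reuses the recorded value.
  if (history.length === 0) history.push(generateRandomActivity())
  return history[0]! * 2 // purely deterministic transformation of the input
}

const first = runWorkflow()
const replayed = runWorkflow()
console.log(first === replayed) // true
```

If the workflow called `Math.random()` directly instead, replay would compute a different value and diverge from the recorded history.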
Registering with the Worker
Activities are passed to the worker at creation time:
const worker = await Worker.create({
connection,
namespace: env.TEMPORAL_NAMESPACE,
taskQueue: env.TEMPORAL_TASK_QUEUE,
workflowsPath: workflowSourcePath,
activities,
shutdownGraceTime: '30s'
})

New workflows are automatically discovered from the `workflowsPath` file. Any exported async function in `workflows.ts` becomes a registered workflow type.
Adding a Schedule
Temporal schedules run workflows on a recurring interval. The worker registers all schedules at startup via ensureSchedule functions defined in apps/worker/src/schedules.ts.
Schedule Pattern
Each schedule follows a describe/update-or-create pattern with ScheduleOverlapPolicy.SKIP to prevent overlapping executions:
// apps/worker/src/schedules.ts
import type { Client } from '@temporalio/client'
import { isGrpcServiceError, ScheduleOverlapPolicy } from '@temporalio/client'
import { createLogger } from '@tx-agent-kit/logging'
const logger = createLogger('tx-agent-kit-worker-schedules')
const GRPC_STATUS_NOT_FOUND = 5
const GRPC_STATUS_ALREADY_EXISTS = 6
export async function ensureOutboxPollerSchedule(
client: Client,
taskQueue: string,
intervalSeconds: number,
batchSize: number
): Promise<void> {
const handle = client.schedule.getHandle('outbox-poller-schedule')
try {
// Try to describe the existing schedule, then update it
await handle.describe()
await handle.update((prev) => ({
...prev,
spec: {
intervals: [{ every: `${intervalSeconds}s` }]
},
action: {
type: 'startWorkflow' as const,
workflowType: 'outboxPollerWorkflow',
taskQueue,
args: [batchSize]
},
policies: {
...prev.policies,
overlap: ScheduleOverlapPolicy.SKIP
}
}))
logger.info('Updated outbox poller schedule.', { intervalSeconds, batchSize })
} catch (error: unknown) {
const isNotFound =
isGrpcServiceError(error) && Number(error.code) === GRPC_STATUS_NOT_FOUND
if (!isNotFound) {
throw error
}
// Schedule does not exist yet — create it
try {
await client.schedule.create({
scheduleId: 'outbox-poller-schedule',
spec: {
intervals: [{ every: `${intervalSeconds}s` }]
},
action: {
type: 'startWorkflow',
workflowType: 'outboxPollerWorkflow',
taskQueue,
args: [batchSize]
},
policies: {
overlap: ScheduleOverlapPolicy.SKIP
}
})
logger.info('Created outbox poller schedule.', { intervalSeconds, batchSize })
} catch (createError: unknown) {
// Another worker instance may have created it concurrently
const isAlreadyExists =
isGrpcServiceError(createError) && Number(createError.code) === GRPC_STATUS_ALREADY_EXISTS
if (!isAlreadyExists) {
throw createError
}
logger.info('Outbox poller schedule already created by another worker instance.')
}
}
}

The describe/update-or-create flow handles three scenarios:
- Schedule exists -- update its spec and action to match the current config.
- Schedule does not exist -- create it from scratch.
- Race condition -- another worker instance created it between our check and create. The `ALREADY_EXISTS` gRPC error is caught and treated as success.
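The three scenarios can be exercised against a toy in-memory registry. The `ensureSchedule` and `registry` names here are hypothetical; the real code goes through the Temporal client as shown above:

```typescript
// Toy in-memory registry modelling the describe/update-or-create flow.
class NotFoundError extends Error {}
class AlreadyExistsError extends Error {}
const registry = new Map<string, { everySeconds: number }>()

function describeSchedule(id: string): void {
  if (!registry.has(id)) throw new NotFoundError(id)
}

function createSchedule(id: string, everySeconds: number): void {
  if (registry.has(id)) throw new AlreadyExistsError(id)
  registry.set(id, { everySeconds })
}

function ensureSchedule(id: string, everySeconds: number): 'updated' | 'created' {
  try {
    describeSchedule(id) // scenario 1: exists -> update in place
    registry.set(id, { everySeconds })
    return 'updated'
  } catch (err) {
    if (!(err instanceof NotFoundError)) throw err
    try {
      createSchedule(id, everySeconds) // scenario 2: missing -> create
      return 'created'
    } catch (createErr) {
      if (!(createErr instanceof AlreadyExistsError)) throw createErr
      return 'created' // scenario 3: lost the race -- treat as success
    }
  }
}

console.log(ensureSchedule('outbox-poller-schedule', 5)) // created
console.log(ensureSchedule('outbox-poller-schedule', 10)) // updated
```

The structure mirrors `ensureOutboxPollerSchedule`: the outer catch distinguishes "not found" from real failures, and the inner catch absorbs only the benign concurrent-create error.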
Registering Schedules at Startup
Schedules are registered in the worker's run function after the worker is created but before worker.run() blocks:
// apps/worker/src/index.ts
await ensureOutboxPollerSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 5, env.OUTBOX_POLL_BATCH_SIZE)
await ensureStuckEventsResetSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 120, env.OUTBOX_STUCK_THRESHOLD_MINUTES)
await ensurePrunePublishedSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 24, env.OUTBOX_PRUNE_RETENTION_DAYS)
await ensureDataRetentionSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 24)

Current Schedules
| Schedule | Interval | Workflow | Purpose |
|---|---|---|---|
| `outbox-poller-schedule` | 5s | `outboxPollerWorkflow` | Fetch and dispatch pending domain events |
| `stuck-events-reset-schedule` | 120s | `resetStuckEventsWorkflow` | Reset events stuck in processing state |
| `prune-published-events-schedule` | 24h | `prunePublishedEventsWorkflow` | Delete old published/failed events |
| `data-retention-schedule` | 24h | `dataRetentionWorkflow` | General data retention housekeeping |
Adding a New Schedule
- Define the `ensureYourSchedule` function in `apps/worker/src/schedules.ts`, following the describe/update-or-create pattern.
- Use `ScheduleOverlapPolicy.SKIP` so that a slow execution does not pile up.
- Call your `ensureYourSchedule` function from `apps/worker/src/index.ts` during startup.
- Add any new env vars for interval/threshold to `apps/worker/src/config/env.ts`.
Integration Testing
Outbox Lifecycle Testing
The domain events outbox integration suite validates the full event lifecycle. Tests use createDbAuthContext to wire up a real database context and exercise the outbox state machine:
// packages/testkit/src/domain-events-outbox.integration.test.ts
it('supports full event lifecycle: pending -> processing -> published', async () => {
const user = await dbAuthContext.createUser({
email: 'lifecycle@example.com',
password: 'strong-pass-12345',
name: 'Lifecycle Test User'
})
const org = await dbAuthContext.createTeam({
token: user.token,
name: 'Lifecycle Org'
})
// Verify event was created with pending status
const events = await dbAuthContext.testContext.withSchemaClient(async (client) =>
queryDomainEventsByAggregate(client, 'organization', org.id)
)
expect(events).toHaveLength(1)
expect(events[0]!.status).toBe('pending')
// Claim events for processing (simulates outbox poller fetch)
await dbAuthContext.testContext.withSchemaClient(async (client) =>
claimPendingEventsForProcessing(client, 1)
)
const afterClaim = await dbAuthContext.testContext.withSchemaClient(async (client) =>
queryDomainEventById(client, events[0]!.id)
)
expect(afterClaim!.status).toBe('processing')
// Mark as published (simulates successful child workflow dispatch)
await dbAuthContext.testContext.withSchemaClient(async (client) =>
markProcessingEventsPublished(client, [events[0]!.id])
)
const afterPublish = await dbAuthContext.testContext.withSchemaClient(async (client) =>
queryDomainEventById(client, events[0]!.id)
)
expect(afterPublish!.status).toBe('published')
})

Key Test Scenarios
The outbox integration suite covers:
- Atomicity -- exactly one event per aggregate creation.
- Lifecycle transitions -- `pending` to `processing` to `published` (and `failed`).
- State guards -- `markPublished` only affects `processing` events; calling it on `pending` is a no-op.
- Idempotency -- a second `markPublished` on an already-published event is a no-op.
- Concurrency -- two concurrent pollers cannot claim the same event (`FOR UPDATE SKIP LOCKED`).
- Stuck event reset -- `processing` events older than a threshold are reset to `pending`.
- Pruning -- old published/failed events are deleted while pending and recent events are preserved.
- Sequence numbers -- auto-increment per aggregate and independent across aggregates.
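The concurrency guarantee can be modelled with a toy locked-row scan. This illustrates `FOR UPDATE SKIP LOCKED` semantics only; the real tests run actual SQL against Postgres:

```typescript
// Toy model of FOR UPDATE SKIP LOCKED: a claimer takes only rows that are
// not already locked by another transaction, so no event is claimed twice.
type Row = { id: string; locked: boolean }
const rows: Row[] = [
  { id: 'e1', locked: false },
  { id: 'e2', locked: false }
]

function claimPending(limit: number): string[] {
  const claimed: string[] = []
  for (const row of rows) {
    if (claimed.length >= limit) break
    if (row.locked) continue // SKIP LOCKED: pass over rows held elsewhere
    row.locked = true
    claimed.push(row.id)
  }
  return claimed
}

const pollerA = claimPending(2) // claims e1 and e2
const pollerB = claimPending(2) // while A holds its locks: claims nothing
console.log(pollerA.length, pollerB.length) // 2 0
```

Without SKIP LOCKED, the second poller would block on the first poller's locks instead of moving on, serializing the pollers and defeating the batch claim.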
Run the outbox integration suite with:
pnpm test:integration -- --filter testkit