tx-agent-kit
Guides

Adding Workflows

How to add Temporal workflows and activities with determinism constraints.

Temporal workflows provide durable, fault-tolerant execution for long-running operations. This guide covers how to add new workflows, activities, and schedules to the worker.

Workflow vs Activity

ConcernWorkflowsActivities
PurposeOrchestration logicSide effects
DeterminismRequiredNot required
I/OForbiddenAllowed
Replay safeYesN/A
Locationapps/worker/src/workflows.tsapps/worker/src/activities.ts

Adding an Activity

Activities are normal async functions registered with the worker. They handle all side effects: database queries, API calls, file I/O. All activities are exported as methods on a single activities object:

// apps/worker/src/activities.ts
import { createLogger } from '@tx-agent-kit/logging'
import { domainEventsRepository } from '@tx-agent-kit/db'
import { Effect } from 'effect'

const logger = createLogger('tx-agent-kit-worker-activities')

export const activities = {
  fetchUnprocessedEvents: async (batchSize: number): Promise<ReadonlyArray<SerializedDomainEvent>> => {
    const events = await runEffect(
      domainEventsRepository.fetchUnprocessed(batchSize)
    )
    // Serialize dates and payload for Temporal's JSON transport
    return events.map((event) => ({
      id: event.id,
      eventType: event.eventType,
      aggregateType: event.aggregateType,
      aggregateId: event.aggregateId,
      payload: toJsonRecord(event.payload),
      correlationId: event.correlationId,
      sequenceNumber: event.sequenceNumber,
      status: event.status,
      occurredAt: event.occurredAt.toISOString(),
      processingAt: event.processingAt ? event.processingAt.toISOString() : null,
      publishedAt: event.publishedAt ? event.publishedAt.toISOString() : null,
      failedAt: event.failedAt ? event.failedAt.toISOString() : null,
      failureReason: event.failureReason,
      createdAt: event.createdAt.toISOString()
    }))
  },

  markEventsPublished: async (eventIds: ReadonlyArray<string>): Promise<void> => {
    await runEffect(domainEventsRepository.markPublished(eventIds))
  },

  markEventFailed: async (eventId: string, reason: string): Promise<void> => {
    await runEffect(domainEventsRepository.markFailed(eventId, reason))
  },

  sendOrganizationWelcomeEmail: async (payload: {
    organizationName: string
    ownerUserId: string
    ownerEmail: string
  }): Promise<void> => {
    // Side-effect: send email via external API
  }
}

Activities should be idempotent when possible. If an activity is retried due to a worker crash, it should produce the same result or detect the duplicate.

The SerializedDomainEvent interface defines the wire format that activities return and workflows consume:

// apps/worker/src/activities.ts
export interface SerializedDomainEvent {
  id: string
  eventType: string
  aggregateType: string
  aggregateId: string
  payload: Record<string, unknown>
  correlationId: string | null
  sequenceNumber: number
  status: string
  occurredAt: string
  processingAt: string | null
  publishedAt: string | null
  failedAt: string | null
  failureReason: string | null
  createdAt: string
}

Adding a Workflow

Workflows orchestrate activities and must be fully deterministic. Use proxyActivities to create typed activity stubs with a shared retry policy:

// apps/worker/src/workflows.ts
import { WorkflowExecutionAlreadyStartedError } from '@temporalio/common'
import {
  ParentClosePolicy,
  startChild,
  proxyActivities
} from '@temporalio/workflow'
import type { activities, SerializedDomainEvent } from './activities.js'

const {
  fetchUnprocessedEvents,
  markEventsPublished,
  markEventFailed,
  sendOrganizationWelcomeEmail
} = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 seconds',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1 second'
  }
})

All activity stubs share the same startToCloseTimeout and retry config. Individual activities can override these by creating a separate proxyActivities call with different options.

The Outbox Poller Pattern

The primary workflow pattern in the codebase is the outbox poller. A scheduled workflow fetches unprocessed domain events, validates each event's payload, dispatches a child workflow per event type, and marks dispatched events as published:

// apps/worker/src/workflows.ts
export async function outboxPollerWorkflow(batchSize: number): Promise<void> {
  const events = await fetchUnprocessedEvents(batchSize)

  if (events.length === 0) {
    return
  }

  const dispatched: string[] = []

  for (const event of events) {
    try {
      switch (event.eventType) {
        case 'organization.created': {
          // Validate payload before dispatching
          const hasValidPayload =
            typeof event.payload.organizationName === 'string'
            && typeof event.payload.ownerUserId === 'string'
            && typeof event.payload.ownerEmail === 'string'

          if (!hasValidPayload) {
            await markEventFailed(event.id, `Invalid organization.created payload for event ${event.id}`)
            break
          }

          await startChild(organizationCreatedWorkflow, {
            workflowId: `organization-created-${event.id}`,
            args: [event],
            parentClosePolicy: ParentClosePolicy.ABANDON,
            workflowIdReusePolicy: 'REJECT_DUPLICATE',
            workflowRunTimeout: '5 minutes'
          })
          dispatched.push(event.id)
          break
        }
        default:
          await markEventFailed(event.id, `Unknown event type '${event.eventType}'`)
          break
      }
    } catch (err: unknown) {
      if (err instanceof WorkflowExecutionAlreadyStartedError) {
        // Already dispatched — treat as success
        dispatched.push(event.id)
      } else {
        const message = err instanceof Error ? err.message : String(err)
        await markEventFailed(event.id, `Failed to dispatch: ${message}`)
      }
    }
  }

  if (dispatched.length > 0) {
    await markEventsPublished(dispatched)
  }
}

Key design decisions in the outbox poller:

  • startChild dispatches each event as an independent child workflow. The ParentClosePolicy.ABANDON option keeps the child running even if the poller completes.
  • workflowIdReusePolicy: 'REJECT_DUPLICATE' provides idempotency. If the same event ID has already been dispatched, Temporal throws WorkflowExecutionAlreadyStartedError, which the poller catches and treats as success.
  • Payload validation happens before dispatch. Invalid payloads are marked as failed immediately, preventing child workflow failures.
  • Unknown event types are marked as failed rather than silently dropped.

Domain Event Workflows

Event-driven child workflows receive a SerializedDomainEvent payload. The workflow should validate the payload fields it needs before processing:

// apps/worker/src/workflows.ts
export async function organizationCreatedWorkflow(
  event: SerializedDomainEvent
): Promise<void> {
  const organizationName = typeof event.payload.organizationName === 'string'
    ? event.payload.organizationName
    : undefined
  const ownerUserId = typeof event.payload.ownerUserId === 'string'
    ? event.payload.ownerUserId
    : undefined
  const ownerEmail = typeof event.payload.ownerEmail === 'string'
    ? event.payload.ownerEmail
    : undefined

  if (!organizationName || !ownerUserId) {
    throw new Error(
      `Invalid organization.created payload for event ${event.id}: missing organizationName or ownerUserId`
    )
  }

  await sendOrganizationWelcomeEmail({
    organizationName,
    ownerUserId,
    ownerEmail: ownerEmail ?? ''
  })
}

When adding a new domain event type:

  1. Register the event type string in packages/contracts/src/literals.ts (domainEventTypes).
  2. Add a typed *EventPayload interface in the domain's domain/*-events.ts.
  3. Add a matching *EventPayloadSchema in packages/temporal-client/src/types/domain-event.ts.
  4. Add a case branch in outboxPollerWorkflow to dispatch the new child workflow.
  5. Implement the child workflow function that validates and processes the event.

Maintenance Workflows

The worker also includes maintenance workflows for outbox hygiene:

export async function resetStuckEventsWorkflow(
  stuckThresholdMinutes: number
): Promise<ReadonlyArray<string>> {
  return resetStuckProcessingEvents(stuckThresholdMinutes)
}

export async function prunePublishedEventsWorkflow(
  retentionDays: number
): Promise<number> {
  return prunePublishedEvents(retentionDays)
}

These are thin wrappers around activities that run on a schedule to keep the outbox table healthy.

Determinism Rules

The following are strictly forbidden inside workflow code:

// FORBIDDEN in workflows:
Date.now()                    // Use workflow.now()
new Date()                    // Use workflow.now()
Math.random()                 // Use deterministic alternatives
setTimeout(fn, ms)            // Use workflow.sleep(ms)
setInterval(fn, ms)           // Not applicable
clearTimeout(id)              // Not applicable
clearInterval(id)             // Not applicable
import { db } from '...'     // No infrastructure imports
import { fetch } from '...'  // No I/O imports

These constraints exist because Temporal replays workflow code from event history. Any non-deterministic operation produces different results on replay, causing the workflow to fail.

Safe Alternatives

ForbiddenUse Instead
Date.now()Temporal's workflow.now()
Math.random()Pass random values as activity results
setTimeoutworkflow.sleep('5s')
Side effectsMove to activities

Registering with the Worker

Activities are passed to the worker at creation time:

const worker = await Worker.create({
  connection,
  namespace: env.TEMPORAL_NAMESPACE,
  taskQueue: env.TEMPORAL_TASK_QUEUE,
  workflowsPath: workflowSourcePath,
  activities,
  shutdownGraceTime: '30s'
})

New workflows are automatically discovered from the workflowsPath file. Any exported async function in workflows.ts becomes a registered workflow type.

Adding a Schedule

Temporal schedules run workflows on a recurring interval. The worker registers all schedules at startup via ensureSchedule functions defined in apps/worker/src/schedules.ts.

Schedule Pattern

Each schedule follows a describe/update-or-create pattern with ScheduleOverlapPolicy.SKIP to prevent overlapping executions:

// apps/worker/src/schedules.ts
import type { Client } from '@temporalio/client'
import { isGrpcServiceError, ScheduleOverlapPolicy } from '@temporalio/client'
import { createLogger } from '@tx-agent-kit/logging'

const logger = createLogger('tx-agent-kit-worker-schedules')
const GRPC_STATUS_NOT_FOUND = 5
const GRPC_STATUS_ALREADY_EXISTS = 6

export async function ensureOutboxPollerSchedule(
  client: Client,
  taskQueue: string,
  intervalSeconds: number,
  batchSize: number
): Promise<void> {
  const handle = client.schedule.getHandle('outbox-poller-schedule')

  try {
    // Try to describe the existing schedule, then update it
    await handle.describe()
    await handle.update((prev) => ({
      ...prev,
      spec: {
        intervals: [{ every: `${intervalSeconds}s` }]
      },
      action: {
        type: 'startWorkflow' as const,
        workflowType: 'outboxPollerWorkflow',
        taskQueue,
        args: [batchSize]
      },
      policies: {
        ...prev.policies,
        overlap: ScheduleOverlapPolicy.SKIP
      }
    }))
    logger.info('Updated outbox poller schedule.', { intervalSeconds, batchSize })
  } catch (error: unknown) {
    const isNotFound =
      isGrpcServiceError(error) && Number(error.code) === GRPC_STATUS_NOT_FOUND

    if (!isNotFound) {
      throw error
    }

    // Schedule does not exist yet — create it
    try {
      await client.schedule.create({
        scheduleId: 'outbox-poller-schedule',
        spec: {
          intervals: [{ every: `${intervalSeconds}s` }]
        },
        action: {
          type: 'startWorkflow',
          workflowType: 'outboxPollerWorkflow',
          taskQueue,
          args: [batchSize]
        },
        policies: {
          overlap: ScheduleOverlapPolicy.SKIP
        }
      })
      logger.info('Created outbox poller schedule.', { intervalSeconds, batchSize })
    } catch (createError: unknown) {
      // Another worker instance may have created it concurrently
      const isAlreadyExists =
        isGrpcServiceError(createError) && Number(createError.code) === GRPC_STATUS_ALREADY_EXISTS

      if (!isAlreadyExists) {
        throw createError
      }

      logger.info('Outbox poller schedule already created by another worker instance.')
    }
  }
}

The describe/update-or-create flow handles three scenarios:

  1. Schedule exists -- update its spec and action to match the current config.
  2. Schedule does not exist -- create it from scratch.
  3. Race condition -- another worker instance created it between our check and create. The ALREADY_EXISTS gRPC error is caught and treated as success.

Registering Schedules at Startup

Schedules are registered in the worker's run function after the worker is created but before worker.run() blocks:

// apps/worker/src/index.ts
await ensureOutboxPollerSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 5, env.OUTBOX_POLL_BATCH_SIZE)
await ensureStuckEventsResetSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 120, env.OUTBOX_STUCK_THRESHOLD_MINUTES)
await ensurePrunePublishedSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 24, env.OUTBOX_PRUNE_RETENTION_DAYS)
await ensureDataRetentionSchedule(temporalClient, env.TEMPORAL_TASK_QUEUE, 24)

Current Schedules

ScheduleIntervalWorkflowPurpose
outbox-poller-schedule5soutboxPollerWorkflowFetch and dispatch pending domain events
stuck-events-reset-schedule120sresetStuckEventsWorkflowReset events stuck in processing state
prune-published-events-schedule24hprunePublishedEventsWorkflowDelete old published/failed events
data-retention-schedule24hdataRetentionWorkflowGeneral data retention housekeeping

Adding a New Schedule

  1. Define the ensureYourSchedule function in apps/worker/src/schedules.ts following the describe/update-or-create pattern.
  2. Use ScheduleOverlapPolicy.SKIP so that a slow execution does not pile up.
  3. Call your ensureYourSchedule function from apps/worker/src/index.ts during startup.
  4. Add any new env vars for interval/threshold to apps/worker/src/config/env.ts.

Integration Testing

Outbox Lifecycle Testing

The domain events outbox integration suite validates the full event lifecycle. Tests use createDbAuthContext to wire up a real database context and exercise the outbox state machine:

// packages/testkit/src/domain-events-outbox.integration.test.ts
it('supports full event lifecycle: pending -> processing -> published', async () => {
  const user = await dbAuthContext.createUser({
    email: 'lifecycle@example.com',
    password: 'strong-pass-12345',
    name: 'Lifecycle Test User'
  })

  const org = await dbAuthContext.createTeam({
    token: user.token,
    name: 'Lifecycle Org'
  })

  // Verify event was created with pending status
  const events = await dbAuthContext.testContext.withSchemaClient(async (client) =>
    queryDomainEventsByAggregate(client, 'organization', org.id)
  )
  expect(events).toHaveLength(1)
  expect(events[0]!.status).toBe('pending')

  // Claim events for processing (simulates outbox poller fetch)
  await dbAuthContext.testContext.withSchemaClient(async (client) =>
    claimPendingEventsForProcessing(client, 1)
  )

  const afterClaim = await dbAuthContext.testContext.withSchemaClient(async (client) =>
    queryDomainEventById(client, events[0]!.id)
  )
  expect(afterClaim!.status).toBe('processing')

  // Mark as published (simulates successful child workflow dispatch)
  await dbAuthContext.testContext.withSchemaClient(async (client) =>
    markProcessingEventsPublished(client, [events[0]!.id])
  )

  const afterPublish = await dbAuthContext.testContext.withSchemaClient(async (client) =>
    queryDomainEventById(client, events[0]!.id)
  )
  expect(afterPublish!.status).toBe('published')
})

Key Test Scenarios

The outbox integration suite covers:

  • Atomicity -- exactly one event per aggregate creation.
  • Lifecycle transitions -- pending to processing to published (and failed).
  • State guards -- markPublished only affects processing events; calling it on pending is a no-op.
  • Idempotency -- second markPublished on an already-published event is a no-op.
  • Concurrency -- two concurrent pollers cannot claim the same event (FOR UPDATE SKIP LOCKED).
  • Stuck event reset -- processing events older than a threshold are reset to pending.
  • Pruning -- old published/failed events are deleted while pending and recent events are preserved.
  • Sequence numbers -- auto-increment per aggregate and independent across aggregates.

Run the outbox integration suite with:

pnpm test:integration -- --filter testkit

On this page