Worker

Temporal worker for durable workflow execution with strict determinism constraints.

Worker (apps/worker)

The worker application connects to Temporal and executes durable workflows and activities. It runs as a long-lived process that polls the configured task queue (TEMPORAL_TASK_QUEUE, tx-agent-kit by default).

Architecture

The worker follows Temporal's standard separation of concerns. Workflows define the orchestration logic and must be deterministic. Activities perform side effects such as database queries, API calls, and I/O.

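```typescript
// apps/worker/src/index.ts
import { Worker } from '@temporalio/worker'

// connection, env, workflowSourcePath, and activities are defined elsewhere in this module
const worker = await Worker.create({
  connection,
  namespace: env.TEMPORAL_NAMESPACE,
  taskQueue: env.TEMPORAL_TASK_QUEUE,
  workflowsPath: workflowSourcePath, // workflows are loaded from a path and bundled for the sandbox
  activities // activities are passed in as plain functions
})
```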

Domain Events Outbox

The worker implements a transactional outbox pattern for domain events. Events are written to the domain_events outbox table inside the same database transaction as the originating command. The worker then polls this table and dispatches each event to a dedicated child workflow for processing.

Outbox Workflows

| Workflow | Purpose |
| --- | --- |
| outboxPollerWorkflow(batchSize) | Polls the outbox for unprocessed domain events and dispatches a child workflow per event type |
| organizationCreatedWorkflow(event) | Handles organization.created events (sends a welcome email to the creator) |
| resetStuckEventsWorkflow(stuckThresholdMinutes) | Resets events stuck in processing state beyond the configured threshold |
| prunePublishedEventsWorkflow(retentionDays) | Deletes old published and failed events beyond the retention window |

The poller workflow fetches a batch of unprocessed events, atomically marks each as processing (using FOR UPDATE SKIP LOCKED for concurrent safety), and starts a child workflow matched by event type via startChild with an ABANDON parent-close policy. On success the event is marked published; on failure it is marked failed with the error message recorded.
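
The claim step can be pictured as a single SQL statement. The sketch below is an assumption about how such a query might look, not the project's actual query: only the domain_events table name and FOR UPDATE SKIP LOCKED come from the text above, while the column names (id, status, created_at, event_type, payload), the 'pending' status value, and the buildClaimQuery helper are hypothetical.

```typescript
// Hypothetical claim query for the poller's fetch step.
// Column names and status values are assumptions for illustration.
const claimBatchSql = `
  UPDATE domain_events
     SET status = 'processing'
   WHERE id IN (
     SELECT id
       FROM domain_events
      WHERE status = 'pending'
      ORDER BY created_at
      LIMIT $1
      FOR UPDATE SKIP LOCKED  -- rows locked by another poller are skipped, not waited on
   )
  RETURNING id, event_type, payload
`

// The batch size is passed as a bound parameter rather than interpolated into the SQL.
function buildClaimQuery(batchSize: number): { text: string; values: number[] } {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error(`batchSize must be a positive integer, got ${batchSize}`)
  }
  return { text: claimBatchSql, values: [batchSize] }
}
```

Because the inner SELECT skips rows that another transaction has already locked, two pollers running at the same time would claim disjoint batches instead of blocking each other.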

Idempotency

Each child workflow is started with a deterministic workflowId (e.g., organization-created-<eventId>) and workflowIdReusePolicy: 'REJECT_DUPLICATE'. If the child workflow was already started, for example because the poller ran again before the parent completed, the start call fails with WorkflowExecutionAlreadyStartedError. The poller treats this as a successful dispatch and marks the event published. This gives at-least-once processing without duplicate child workflows.
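
The dispatch rule can be modelled without the Temporal SDK. The following is a minimal, synchronous sketch under stated assumptions: childWorkflowId, dispatch, and makeFakeStarter are hypothetical names, the in-memory starter stands in for startChild with REJECT_DUPLICATE, and the real workflow code is asynchronous.

```typescript
type DomainEvent = { id: string; type: string }
type StartChild = (workflowId: string, event: DomainEvent) => void

const ALREADY_STARTED = 'WorkflowExecutionAlreadyStartedError'

// Deterministic ID: the same event always maps to the same workflow ID.
function childWorkflowId(event: DomainEvent): string {
  return `${event.type.replace(/\./g, '-')}-${event.id}`
}

// A duplicate start counts as a successful dispatch; any other error fails the event.
function dispatch(event: DomainEvent, startChild: StartChild): 'published' | 'failed' {
  try {
    startChild(childWorkflowId(event), event)
    return 'published'
  } catch (err) {
    if (err instanceof Error && err.name === ALREADY_STARTED) return 'published'
    return 'failed'
  }
}

// In-memory stand-in that rejects an ID it has already seen.
function makeFakeStarter(): { startChild: StartChild; started: string[] } {
  const started: string[] = []
  const startChild: StartChild = (workflowId) => {
    if (started.indexOf(workflowId) !== -1) {
      const err = new Error(`Workflow ${workflowId} already started`)
      err.name = ALREADY_STARTED
      throw err
    }
    started.push(workflowId)
  }
  return { startChild, started }
}
```

Dispatching the same event twice through the fake starter yields 'published' both times while only one child is recorded, which is the behaviour described above.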

Activity Retry Policy

All activities are configured with the following retry policy:

| Setting | Value |
| --- | --- |
| startToCloseTimeout | 30 seconds |
| maximumAttempts | 3 |
| initialInterval | 1 second |

Each activity is attempted up to 3 times in total (the first attempt plus up to two retries), starting with a 1-second backoff, before the event is marked failed.
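
To make the numbers concrete, the sketch below computes the waits between attempts. It assumes Temporal's default backoff coefficient of 2 applies, since the table does not set one, and it ignores the maximum-interval cap, which these values never reach. retryDelaysMs is a hypothetical helper, not part of the SDK or this repository.

```typescript
interface RetryOptions {
  maximumAttempts: number // total attempts, including the first one
  initialIntervalMs: number
  backoffCoefficient?: number // assumed to be 2 when unset
}

// Delay before each retry (attempt 2, 3, ...). The first attempt has no delay.
function retryDelaysMs(options: RetryOptions): number[] {
  const { maximumAttempts, initialIntervalMs, backoffCoefficient = 2 } = options
  const delays: number[] = []
  for (let retry = 0; retry < maximumAttempts - 1; retry++) {
    delays.push(initialIntervalMs * Math.pow(backoffCoefficient, retry))
  }
  return delays
}

const delays = retryDelaysMs({ maximumAttempts: 3, initialIntervalMs: 1000 })
// delays is [1000, 2000]: two retries follow the initial attempt
```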

Schedules

The worker registers Temporal schedules at startup via apps/worker/src/schedules.ts. Schedules are created idempotently so restarts do not produce duplicates.

| Schedule | Interval | Workflow |
| --- | --- | --- |
| Outbox poller | Every 5 seconds | outboxPollerWorkflow |
| Stuck events reset | Every 120 seconds | resetStuckEventsWorkflow |
| Prune published events | Every 24 hours | prunePublishedEventsWorkflow |

All schedules use ScheduleOverlapPolicy.SKIP: if the previous run is still in progress when the next one is due, the new run is skipped rather than queued.
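
The effect of skipping overlapping runs can be illustrated with a small simulation. This is a simplified model rather than Temporal's scheduler: simulateSkipOverlap is a hypothetical helper, ticks fire at exact multiples of the interval, and run durations are supplied by hand.

```typescript
// Returns the scheduled times (in seconds) at which a run actually starts.
// durationsSec[i] is how long the i-th started run takes.
function simulateSkipOverlap(intervalSec: number, ticks: number, durationsSec: number[]): number[] {
  const startedAt: number[] = []
  let busyUntil = 0
  for (let i = 0; i < ticks; i++) {
    const t = i * intervalSec
    if (t < busyUntil) continue // previous run still in progress: skip, do not queue
    const duration = durationsSec[startedAt.length] ?? 0
    startedAt.push(t)
    busyUntil = t + duration
  }
  return startedAt
}

// A 12-second first run covers the ticks at 5s and 10s, so both are skipped.
const starts = simulateSkipOverlap(5, 5, [12, 1, 1])
// starts is [0, 15, 20]
```

With a 5-second poller interval, a slow batch therefore delays the next poll to the following free tick instead of building up a backlog of queued runs.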

Determinism Constraints

Temporal replays workflows from event history to rebuild state. Any non-deterministic operation inside a workflow will cause replay failures. The following are forbidden inside workflow code:

| Forbidden | Reason |
| --- | --- |
| Date.now() / new Date() | Use Temporal's workflow.now() instead |
| Math.random() | Use deterministic alternatives or activities |
| setTimeout / setInterval | Use Temporal's workflow.sleep() |
| clearTimeout / clearInterval | Not applicable in workflow context |
| Infrastructure imports | Workflows must not import database clients, HTTP clients, or other I/O modules |

These constraints are enforced at the ESLint level through the domain-invariants configuration.
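
As an illustration of what lint-level enforcement could look like, here is a hypothetical ESLint flat-config entry built from core ESLint rules. It is an assumption, not the contents of the domain-invariants configuration: the file glob, the messages, and the restricted module names (pg, node:http, node:https) are placeholders.

```typescript
// Hypothetical flat-config entry; the real domain-invariants config may differ.
const workflowDeterminismConfig = {
  files: ['apps/worker/src/workflows/**/*.ts'], // assumed location of workflow code
  rules: {
    // Timer globals from the table above.
    'no-restricted-globals': ['error', 'setTimeout', 'setInterval', 'clearTimeout', 'clearInterval'],
    // Date.now() and Math.random() as member accesses.
    'no-restricted-properties': [
      'error',
      { object: 'Date', property: 'now', message: 'Non-deterministic inside workflows.' },
      { object: 'Math', property: 'random', message: 'Non-deterministic inside workflows.' }
    ],
    // new Date() is matched by its AST shape.
    'no-restricted-syntax': [
      'error',
      { selector: "NewExpression[callee.name='Date']", message: 'Non-deterministic inside workflows.' }
    ],
    // Example infrastructure modules; the actual list is project-specific.
    'no-restricted-imports': ['error', { patterns: ['pg', 'pg/*', 'node:http', 'node:https'] }]
  }
}
```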

Activities

Activities are normal async functions that handle all side effects. They are injected into the worker at startup via apps/worker/src/activities.ts.

| Activity | Purpose |
| --- | --- |
| ping | Health-check activity that returns 'pong' |
| fetchUnprocessedEvents | Queries the outbox for unprocessed domain events up to the configured batch size |
| markEventsPublished | Marks a batch of event IDs as published after successful processing |
| markEventFailed | Records a failure reason against a single event ID |
| resetStuckProcessingEvents | Resets events stuck in processing beyond the threshold back to pending |
| prunePublishedEvents | Deletes published and failed events older than the retention window |
| sendOrganizationWelcomeEmail | Sends a welcome email via Resend API when an organization is created |

Welcome Email

The sendOrganizationWelcomeEmail activity sends a transactional email through the Resend API. It requires RESEND_API_KEY and RESEND_FROM_EMAIL to be configured. When either variable is missing, the activity degrades gracefully: it logs a warning and returns without error, so the event is still marked as published. If credentials are present but the Resend API returns a non-2xx response, the activity throws and Temporal retries it according to the activity retry policy. If all attempts are exhausted, the event is marked failed.
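
The two decision points in that behaviour can be sketched as pure functions. This is an illustrative model, not the activity's implementation: planWelcomeEmail and assertResendOk are hypothetical names, and the actual HTTP call and logging are left out.

```typescript
type EmailEnv = { RESEND_API_KEY?: string; RESEND_FROM_EMAIL?: string }
type EmailPlan =
  | { action: 'skip'; reason: string }
  | { action: 'send'; apiKey: string; from: string }

// Missing credentials are not an error: the caller logs a warning and returns normally.
function planWelcomeEmail(env: EmailEnv): EmailPlan {
  const apiKey = env.RESEND_API_KEY?.trim()
  const from = env.RESEND_FROM_EMAIL?.trim()
  if (!apiKey || !from) {
    return { action: 'skip', reason: 'Resend credentials not configured' }
  }
  return { action: 'send', apiKey, from }
}

// A non-2xx status throws, which is what lets Temporal apply the retry policy.
function assertResendOk(status: number): void {
  if (status < 200 || status >= 300) {
    throw new Error(`Resend API responded with HTTP ${status}`)
  }
}
```

Keeping the skip decision separate from the response check makes the degraded path easy to cover in tests without any network access.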

Configuration

Environment variables are centralized in apps/worker/src/config/env.ts:

| Variable | Description |
| --- | --- |
| TEMPORAL_RUNTIME_MODE | Runtime mode (cli or cloud) |
| TEMPORAL_ADDRESS | Temporal server address |
| TEMPORAL_NAMESPACE | Temporal namespace |
| TEMPORAL_TASK_QUEUE | Task queue name (default: tx-agent-kit) |
| TEMPORAL_API_KEY | Required when runtime mode is cloud |
| TEMPORAL_TLS_ENABLED | Must be true when runtime mode is cloud |
| TEMPORAL_TLS_SERVER_NAME | Optional TLS SNI override for cloud/private endpoints |
| TEMPORAL_TLS_CA_CERT_PEM | Optional TLS CA certificate PEM |
| TEMPORAL_TLS_CLIENT_CERT_PEM | Optional client certificate PEM (must pair with key) |
| TEMPORAL_TLS_CLIENT_KEY_PEM | Optional client key PEM (must pair with cert) |
| RESEND_API_KEY | Optional Resend API key for transactional email |
| RESEND_FROM_EMAIL | Optional sender email address for outgoing mail |
| WEB_BASE_URL | Optional base URL for dashboard links in emails |
| OUTBOX_POLL_BATCH_SIZE | Batch size for outbox polling (default: 50) |
| OUTBOX_STUCK_THRESHOLD_MINUTES | Minutes before a processing event is considered stuck (default: 5) |
| OUTBOX_PRUNE_RETENTION_DAYS | Days to retain published/failed events before pruning (default: 30) |
| NODE_ENV | Runtime environment (development, staging, production) |
| DATABASE_URL | PostgreSQL connection string (required for outbox table access) |
| WORKER_SENTRY_DSN | Optional Sentry DSN for error reporting |

Direct process.env access is forbidden outside the config module.
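
A centralized config module typically parses the raw environment once and hands typed values to the rest of the code. The sketch below shows that idea for the outbox settings, using the documented defaults. It is an assumption about the shape, not the contents of env.ts: parseOutboxConfig, positiveInt, and the OutboxConfig field names are hypothetical.

```typescript
type RawEnv = Record<string, string | undefined>

interface OutboxConfig {
  taskQueue: string
  pollBatchSize: number
  stuckThresholdMinutes: number
  pruneRetentionDays: number
}

// Parses a positive integer, falling back to the documented default when unset or blank.
function positiveInt(raw: RawEnv, name: string, fallback: number): number {
  const value = raw[name]
  if (value === undefined || value.trim() === '') return fallback
  const parsed = Number(value)
  if (!Number.isInteger(parsed) || parsed <= 0) {
    throw new Error(`${name} must be a positive integer, got "${value}"`)
  }
  return parsed
}

// The raw environment is an argument, so only the caller ever touches process.env.
function parseOutboxConfig(raw: RawEnv): OutboxConfig {
  return {
    taskQueue: raw['TEMPORAL_TASK_QUEUE'] || 'tx-agent-kit',
    pollBatchSize: positiveInt(raw, 'OUTBOX_POLL_BATCH_SIZE', 50),
    stuckThresholdMinutes: positiveInt(raw, 'OUTBOX_STUCK_THRESHOLD_MINUTES', 5),
    pruneRetentionDays: positiveInt(raw, 'OUTBOX_PRUNE_RETENTION_DAYS', 30)
  }
}
```

In this arrangement the config module would be the single place that calls something like parseOutboxConfig(process.env), and an invalid value fails at startup rather than inside a workflow run.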

Temporal Connection

Connection options are resolved by resolveWorkerTemporalConnectionOptions() in apps/worker/src/config/env.ts. The function produces different connection shapes depending on configuration:

  • Local (default): No TLS, no API key. Connects directly to TEMPORAL_ADDRESS (default: localhost:7233).
  • Temporal Cloud (API key): Set TEMPORAL_RUNTIME_MODE=cloud and TEMPORAL_API_KEY. TLS is enabled automatically. The temporal-namespace gRPC metadata header is injected.
  • mTLS: Set TEMPORAL_TLS_ENABLED=true and provide TEMPORAL_TLS_CLIENT_CERT_PEM + TEMPORAL_TLS_CLIENT_KEY_PEM. Both must be provided together; setting one without the other throws at startup.
  • Custom CA: TEMPORAL_TLS_CA_CERT_PEM overrides the root CA certificate for server verification.
  • SNI override: TEMPORAL_TLS_SERVER_NAME sets the TLS SNI hostname, useful for private endpoints where the hostname does not match the certificate.
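
The modes above can be condensed into one resolution function. The following sketch is an approximation of that logic, not the real resolveWorkerTemporalConnectionOptions(): sketchConnectionOptions and the ConnectionSketch type are hypothetical, certificates are kept as strings for brevity, and the 'default' namespace fallback is an assumption.

```typescript
type ConnEnv = Record<string, string | undefined>

interface TlsSketch {
  serverNameOverride?: string
  serverRootCACertificate?: string
  clientCertPair?: { crt: string; key: string }
}

interface ConnectionSketch {
  address: string
  apiKey?: string
  metadata?: Record<string, string>
  tls?: TlsSketch
}

function sketchConnectionOptions(env: ConnEnv): ConnectionSketch {
  const address = env['TEMPORAL_ADDRESS'] || 'localhost:7233'
  const cloud = env['TEMPORAL_RUNTIME_MODE'] === 'cloud'
  const crt = env['TEMPORAL_TLS_CLIENT_CERT_PEM']
  const key = env['TEMPORAL_TLS_CLIENT_KEY_PEM']
  const serverName = env['TEMPORAL_TLS_SERVER_NAME']
  const caCert = env['TEMPORAL_TLS_CA_CERT_PEM']

  // The client certificate and key are only valid as a pair.
  if (Boolean(crt) !== Boolean(key)) {
    throw new Error('TEMPORAL_TLS_CLIENT_CERT_PEM and TEMPORAL_TLS_CLIENT_KEY_PEM must be set together')
  }

  const tlsEnabled = cloud || env['TEMPORAL_TLS_ENABLED'] === 'true'
  if (!tlsEnabled) return { address } // local default: no TLS, no API key

  const tls: TlsSketch = {}
  if (serverName) tls.serverNameOverride = serverName
  if (caCert) tls.serverRootCACertificate = caCert
  if (crt && key) tls.clientCertPair = { crt, key }

  const options: ConnectionSketch = { address, tls }
  if (cloud) {
    const apiKey = env['TEMPORAL_API_KEY']
    if (!apiKey) throw new Error('TEMPORAL_API_KEY is required when TEMPORAL_RUNTIME_MODE=cloud')
    options.apiKey = apiKey
    // Namespace header for API-key connections; the fallback value is an assumption.
    options.metadata = { 'temporal-namespace': env['TEMPORAL_NAMESPACE'] ?? 'default' }
  }
  return options
}
```

Validating the certificate pairing before anything else means a half-configured mTLS setup fails at startup regardless of which mode is selected.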

PEM values in env vars may use literal \n escape sequences; the config module normalizes them to real newlines automatically.
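
The normalization amounts to a single replacement. A minimal sketch, assuming a hypothetical helper name normalizePem (the actual function in the config module is not shown here):

```typescript
// Converts the two-character sequence backslash + n into a real newline.
// Values that already contain real newlines pass through unchanged.
function normalizePem(value: string): string {
  return value.replace(/\\n/g, '\n')
}

const escaped = '-----BEGIN CERTIFICATE-----\\nMIIB\\n-----END CERTIFICATE-----'
const pemLines = normalizePem(escaped).split('\n')
// pemLines has 3 entries: header, body, footer
```

This lets a multi-line PEM be stored as a single-line environment variable, which many secret managers and .env loaders require.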

Observability

The worker initializes OpenTelemetry on startup via @tx-agent-kit/observability:

```typescript
await startTelemetry('tx-agent-kit-worker')
```

This exports traces, metrics, and logs to the configured OTEL collector endpoint.

Optional Sentry Errors

The worker also supports optional Sentry error reporting via WORKER_SENTRY_DSN.

  • If WORKER_SENTRY_DSN is blank/unset, Sentry is skipped.
  • Sentry is configured for errors only (tracesSampleRate: 0).
  • Unhandled worker bootstrap/runtime errors are captured before shutdown flush.

See Sentry (Optional) for complete setup.

Graceful Shutdown

The worker handles SIGINT and SIGTERM signals for clean shutdown. It stops accepting new workflow tasks, lets in-flight activities complete, then closes the Temporal connection and flushes telemetry.

Integration Testing

The worker integration suite validates the domain events outbox end-to-end. Key assertions include:

  • The outbox poller fetches unprocessed events and marks them as published after successful dispatch.
  • Duplicate event processing is idempotent.
  • Stuck events beyond the configured threshold are reset to unprocessed.
  • Published and failed events are pruned after the retention window expires.
  • The sendOrganizationWelcomeEmail activity gracefully degrades when Resend credentials are not configured.

Test fixtures are seeded per test (no shared table truncation).
