Worker
Temporal worker for durable workflow execution with strict determinism constraints.
Worker (apps/worker)
The worker application connects to Temporal and executes durable workflows and activities. It runs as a long-lived process that polls the tx-agent-kit task queue.
Architecture
The worker follows Temporal's standard separation of concerns. Workflows define the orchestration logic and must be deterministic. Activities perform side effects such as database queries, API calls, and I/O.
// apps/worker/src/index.ts
const worker = await Worker.create({
connection,
namespace: env.TEMPORAL_NAMESPACE,
taskQueue: env.TEMPORAL_TASK_QUEUE,
workflowsPath: workflowSourcePath,
activities
})Domain Events Outbox
The worker implements a transactional outbox pattern for domain events. Events are written to the domain_events outbox table inside the same database transaction as the originating command. The worker then polls this table and dispatches each event to a dedicated child workflow for processing.
Outbox Workflows
| Workflow | Purpose |
|---|---|
outboxPollerWorkflow(batchSize) | Polls the outbox for unprocessed domain events and dispatches a child workflow per event type |
organizationCreatedWorkflow(event) | Handles organization.created events (sends a welcome email to the creator) |
resetStuckEventsWorkflow(stuckThresholdMinutes) | Resets events stuck in processing state beyond the configured threshold |
prunePublishedEventsWorkflow(retentionDays) | Deletes old published and failed events beyond the retention window |
The poller workflow fetches a batch of unprocessed events, atomically marks each as processing (using FOR UPDATE SKIP LOCKED for concurrent safety), and starts a child workflow matched by event type via startChild with an ABANDON parent-close policy. On success the event is marked published; on failure it is marked failed with the error message recorded.
Idempotency
Each child workflow is started with a deterministic workflowId (e.g., organization-created-<eventId>) and workflowIdReusePolicy: 'REJECT_DUPLICATE'. If the child workflow was already started — for example because the poller ran again before the parent completed — Temporal returns WorkflowExecutionAlreadyStartedError. The poller treats this as a successful dispatch and marks the event published. This guarantees at-least-once processing without duplicate child workflows.
Activity Retry Policy
All activities are configured with the following retry policy:
| Setting | Value |
|---|---|
startToCloseTimeout | 30 seconds |
maximumAttempts | 3 |
initialInterval | 1 second |
Failed activities are retried up to 3 times with 1-second initial backoff before the event is marked failed.
Schedules
The worker registers Temporal schedules at startup via apps/worker/src/schedules.ts. Schedules are created idempotently so restarts do not produce duplicates.
| Schedule | Interval | Workflow |
|---|---|---|
| Outbox poller | Every 5 seconds | outboxPollerWorkflow |
| Stuck events reset | Every 120 seconds | resetStuckEventsWorkflow |
| Prune published events | Every 24 hours | prunePublishedEventsWorkflow |
All schedules use ScheduleOverlapPolicy.SKIP, meaning concurrent runs do not queue — the overlapping run is simply skipped.
Determinism Constraints
Temporal replays workflows from event history to rebuild state. Any non-deterministic operation inside a workflow will cause replay failures. The following are forbidden inside workflow code:
| Forbidden | Reason |
|---|---|
Date.now() / new Date() | Use Temporal's workflow.now() instead |
Math.random() | Use deterministic alternatives or activities |
setTimeout / setInterval | Use Temporal's workflow.sleep() |
clearTimeout / clearInterval | Not applicable in workflow context |
| Infrastructure imports | Workflows must not import database clients, HTTP clients, or other I/O modules |
These constraints are enforced at the ESLint level through the domain-invariants configuration.
Activities
Activities are normal async functions that handle all side effects. They are injected into the worker at startup via apps/worker/src/activities.ts.
| Activity | Purpose |
|---|---|
ping | Health-check activity that returns 'pong' |
fetchUnprocessedEvents | Queries the outbox for unprocessed domain events up to the configured batch size |
markEventsPublished | Marks a batch of event IDs as published after successful processing |
markEventFailed | Records a failure reason against a single event ID |
resetStuckProcessingEvents | Resets events stuck in processing beyond the threshold back to pending |
prunePublishedEvents | Deletes published and failed events older than the retention window |
sendOrganizationWelcomeEmail | Sends a welcome email via Resend API when an organization is created |
Welcome Email
The sendOrganizationWelcomeEmail activity sends a transactional email through the Resend API. It requires RESEND_API_KEY and RESEND_FROM_EMAIL to be configured. When either variable is missing the activity gracefully degrades: it logs a warning and returns without error so the event is still marked as published. If credentials are present but the Resend API returns a non-2xx response, the activity throws and Temporal retries it according to the activity retry policy. If all retries are exhausted, the event is marked failed.
Configuration
Environment variables are centralized in apps/worker/src/config/env.ts:
| Variable | Description |
|---|---|
TEMPORAL_RUNTIME_MODE | Runtime mode (cli or cloud) |
TEMPORAL_ADDRESS | Temporal server address |
TEMPORAL_NAMESPACE | Temporal namespace |
TEMPORAL_TASK_QUEUE | Task queue name (default: tx-agent-kit) |
TEMPORAL_API_KEY | Required when runtime mode is cloud |
TEMPORAL_TLS_ENABLED | Must be true when runtime mode is cloud |
TEMPORAL_TLS_SERVER_NAME | Optional TLS SNI override for cloud/private endpoints |
TEMPORAL_TLS_CA_CERT_PEM | Optional TLS CA certificate PEM |
TEMPORAL_TLS_CLIENT_CERT_PEM | Optional client certificate PEM (must pair with key) |
TEMPORAL_TLS_CLIENT_KEY_PEM | Optional client key PEM (must pair with cert) |
RESEND_API_KEY | Optional Resend API key for transactional email |
RESEND_FROM_EMAIL | Optional sender email address for outgoing mail |
WEB_BASE_URL | Optional base URL for dashboard links in emails |
OUTBOX_POLL_BATCH_SIZE | Batch size for outbox polling (default: 50) |
OUTBOX_STUCK_THRESHOLD_MINUTES | Minutes before a processing event is considered stuck (default: 5) |
OUTBOX_PRUNE_RETENTION_DAYS | Days to retain published/failed events before pruning (default: 30) |
NODE_ENV | Runtime environment (development, staging, production) |
DATABASE_URL | PostgreSQL connection string (required for outbox table access) |
WORKER_SENTRY_DSN | Optional Sentry DSN for error reporting |
Direct process.env access is forbidden outside the config module.
Temporal Connection
Connection options are resolved by resolveWorkerTemporalConnectionOptions() in apps/worker/src/config/env.ts. The function produces different connection shapes depending on configuration:
- Local (default): No TLS, no API key. Connects directly to
TEMPORAL_ADDRESS(default:localhost:7233). - Temporal Cloud (API key): Set
TEMPORAL_RUNTIME_MODE=cloudandTEMPORAL_API_KEY. TLS is enabled automatically. Thetemporal-namespacegRPC metadata header is injected. - mTLS: Set
TEMPORAL_TLS_ENABLED=trueand provideTEMPORAL_TLS_CLIENT_CERT_PEM+TEMPORAL_TLS_CLIENT_KEY_PEM. Both must be provided together; setting one without the other throws at startup. - Custom CA:
TEMPORAL_TLS_CA_CERT_PEMoverrides the root CA certificate for server verification. - SNI override:
TEMPORAL_TLS_SERVER_NAMEsets the TLS SNI hostname, useful for private endpoints where the hostname does not match the certificate.
PEM values in env vars may use literal \n escape sequences; the config module normalizes them to real newlines automatically.
Observability
The worker initializes OpenTelemetry on startup via @tx-agent-kit/observability:
await startTelemetry('tx-agent-kit-worker')This exports traces, metrics, and logs to the configured OTEL collector endpoint.
Optional Sentry Errors
The worker also supports optional Sentry error reporting via WORKER_SENTRY_DSN.
- If
WORKER_SENTRY_DSNis blank/unset, Sentry is skipped. - Sentry is configured for errors only (
tracesSampleRate: 0). - Unhandled worker bootstrap/runtime errors are captured before shutdown flush.
See Sentry (Optional) for complete setup.
Graceful Shutdown
The worker handles SIGINT and SIGTERM signals for clean shutdown. It stops accepting new workflow tasks, lets in-flight activities complete, then closes the Temporal connection and flushes telemetry.
Integration Testing
The worker integration suite validates the domain events outbox end-to-end. Key assertions include:
- The outbox poller fetches unprocessed events and marks them as
publishedafter successful dispatch. - Duplicate event processing is idempotent.
- Stuck events beyond the configured threshold are reset to
unprocessed. - Published and failed events are pruned after the retention window expires.
- The
sendOrganizationWelcomeEmailactivity gracefully degrades when Resend credentials are not configured.
Test fixtures are seeded per test (no shared table truncation).