Data Flow
End-to-end data flow from client applications through the API to the database and worker
This page traces the end-to-end data flow through tx-agent-kit, from a user action in the browser to the database and back.
Overview
```
┌─────────────┐     HTTP      ┌─────────────┐    Effect     ┌─────────────┐
│  Web/Mobile │ ────────────▶ │     API     │ ────────────▶ │    Core     │
│  (Client)   │               │  (Effect    │               │  (Domain    │
│             │ ◀──────────── │  HttpApi)   │ ◀──────────── │   Logic)    │
└─────────────┘     JSON      └──────┬──────┘    Result     └─────────────┘
                                     │
                             ┌───────▼───────┐
                             │      DB       │
                             │  (Drizzle/    │
                             │  PostgreSQL)  │
                             └───────┬───────┘
                                     │
                             ┌───────▼───────┐
                             │    Worker     │
                             │  (Temporal    │
                             │   Workflows)  │
                             └───────────────┘
```
Request lifecycle
1. Client sends request
The web app (or mobile app) sends an HTTP request to the API server. The web app uses a typed API client layer generated from the OpenAPI spec. It never uses raw fetch.
```ts
// apps/web - using generated API hooks
const { data: organizations } = useListOrganizations()
```

The request goes directly to `API_BASE_URL` (e.g., `http://localhost:4000`). There is no proxy layer or BFF. The web app is a pure client that communicates with the API over HTTP.
On the web client, the short-lived access token is kept in memory and attached as a bearer header. The refresh token lives in an HttpOnly cookie, so sessions are restored through `/v1/auth/refresh` rather than from browser storage.
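A minimal sketch of such a centralized token holder (the module path `lib/auth-token.ts` appears elsewhere on this page; the function names here are illustrative, not the real API):

```ts
// lib/auth-token.ts (sketch) - the only module allowed to hold
// browser-visible auth token state. The token never touches localStorage.
let accessToken: string | null = null

// Called after login, or after /v1/auth/refresh succeeds.
export function setAccessToken(token: string | null): void {
  accessToken = token
}

// The typed API client merges these headers into every request.
export function authHeaders(): Record<string, string> {
  return accessToken !== null ? { Authorization: `Bearer ${accessToken}` } : {}
}
```

Keeping reads and writes behind one module makes the "no token state outside `lib/auth-token.ts`" rule mechanically enforceable.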
2. API receives and validates
The Effect HttpApi server receives the request, matches it to a route, and validates the input using effect/Schema contracts from packages/contracts:
```ts
// apps/api - route handler
const CreateOrganizationRoute = HttpApiEndpoint.post("createOrganization", "/v1/organizations")
  .setPayload(createOrganizationRequestSchema) // effect/Schema from packages/contracts
  .addSuccess(organizationSchema)
  .addError(UnauthorizedError)
```

If validation fails, the API returns a typed error response immediately. No domain code is executed for invalid requests.
3. Domain logic executes
For valid requests, the route handler calls the application service in packages/core:
```ts
// Application service orchestrates domain logic
const org = yield* organizationService.create({
  name: payload.name,
})
```

The application service calls domain functions for business-rule validation, invokes repository ports for data access, coordinates with other services when cross-domain logic is involved, and returns typed success or failure results to the route handler.
4. Repository persists data
The repository implementation in packages/infra/db translates domain operations into Drizzle queries:
```ts
// packages/infra/db - repository implementation
create: (input) =>
  Effect.tryPromise(() =>
    db.insert(organizations).values({
      id: generateId(),
      name: input.name,
    }).returning().then(rows => rows[0]!)
  )
```

The repository returns domain record types, not Drizzle row types. The mapping between database rows and domain records happens here.
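That mapping can be sketched as a plain function; the row and record shapes below are hypothetical simplifications, not the repo's actual types:

```ts
// Hypothetical Drizzle row shape (snake_case columns from PostgreSQL).
interface OrganizationRow {
  id: string
  name: string
  created_at: Date
}

// Hypothetical domain record shape (camelCase, defined in packages/core).
interface Organization {
  id: string
  name: string
  createdAt: Date
}

// The repository is the only layer that knows both shapes.
function toOrganization(row: OrganizationRow): Organization {
  return { id: row.id, name: row.name, createdAt: row.created_at }
}
```

Because only the repository sees `OrganizationRow`, a schema rename stays a one-file change.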
5. Response returns to client
The result flows back through every layer in reverse. The repository returns a domain record to the application service, which passes the result to the route handler. The route handler serializes the response using the effect/Schema contract, and the API sends the JSON response to the client. Finally, the client receives typed data through the generated API hooks.
6. Background work (domain events outbox)
Some operations produce side effects that must happen asynchronously. Instead of importing @temporalio/* from the API (which is a hard architectural constraint), the API writes a domain event transactionally alongside the business data into the domain_events outbox table:
```ts
// Application service writes the record and event in one transaction
const org = yield* organizationService.createWithEvent({
  name: payload.name,
})
// The domain_events row is committed atomically with the organization row
```

The Temporal worker polls the outbox table on a 5-second schedule, atomically claims a batch of pending events, and dispatches child workflows to handle them:
```ts
// Worker atomically claims pending events (FOR UPDATE SKIP LOCKED)
// This SQL transitions matching rows from 'pending' to 'processing' in one statement
const events = await fetchUnprocessedEvents(batchSize)

for (const event of events) {
  // Dispatch a non-blocking child workflow per event type
  await startChild(handleOrganizationCreated, {
    workflowId: `organization-created-${event.id}`, // deterministic for idempotency
    parentClosePolicy: 'ABANDON',
    args: [event],
  })
}

// Mark all dispatched events as 'published' in a single batch
await markEventsPublished(dispatchedIds)
```

If a child workflow was already started (e.g., from a previous poller run), Temporal returns `WorkflowExecutionAlreadyStartedError`, which the poller treats as a success: the event is still marked published.
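That treat-duplicate-as-success behavior can be sketched as a small helper. In the real worker the error comes from Temporal's `startChild`; this sketch only inspects the error name, so everything else here is an assumption:

```ts
// Hypothetical helper: start a child workflow, treating "already started"
// as success so re-polling the same event stays idempotent.
async function dispatchIdempotently(
  start: () => Promise<void>,
): Promise<"started" | "duplicate"> {
  try {
    await start()
    return "started"
  } catch (err) {
    // Temporal raises this when the deterministic workflowId was already
    // used, e.g. by a previous poller run that crashed before marking
    // the event published.
    if (err instanceof Error && err.name === "WorkflowExecutionAlreadyStartedError") {
      return "duplicate" // still safe to mark the event as published
    }
    throw err
  }
}
```

Either outcome lets the poller include the event in the `markEventsPublished` batch.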
Event state machine
Events move through four states:
```
pending → processing → published
                     ↘ failed (with failure_reason)
processing → pending   (stuck event reset after threshold)
```

- pending: Written by the API transaction, waiting for the poller to claim it.
- processing: Claimed by the poller via `FOR UPDATE SKIP LOCKED`. If the worker crashes during dispatch, the `resetStuckEventsWorkflow` (every 120s) resets these back to `pending`.
- published: Successfully dispatched to a child workflow.
- failed: Dispatch or validation failed. The failure reason is recorded for debugging.
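The pending-to-processing transition might look like the following single statement; the column names (`status`, `created_at`) are assumptions about the `domain_events` schema, not its real definition:

```ts
// Hypothetical claim query: one UPDATE moves a batch from 'pending' to
// 'processing'. FOR UPDATE SKIP LOCKED lets a concurrent poller skip rows
// another poller has already locked, instead of blocking on them.
const claimPendingEventsSql = `
  UPDATE domain_events
  SET status = 'processing'
  WHERE id IN (
    SELECT id FROM domain_events
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT $1
    FOR UPDATE SKIP LOCKED
  )
  RETURNING *
`
```

Because the claim is a single statement, a crash between claim and dispatch can only leave rows in `processing`, which is exactly what the stuck-events reset handles.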
Temporal schedules
The worker registers three Temporal schedules at startup:
| Schedule | Interval | Purpose |
|---|---|---|
| Outbox poller | Every 5 seconds | Claims pending events, dispatches child workflows |
| Stuck events reset | Every 120 seconds | Resets processing rows older than the threshold back to pending |
| Prune published events | Every 24 hours | Deletes published and failed rows older than the retention window |
All schedules use ScheduleOverlapPolicy.SKIP — overlapping runs are not queued.
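Expressed as data, the three schedules look roughly like this. The `scheduleId` values and the poller workflow-type name are illustrative; the reset and prune workflow names come from this page:

```ts
// Sketch: the three worker schedules as plain data. In the real worker these
// would be registered via the Temporal client with ScheduleOverlapPolicy.SKIP.
interface WorkerSchedule {
  scheduleId: string
  every: string // Temporal duration string
  workflowType: string
}

const workerSchedules: WorkerSchedule[] = [
  { scheduleId: "outbox-poller", every: "5s", workflowType: "outboxPollerWorkflow" },
  { scheduleId: "stuck-events-reset", every: "120s", workflowType: "resetStuckEventsWorkflow" },
  { scheduleId: "prune-published-events", every: "24h", workflowType: "prunePublishedEventsWorkflow" },
]
```

With SKIP as the overlap policy, a poll that outlives its 5-second interval simply causes the next run to be dropped, not queued.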
Retention
Published and failed events are pruned daily by prunePublishedEventsWorkflow. The retention period is controlled by OUTBOX_PRUNE_RETENTION_DAYS (default: 30). The domain_events table is listed in the retentionTableNames contract registry. Financial audit trail tables (usage_records, credit_ledger) are intentionally excluded from all retention policies.
This outbox pattern guarantees that domain events are never lost (they share a transaction with the business write), the API remains decoupled from Temporal (no @temporalio/* imports), and events are processed at least once with durable retry semantics.
Strict boundaries
The data flow enforces strict boundaries between layers:
| Boundary | Rule |
|---|---|
| Client to API | HTTP only. No shared process, no direct DB access. |
| API to Core | Effect services. Domain logic has no knowledge of HTTP. |
| Core to DB | Port interfaces. Domain defines the contract, DB implements it. |
| API to Worker | Domain events outbox table. Async, durable, decoupled. The API never imports @temporalio/*. |
These boundaries mean you can replace the web frontend without touching the API, switch databases without changing domain logic, scale the worker independently of the API, and test each layer in isolation.
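The "Core to DB" boundary in the table above can be illustrated with a hypothetical port and a test double (types simplified; the real contract lives in packages/core):

```ts
// The domain owns this contract; packages/infra/db provides the real adapter.
interface OrganizationRecord {
  id: string
  name: string
}

interface OrganizationRepository {
  create(input: { name: string }): Promise<OrganizationRecord>
}

// In-memory adapter: lets domain logic be tested with no database at all.
class InMemoryOrganizationRepository implements OrganizationRepository {
  private rows: OrganizationRecord[] = []

  async create(input: { name: string }): Promise<OrganizationRecord> {
    const record = { id: `org_${this.rows.length + 1}`, name: input.name }
    this.rows.push(record)
    return record
  }
}
```

Swapping the Drizzle adapter for this one changes nothing in the domain layer, which is the point of the port.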
What the web app never does
To reinforce the architecture, the web app is explicitly prohibited from a set of operations, all enforced by ESLint rules and structural invariant checks:
| Prohibition | Enforcement |
|---|---|
| Querying the database directly | ESLint no-restricted-imports on drizzle-orm |
| Running server-side code (no API routes, no middleware) | Structural invariant check |
| Importing Effect or Drizzle | ESLint no-restricted-imports |
| Managing browser-visible auth token state outside lib/auth-token.ts | ESLint restriction + centralized auth-token module |
| Using raw fetch instead of the typed API client | ESLint restriction |
| Reading window.location directly instead of lib/url-state.tsx | ESLint restriction |
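A sketch of how the import prohibitions might be expressed in a flat ESLint config; the glob, package names, and messages are illustrative, not the repo's actual rules:

```ts
// eslint.config.ts (sketch) - restrict imports that would break the
// client/API boundary. Applied only to the web app's source files.
const webAppRestrictions = {
  files: ["apps/web/**/*.{ts,tsx}"],
  rules: {
    "no-restricted-imports": ["error", {
      paths: [
        { name: "drizzle-orm", message: "The web app never queries the database directly." },
        { name: "effect", message: "Effect stays behind the API boundary." },
      ],
    }],
  },
}

export default [webAppRestrictions]
```

The structural invariant checks cover what ESLint cannot see, such as the absence of API routes and middleware in the web app.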