Outbox
@modularityjs/outbox implements the transactional outbox pattern — durable, at-least-once delivery of events whose visibility must commit atomically with the database writes that produced them.
The Problem
The classic broken pattern: write to the database, then publish an event.
await db.transaction(async () => {
await db.update('orders', { id, status: 'paid' });
await events.publish(new OrderPaid({ id })); // can fail independently
});Three failure modes that retries cannot fix:
- DB commits, then
events.publishthrows (network blip) → lost event. events.publishsucceeds, then the transaction rolls back → phantom event.- The process crashes between commit and publish → same as (1).
The Pattern
Don't publish directly. Inside the same transaction, write to a regular outbox table:
await db.transaction(async (em) => {
await em.update(Order, { id }, { status: 'paid' });
await outbox.enqueueWith(em, [{ topic: 'order.paid', payload: { id } }]);
});Both writes commit atomically. A separate dispatcher polls the outbox, publishes pending rows to the real bus, and marks them dispatched on success — survives crashes, survives rollbacks, costs you one poll-interval of latency in exchange for never losing or hallucinating events.
Contract
abstract class OutboxStore {
// Hot path
abstract enqueue(entries: OutboxEntry[]): Promise<void>;
abstract fetchPending(limit: number): Promise<OutboxRow[]>;
abstract markDispatched(ids: string[]): Promise<void>;
abstract markFailed(id: string, error: string): Promise<void>;
abstract markDead(id: string, error: string): Promise<void>;
// Maintenance / inspection
abstract count(): Promise<OutboxCounts>;
abstract peek(
filter: { state?: OutboxState },
limit: number,
): Promise<OutboxRow[]>;
abstract requeue(id: string): Promise<void>;
abstract purgeDispatched(olderThan: Date): Promise<number>;
}
abstract class OutboxPublisher {
abstract publish(row: OutboxRow): Promise<void>;
}
interface OutboxRow {
id: string;
topic: string;
payload: unknown;
createdAt: Date;
dispatchedAt?: Date;
deadAt?: Date;
attempts: number;
lastError?: string;
}
type OutboxState = 'pending' | 'dispatched' | 'dead';
interface OutboxCounts {
pending: number;
dispatched: number;
dead: number;
total: number;
}A row is in exactly one of three states:
- pending —
dispatchedAtanddeadAtare both null. Returned byfetchPending. - dispatched —
dispatchedAtis set. Excluded fromfetchPending. Eligible for retention purging. - dead —
deadAtis set afterattemptsreachedOutboxConfig.maxAttempts. Excluded fromfetchPending. Operators triage manually (outbox:list --state dead, then either fix the consumer andoutbox:retry <id>, or accept the loss).
OutboxDispatcher.dispatchPending does the work:
class OutboxDispatcher {
async dispatchPending(limit?: number): Promise<DispatchSummary> {
// 1. fetchPending(batchSize) — claims rows
// 2. for each row: publisher.publish(row)
// - on success: collect id for bulk markDispatched
// - on throw: if attempts+1 >= maxAttempts → markDead, else markFailed
// 3. bulk markDispatched(successful ids)
// returns { fetched, dispatched, failed, dead }
}
}OutboxConfig.batchSize (default 50) controls fetch size. OutboxConfig.maxAttempts (default unlimited) caps retries — once the next attempt would equal it, a failed publish moves the row to dead instead of staying pending. Apps drive the dispatcher from their scheduler of choice; built-in options are described below.
Stores
TypeORM (@modularityjs/outbox-database-typeorm)
Persists rows in an outbox table managed by the TypeORM driver. The entity is automatically registered via DatabaseEntitiesPool, so synchronize: true (or a generated migration) creates the table.
import { DatabaseModule } from '@modularityjs/database';
import { DatabaseTypeormModule } from '@modularityjs/database-typeorm';
import { OutboxModule } from '@modularityjs/outbox';
import {
OutboxTypeormModule,
TypeormOutboxStore,
} from '@modularityjs/outbox-database-typeorm';
const modules = [
DatabaseModule,
DatabaseTypeormModule.forRoot({ type: 'postgres', synchronize: true }),
OutboxModule,
OutboxTypeormModule.forRoot({ claimTimeoutMs: 5 * 60_000 }),
];Transactional Enqueue
To get the atomicity guarantee, inject TypeormOutboxStore (driver-typed, not the abstract OutboxStore) and use enqueueWith:
@Injectable()
class OrderService {
constructor(
@Inject(DataSource) private readonly db: DataSource,
@Inject(TypeormOutboxStore) private readonly outbox: TypeormOutboxStore,
) {}
async pay(id: string): Promise<void> {
await this.db.transaction(async (em) => {
await em.update(Order, { id }, { status: 'paid' });
await this.outbox.enqueueWith(em, [
{ topic: 'order.paid', payload: { id } },
]);
});
}
}Both writes commit (or roll back) together.
Multi-Dispatcher Safety
fetchPending opens a short transaction that:
- Selects candidate rows where
dispatched_at IS NULLand not currently claimed (or whose claim is older thanclaimTimeoutMs). - Issues an atomic
UPDATEsettingclaimed_at = NOW()and a uniqueclaim_tokenonly on rows that are still unclaimed. - Reads back the rows it actually claimed (filtered by
claim_token).
Two dispatcher instances calling fetchPending see disjoint subsets — neither double-publishes a row. If a dispatcher crashes mid-publish, its claim becomes stale after claimTimeoutMs and another instance picks the row up. markFailed clears the claim immediately so the row is retried on the next tick.
Prisma (@modularityjs/outbox-database-prisma)
Same surface as the TypeORM driver, against a Prisma-managed outbox table. Postgres only in v0 — the driver issues raw SQL with $N placeholders via $queryRawUnsafe / $executeRawUnsafe.
Add the model to your schema.prisma:
model Outbox {
id String @id @db.VarChar(36)
topic String @db.VarChar(255)
payload Json?
createdAt DateTime @default(now()) @map("created_at")
dispatchedAt DateTime? @map("dispatched_at")
deadAt DateTime? @map("dead_at")
claimedAt DateTime? @map("claimed_at")
claimToken String? @map("claim_token") @db.VarChar(36)
attempts Int @default(0)
lastError String? @map("last_error")
@@map("outbox")
@@index([dispatchedAt, deadAt, createdAt], name: "idx_outbox_pending")
@@index([claimedAt], name: "idx_outbox_claimed")
}Then wire it up:
import { OutboxModule } from '@modularityjs/outbox';
import { DatabasePrismaModule } from '@modularityjs/database-prisma';
import {
OutboxPrismaModule,
PrismaOutboxStore,
} from '@modularityjs/outbox-database-prisma';
const modules = [
DatabasePrismaModule.forRoot({ clientFactory: () => new PrismaClient() }),
OutboxModule,
OutboxPrismaModule.forRoot({ claimTimeoutMs: 5 * 60_000 }),
];For transactional enqueue, inject PrismaOutboxStore directly and use the transaction-scoped client from prisma.$transaction:
@Injectable()
class OrderService {
constructor(
@Inject(PrismaClientToken) private readonly prisma: PrismaClient,
@Inject(PrismaOutboxStore) private readonly outbox: PrismaOutboxStore,
) {}
async pay(id: string): Promise<void> {
await this.prisma.$transaction(async (tx) => {
await tx.order.update({ where: { id }, data: { status: 'paid' } });
await this.outbox.enqueueWith(tx, [
{ topic: 'order.paid', payload: { id } },
]);
});
}
}Memory (@modularityjs/outbox-memory)
In-memory store for tests and single-process apps. Non-durable across restarts.
import { OutboxModule } from '@modularityjs/outbox';
import { OutboxMemoryModule } from '@modularityjs/outbox-memory';
const modules = [OutboxModule, OutboxMemoryModule];Publishers
OutboxPublisher is a contract with no default driver — apps wire one explicitly.
Events (@modularityjs/outbox-events)
Dispatches each outbox row as a generic OutboxEvent through the EventBus. Listeners use @OnEvent(OutboxEvent) and select by topic:
import { EventsModule, OnEvent } from '@modularityjs/events';
import { EventsMemoryModule } from '@modularityjs/events-memory';
import { OutboxEvent, OutboxEventsModule } from '@modularityjs/outbox-events';
const modules = [EventsModule, EventsMemoryModule, OutboxEventsModule];
@Injectable()
class OrderProjector {
@OnEvent(OutboxEvent, { name: 'order-projector' })
on(event: OutboxEvent) {
if (event.topic === 'order.paid') {
const { id } = event.payload as { id: string };
// …project to read model, send email, etc.
}
}
}If any handler errors, OutboxEventsPublisher throws — the outbox dispatcher then markFaileds the row and retries on the next tick. Listeners must be idempotent because at-least-once delivery means duplicates are possible (same row republished after a partial failure).
Custom Publisher
For HTTP webhooks, queue providers, Kafka, etc., implement OutboxPublisher directly:
@Injectable()
class HttpWebhookPublisher extends OutboxPublisher {
async publish(row: OutboxRow): Promise<void> {
const response = await fetch('https://example.com/hook', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ topic: row.topic, payload: row.payload }),
});
if (!response.ok) {
throw new Error(`webhook ${response.status}`);
}
}
}Bind it via preferences: [{ provide: OutboxPublisher, useClass: HttpWebhookPublisher }].
Driving the Dispatcher
OutboxDispatcher.dispatchPending() is invoked manually — the core contract has no built-in polling loop. Three options, in order of ergonomics:
Option 1 — outbox-scheduler extension (@modularityjs/outbox-scheduler)
The fastest path: drop in the extension module and a cron expression.
import { OutboxSchedulerModule } from '@modularityjs/outbox-scheduler';
import { SchedulerCronerModule } from '@modularityjs/scheduler-croner';
const modules = [
// …outbox + scheduler driver…
SchedulerCronerModule,
OutboxSchedulerModule.forRoot({
schedule: '*/1 * * * * *', // every second
lockTtlMs: 30_000,
}),
];Internally this contributes an OutboxDispatchJob to ScheduledJobsPool whose execute() calls OutboxDispatcher.dispatchPending(). The scheduler driver handles distributed locking via lockTtlMs so multiple instances won't fire simultaneously.
Option 2 — outbox:dispatch CLI command (@modularityjs/outbox-cli)
For one-off drains or worker pods that run on a separate cron:
$ myapp outbox:dispatch
outbox:dispatch fetched=12 dispatched=12 failed=0
# Drain everything pending
$ myapp outbox:dispatch --loop
# Smaller batch
$ myapp outbox:dispatch --limit 10Add OutboxCliModule to the CLI app's modules list to register the command.
Option 3 — Roll your own
If you need custom scheduling (HTTP endpoint, queue worker, manual button in an admin UI), inject OutboxDispatcher and call it directly:
import { Inject, Injectable } from '@modularityjs/di';
import { OutboxDispatcher } from '@modularityjs/outbox';
import type { ScheduledJob } from '@modularityjs/scheduler';
@Injectable()
class OutboxPoller implements ScheduledJob {
readonly name = 'outbox-poller';
readonly schedule = '*/1 * * * * *';
constructor(
@Inject(OutboxDispatcher) private readonly dispatcher: OutboxDispatcher,
) {}
async execute() {
await this.dispatcher.dispatchPending();
}
}Retention
The outbox table grows monotonically — every dispatched row stays around forever unless something deletes it. OutboxRetentionModule (also in @modularityjs/outbox-scheduler) runs a periodic sweep that deletes dispatched rows older than the retention window. Dead rows are not purged; operators triage them by hand.
import {
OutboxRetentionModule,
OutboxSchedulerModule,
} from '@modularityjs/outbox-scheduler';
const modules = [
OutboxSchedulerModule, // dispatch loop
OutboxRetentionModule.forRoot({
retentionMs: 7 * 24 * 60 * 60_000, // 7 days
schedule: '0 3 * * *', // 03:00 UTC daily
}),
];Defaults: 7-day retention, daily 03:00 UTC sweep, 60s lock TTL. The job logs only when it actually deletes rows.
Inspection and Operations
@modularityjs/outbox-cli exposes four commands for operators:
| Command | What it does |
|---|---|
outbox:dispatch [--limit N] [--loop] | Drain pending rows now. --loop repeats until empty. Output: fetched=N dispatched=M failed=K dead=D. |
outbox:stats | Print pending=N dispatched=M dead=D total=T. |
outbox:list [--state STATE] [--limit N] | Read-only listing (no claim). STATE is pending, dispatched, or dead. Default limit 20. |
outbox:retry <id> | Requeue a single row — clears dispatchedAt / deadAt / claim, resets attempts to 0. |
Typical poison-message workflow:
$ myapp outbox:stats
outbox:stats pending=4 dispatched=12873 dead=2 total=12879
$ myapp outbox:list --state dead
abc123 state=dead topic=order.refunded attempts=10 createdAt=... lastError="Stripe webhook 502"
# Fix the consumer / clear the upstream issue, then:
$ myapp outbox:retry abc123
outbox:retry id=abc123 requeuedThe same operations are available programmatically via injected OutboxStore (count, peek, requeue, markDead, purgeDispatched) — useful for an admin-UI dashboard or scripted recovery.
Telemetry
@modularityjs/outbox-telemetry wires three OTel spans automatically:
| Span | Wraps | Attributes |
|---|---|---|
outbox enqueue | OutboxStore.enqueue | outbox.entry_count, outbox.topic (or outbox.topics) |
outbox dispatch | OutboxDispatcher.dispatchPending | outbox.limit, outbox.fetched, outbox.dispatched, outbox.failed, outbox.dead (ERROR status set when failed or dead > 0) |
outbox publish | OutboxPublisher.publish | outbox.row_id, outbox.topic, outbox.attempts |
Add OutboxTelemetryModule alongside TelemetryOtelModule and the spans appear in your tracing backend with no further wiring.
Failure Modes and Tuning
| Concern | Mitigation |
|---|---|
| Dispatcher crashes mid-publish | Claim becomes stale after claimTimeoutMs (default 5 min); another dispatcher reclaims and retries. |
| Publisher throws transiently | markFailed increments attempts, clears the claim; row is retried on the next tick. |
| Publisher throws permanently (poison message) | Set OutboxConfig.maxAttempts to a sane bound (e.g. 10). After exhaustion the row goes to dead; triage via outbox:list --state dead. |
| Two dispatcher instances racing | Transactional UPDATE WHERE not-claimed claim ensures disjoint batches; safe to scale dispatchers horizontally. |
| At-least-once duplicates | Listeners must be idempotent (same input → same effect, regardless of replay count). |
| Outbox table grows | OutboxRetentionModule purges dispatched rows older than retentionMs on a daily sweep by default. |
Choosing a Setup
| Need | Modules |
|---|---|
| Tests / single-process toy app | OutboxModule, OutboxMemoryModule, custom publisher |
| Production with database + EventBus listeners | OutboxModule, OutboxTypeormModule.forRoot({...}), OutboxEventsModule |
| Production publishing to webhooks / Kafka / queue | OutboxModule, OutboxTypeormModule.forRoot({...}), custom OutboxPublisher driver |