Outbox

@modularityjs/outbox implements the transactional outbox pattern — durable, at-least-once delivery of events whose visibility must commit atomically with the database writes that produced them.

The Problem

The classic broken pattern: write to the database, then publish an event.

typescript
await db.transaction(async () => {
  await db.update('orders', { id, status: 'paid' });
  await events.publish(new OrderPaid({ id })); // can fail independently
});

Whether the publish runs inside the transaction (as above) or right after it commits, three failure modes remain that retries cannot fix:

  1. DB commits, then events.publish throws (network blip) → lost event.
  2. events.publish succeeds, then the transaction rolls back → phantom event.
  3. The process crashes between commit and publish → same as (1).

The Pattern

Don't publish directly. Inside the same transaction, write to a regular outbox table:

typescript
await db.transaction(async (em) => {
  await em.update(Order, { id }, { status: 'paid' });
  await outbox.enqueueWith(em, [{ topic: 'order.paid', payload: { id } }]);
});

Both writes commit atomically. A separate dispatcher polls the outbox, publishes pending rows to the real bus, and marks them dispatched on success. The design survives crashes and rollbacks; the cost is up to one poll interval of latency, in exchange for never losing events or emitting phantom ones.

Contract

typescript
abstract class OutboxStore {
  // Hot path
  abstract enqueue(entries: OutboxEntry[]): Promise<void>;
  abstract fetchPending(limit: number): Promise<OutboxRow[]>;
  abstract markDispatched(ids: string[]): Promise<void>;
  abstract markFailed(id: string, error: string): Promise<void>;
  abstract markDead(id: string, error: string): Promise<void>;

  // Maintenance / inspection
  abstract count(): Promise<OutboxCounts>;
  abstract peek(
    filter: { state?: OutboxState },
    limit: number,
  ): Promise<OutboxRow[]>;
  abstract requeue(id: string): Promise<void>;
  abstract purgeDispatched(olderThan: Date): Promise<number>;
}

abstract class OutboxPublisher {
  abstract publish(row: OutboxRow): Promise<void>;
}

interface OutboxRow {
  id: string;
  topic: string;
  payload: unknown;
  createdAt: Date;
  dispatchedAt?: Date;
  deadAt?: Date;
  attempts: number;
  lastError?: string;
}

type OutboxState = 'pending' | 'dispatched' | 'dead';

interface OutboxCounts {
  pending: number;
  dispatched: number;
  dead: number;
  total: number;
}

A row is in exactly one of three states:

  • pending: dispatchedAt and deadAt are both null. Returned by fetchPending.
  • dispatched: dispatchedAt is set. Excluded from fetchPending. Eligible for retention purging.
  • dead: deadAt is set after attempts reached OutboxConfig.maxAttempts. Excluded from fetchPending. Operators triage manually (outbox:list --state dead, then either fix the consumer and outbox:retry <id>, or accept the loss).
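The three states can be derived from the two timestamps alone. The sketch below is illustrative only: `stateOf` and `RowTimestamps` are hypothetical names, not part of the package, and checking dispatchedAt before deadAt is an assumption that never matters if a row really is in exactly one state.

```typescript
// Local copy of the state union from the contract above.
type OutboxState = 'pending' | 'dispatched' | 'dead';

// Only the two fields that determine state (hypothetical trimmed shape).
interface RowTimestamps {
  dispatchedAt?: Date | null;
  deadAt?: Date | null;
}

// Hypothetical helper: derive a row's state from its timestamps.
function stateOf(row: RowTimestamps): OutboxState {
  if (row.dispatchedAt != null) return 'dispatched';
  if (row.deadAt != null) return 'dead';
  return 'pending'; // neither timestamp set
}
```

A helper like this is handy in dashboards or tests that receive raw rows from peek and need to label them.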

OutboxDispatcher.dispatchPending does the work:

typescript
class OutboxDispatcher {
  async dispatchPending(limit?: number): Promise<DispatchSummary> {
    // 1. fetchPending(limit ?? batchSize): claims rows
    // 2. for each row: publisher.publish(row)
    //    - on success: collect id for bulk markDispatched
    //    - on throw:   if attempts+1 >= maxAttempts → markDead, else markFailed
    // 3. bulk markDispatched(successful ids)
    // returns { fetched, dispatched, failed, dead }
  }
}

OutboxConfig.batchSize (default 50) controls fetch size. OutboxConfig.maxAttempts (default unlimited) caps retries: when a publish fails and attempts + 1 >= maxAttempts, the row moves to dead instead of staying pending. Apps drive the dispatcher from their scheduler of choice; built-in options are described below.
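To make the dispatch steps concrete, here is a minimal standalone sketch of one pass, written against trimmed local interfaces rather than the real classes. `dispatchOnce`, `StoreSlice`, and `PublisherSlice` are hypothetical names; counting dead rows separately from failed ones is an assumption about how the summary is tallied.

```typescript
interface Row { id: string; topic: string; payload: unknown; attempts: number; }
interface Summary { fetched: number; dispatched: number; failed: number; dead: number; }

// Only the store and publisher methods the loop needs.
interface StoreSlice {
  fetchPending(limit: number): Promise<Row[]>;
  markDispatched(ids: string[]): Promise<void>;
  markFailed(id: string, error: string): Promise<void>;
  markDead(id: string, error: string): Promise<void>;
}
interface PublisherSlice { publish(row: Row): Promise<void>; }

// One dispatch pass: fetch, publish each row, then bulk-mark the successes.
async function dispatchOnce(
  store: StoreSlice,
  publisher: PublisherSlice,
  batchSize = 50,
  maxAttempts = Infinity, // "unlimited" by default
): Promise<Summary> {
  const rows = await store.fetchPending(batchSize);
  const succeeded: string[] = [];
  let failed = 0;
  let dead = 0;

  for (const row of rows) {
    try {
      await publisher.publish(row);
      succeeded.push(row.id);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      if (row.attempts + 1 >= maxAttempts) {
        await store.markDead(row.id, message); // retries exhausted
        dead += 1;
      } else {
        await store.markFailed(row.id, message); // stays pending
        failed += 1;
      }
    }
  }

  if (succeeded.length > 0) await store.markDispatched(succeeded);
  return { fetched: rows.length, dispatched: succeeded.length, failed, dead };
}
```

Marking successes in bulk after the loop means a crash mid-batch can republish rows that were already delivered, which is one reason consumers have to tolerate duplicates.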

Stores

TypeORM (@modularityjs/outbox-database-typeorm)

Persists rows in an outbox table managed by the TypeORM driver. The entity is automatically registered via DatabaseEntitiesPool, so synchronize: true (or a generated migration) creates the table.

typescript
import { DatabaseModule } from '@modularityjs/database';
import { DatabaseTypeormModule } from '@modularityjs/database-typeorm';
import { OutboxModule } from '@modularityjs/outbox';
import {
  OutboxTypeormModule,
  TypeormOutboxStore,
} from '@modularityjs/outbox-database-typeorm';

const modules = [
  DatabaseModule,
  DatabaseTypeormModule.forRoot({ type: 'postgres', synchronize: true }),
  OutboxModule,
  OutboxTypeormModule.forRoot({ claimTimeoutMs: 5 * 60_000 }),
];

Transactional Enqueue

To get the atomicity guarantee, inject TypeormOutboxStore (driver-typed, not the abstract OutboxStore) and use enqueueWith:

typescript
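import { Inject, Injectable } from '@modularityjs/di';
import { TypeormOutboxStore } from '@modularityjs/outbox-database-typeorm';
import { DataSource } from 'typeorm';

// `Order` is the application's own TypeORM entity.
@Injectable()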
class OrderService {
  constructor(
    @Inject(DataSource) private readonly db: DataSource,
    @Inject(TypeormOutboxStore) private readonly outbox: TypeormOutboxStore,
  ) {}

  async pay(id: string): Promise<void> {
    await this.db.transaction(async (em) => {
      await em.update(Order, { id }, { status: 'paid' });
      await this.outbox.enqueueWith(em, [
        { topic: 'order.paid', payload: { id } },
      ]);
    });
  }
}

Both writes commit (or roll back) together.

Multi-Dispatcher Safety

fetchPending opens a short transaction that:

  1. Selects candidate rows where dispatched_at IS NULL and dead_at IS NULL, and which are not currently claimed (or whose claim is older than claimTimeoutMs).
  2. Issues an atomic UPDATE setting claimed_at = NOW() and a unique claim_token only on rows that are still unclaimed.
  3. Reads back the rows it actually claimed (filtered by claim_token).

Two dispatcher instances calling fetchPending see disjoint subsets — neither double-publishes a row. If a dispatcher crashes mid-publish, its claim becomes stale after claimTimeoutMs and another instance picks the row up. markFailed clears the claim immediately so the row is retried on the next tick.
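The claim protocol can be illustrated with an in-memory stand-in for the table. This is a simulation of the three steps, not the driver's SQL: `claimBatch` and `ClaimRow` are hypothetical, timestamps are plain epoch milliseconds, and the dead_at check is assumed from the rule that dead rows are excluded from fetchPending.

```typescript
interface ClaimRow {
  id: string;
  dispatchedAt: number | null;
  deadAt: number | null;
  claimedAt: number | null; // epoch ms of the current claim, if any
  claimToken: string | null;
}

// Hypothetical simulation of fetchPending's claim step over an array.
function claimBatch(
  table: ClaimRow[],
  token: string, // must be unique per call
  now: number,
  limit: number,
  claimTimeoutMs: number,
): ClaimRow[] {
  const claimable = (r: ClaimRow): boolean =>
    r.dispatchedAt === null &&
    r.deadAt === null &&
    (r.claimedAt === null || now - r.claimedAt > claimTimeoutMs);

  // Step 1: select candidates.
  const candidates = table.filter(claimable).slice(0, limit);

  // Step 2: stamp the claim only on rows that are still claimable.
  // In SQL this re-check lives in the UPDATE's WHERE clause; here it is
  // trivially true because nothing else runs between the two steps.
  for (const row of candidates) {
    if (claimable(row)) {
      row.claimedAt = now;
      row.claimToken = token;
    }
  }

  // Step 3: read back exactly the rows carrying this call's token.
  return table.filter((r) => r.claimToken === token);
}
```

Calling `claimBatch` twice with different tokens yields disjoint batches, and a claim older than `claimTimeoutMs` becomes claimable again, which mirrors the crash-recovery behaviour described above.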

Prisma (@modularityjs/outbox-database-prisma)

Same surface as the TypeORM driver, against a Prisma-managed outbox table. Postgres only in v0 — the driver issues raw SQL with $N placeholders via $queryRawUnsafe / $executeRawUnsafe.

Add the model to your schema.prisma:

prisma
model Outbox {
  id           String    @id @db.VarChar(36)
  topic        String    @db.VarChar(255)
  payload      Json?
  createdAt    DateTime  @default(now()) @map("created_at")
  dispatchedAt DateTime? @map("dispatched_at")
  deadAt       DateTime? @map("dead_at")
  claimedAt    DateTime? @map("claimed_at")
  claimToken   String?   @map("claim_token") @db.VarChar(36)
  attempts     Int       @default(0)
  lastError    String?   @map("last_error")

  @@map("outbox")
  @@index([dispatchedAt, deadAt, createdAt], name: "idx_outbox_pending")
  @@index([claimedAt], name: "idx_outbox_claimed")
}

Then wire it up:

typescript
import { PrismaClient } from '@prisma/client';
import { OutboxModule } from '@modularityjs/outbox';
import { DatabasePrismaModule } from '@modularityjs/database-prisma';
import {
  OutboxPrismaModule,
  PrismaOutboxStore,
} from '@modularityjs/outbox-database-prisma';

const modules = [
  DatabasePrismaModule.forRoot({ clientFactory: () => new PrismaClient() }),
  OutboxModule,
  OutboxPrismaModule.forRoot({ claimTimeoutMs: 5 * 60_000 }),
];

For transactional enqueue, inject PrismaOutboxStore directly and use the transaction-scoped client from prisma.$transaction:

typescript
@Injectable()
class OrderService {
  constructor(
    @Inject(PrismaClientToken) private readonly prisma: PrismaClient,
    @Inject(PrismaOutboxStore) private readonly outbox: PrismaOutboxStore,
  ) {}

  async pay(id: string): Promise<void> {
    await this.prisma.$transaction(async (tx) => {
      await tx.order.update({ where: { id }, data: { status: 'paid' } });
      await this.outbox.enqueueWith(tx, [
        { topic: 'order.paid', payload: { id } },
      ]);
    });
  }
}

Memory (@modularityjs/outbox-memory)

In-memory store for tests and single-process apps. Non-durable across restarts.

typescript
import { OutboxModule } from '@modularityjs/outbox';
import { OutboxMemoryModule } from '@modularityjs/outbox-memory';

const modules = [OutboxModule, OutboxMemoryModule];

Publishers

OutboxPublisher is a contract with no default driver — apps wire one explicitly.

Events (@modularityjs/outbox-events)

Dispatches each outbox row as a generic OutboxEvent through the EventBus. Listeners use @OnEvent(OutboxEvent) and select by topic:

typescript
import { Injectable } from '@modularityjs/di';
import { EventsModule, OnEvent } from '@modularityjs/events';
import { EventsMemoryModule } from '@modularityjs/events-memory';
import { OutboxEvent, OutboxEventsModule } from '@modularityjs/outbox-events';

const modules = [EventsModule, EventsMemoryModule, OutboxEventsModule];

@Injectable()
class OrderProjector {
  @OnEvent(OutboxEvent, { name: 'order-projector' })
  on(event: OutboxEvent) {
    if (event.topic === 'order.paid') {
      const { id } = event.payload as { id: string };
      // …project to read model, send email, etc.
    }
  }
}

If any handler throws, OutboxEventsPublisher throws too; the outbox dispatcher then calls markFailed on the row and retries it on the next tick. Listeners must be idempotent: at-least-once delivery means duplicates are possible (for example, the same row republished after a partial failure).
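One way to get idempotency is to derive a deduplication key from the event and skip keys already handled. The sketch below is standalone and hedged: `TopicEvent` mirrors only the topic and payload fields used in the example above, `IdempotentOrderProjector` is a hypothetical class, and the in-memory Set stands in for what would normally be a durable record written in the same transaction as the side effect.

```typescript
// Only the fields the listener example above relies on.
interface TopicEvent { topic: string; payload: unknown; }

// Hypothetical projector that ignores repeated deliveries.
class IdempotentOrderProjector {
  private readonly seen = new Set<string>();
  readonly paidOrders: string[] = [];

  // Returns true when the event caused an effect, false when it was skipped.
  on(event: TopicEvent): boolean {
    if (event.topic !== 'order.paid') return false;
    const { id } = event.payload as { id: string };
    const key = `${event.topic}:${id}`; // an order is paid at most once
    if (this.seen.has(key)) return false; // duplicate delivery: no-op
    this.seen.add(key);
    this.paidOrders.push(id);
    return true;
  }
}
```

Delivering the same order.paid event twice leaves a single entry in `paidOrders`, so a replay after a partial failure has no extra effect.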

Custom Publisher

For HTTP webhooks, queue providers, Kafka, etc., implement OutboxPublisher directly:

typescript
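import { Injectable } from '@modularityjs/di';
import { OutboxPublisher, type OutboxRow } from '@modularityjs/outbox';

@Injectable()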
class HttpWebhookPublisher extends OutboxPublisher {
  async publish(row: OutboxRow): Promise<void> {
    const response = await fetch('https://example.com/hook', {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({ topic: row.topic, payload: row.payload }),
    });
    if (!response.ok) {
      throw new Error(`webhook ${response.status}`);
    }
  }
}

Bind it via preferences: [{ provide: OutboxPublisher, useClass: HttpWebhookPublisher }].

Driving the Dispatcher

OutboxDispatcher.dispatchPending() is invoked manually — the core contract has no built-in polling loop. Three options, in order of ergonomics:

Option 1 — outbox-scheduler extension (@modularityjs/outbox-scheduler)

The fastest path: drop in the extension module and a cron expression.

typescript
import { OutboxSchedulerModule } from '@modularityjs/outbox-scheduler';
import { SchedulerCronerModule } from '@modularityjs/scheduler-croner';

const modules = [
  // …outbox + scheduler driver…
  SchedulerCronerModule,
  OutboxSchedulerModule.forRoot({
    schedule: '*/1 * * * * *', // every second
    lockTtlMs: 30_000,
  }),
];

Internally this contributes an OutboxDispatchJob to ScheduledJobsPool whose execute() calls OutboxDispatcher.dispatchPending(). The scheduler driver handles distributed locking via lockTtlMs so multiple instances won't fire simultaneously.

Option 2 — outbox:dispatch CLI command (@modularityjs/outbox-cli)

For one-off drains or worker pods that run on a separate cron:

bash
$ myapp outbox:dispatch
outbox:dispatch fetched=12 dispatched=12 failed=0 dead=0

# Drain everything pending
$ myapp outbox:dispatch --loop

# Smaller batch
$ myapp outbox:dispatch --limit 10

Add OutboxCliModule to the CLI app's modules list to register the command.

Option 3 — Roll your own

If you need custom scheduling (HTTP endpoint, queue worker, manual button in an admin UI), inject OutboxDispatcher and call it directly:

typescript
import { Inject, Injectable } from '@modularityjs/di';
import { OutboxDispatcher } from '@modularityjs/outbox';
import type { ScheduledJob } from '@modularityjs/scheduler';

@Injectable()
class OutboxPoller implements ScheduledJob {
  readonly name = 'outbox-poller';
  readonly schedule = '*/1 * * * * *';

  constructor(
    @Inject(OutboxDispatcher) private readonly dispatcher: OutboxDispatcher,
  ) {}

  async execute() {
    await this.dispatcher.dispatchPending();
  }
}

Retention

The outbox table grows monotonically — every dispatched row stays around forever unless something deletes it. OutboxRetentionModule (also in @modularityjs/outbox-scheduler) runs a periodic sweep that deletes dispatched rows older than the retention window. Dead rows are not purged; operators triage them by hand.

typescript
import {
  OutboxRetentionModule,
  OutboxSchedulerModule,
} from '@modularityjs/outbox-scheduler';

const modules = [
  OutboxSchedulerModule, // dispatch loop
  OutboxRetentionModule.forRoot({
    retentionMs: 7 * 24 * 60 * 60_000, // 7 days
    schedule: '0 3 * * *', // 03:00 UTC daily
  }),
];

Defaults: 7-day retention, daily 03:00 UTC sweep, 60s lock TTL. The job logs only when it actually deletes rows.
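For apps that prefer to run the sweep themselves, the core of it is a cutoff date passed to purgeDispatched. This is a minimal sketch under that assumption: `retentionCutoff`, `sweep`, and `Purgeable` are hypothetical names, and only purgeDispatched comes from the store contract.

```typescript
// The single store method a sweep needs.
interface Purgeable {
  purgeDispatched(olderThan: Date): Promise<number>;
}

const SEVEN_DAYS_MS = 7 * 24 * 60 * 60_000;

// Rows dispatched before this instant fall outside the retention window.
function retentionCutoff(now: Date, retentionMs: number): Date {
  return new Date(now.getTime() - retentionMs);
}

// Hypothetical one-shot sweep; resolves to the number of rows deleted.
async function sweep(
  store: Purgeable,
  retentionMs: number = SEVEN_DAYS_MS,
  now: Date = new Date(),
): Promise<number> {
  return store.purgeDispatched(retentionCutoff(now, retentionMs));
}
```

Passing `now` explicitly keeps the cutoff deterministic in tests.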

Inspection and Operations

@modularityjs/outbox-cli exposes four commands for operators:

Command | What it does
outbox:dispatch [--limit N] [--loop] | Drain pending rows now. --loop repeats until empty. Output: fetched=N dispatched=M failed=K dead=D.
outbox:stats | Print pending=N dispatched=M dead=D total=T.
outbox:list [--state STATE] [--limit N] | Read-only listing (no claim). STATE is pending, dispatched, or dead. Default limit 20.
outbox:retry <id> | Requeue a single row: clears dispatchedAt / deadAt / claim, resets attempts to 0.

Typical poison-message workflow:

bash
$ myapp outbox:stats
outbox:stats pending=4 dispatched=12873 dead=2 total=12879

$ myapp outbox:list --state dead
abc123 state=dead topic=order.refunded attempts=10 createdAt=... lastError="Stripe webhook 502"

# Fix the consumer / clear the upstream issue, then:
$ myapp outbox:retry abc123
outbox:retry id=abc123 requeued

The same operations are available programmatically via injected OutboxStore (count, peek, requeue, markDead, purgeDispatched) — useful for an admin-UI dashboard or scripted recovery.
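As an illustration of scripted recovery, the sketch below requeues dead rows that match a predicate, using only peek and requeue from the store contract. `requeueDeadMatching`, `TriageStore`, and `DeadRow` are hypothetical names, and the default limit of 100 is an arbitrary choice.

```typescript
type OutboxState = 'pending' | 'dispatched' | 'dead';

// Trimmed row shape: just what triage needs.
interface DeadRow { id: string; topic: string; lastError?: string; }

// The two store methods used for recovery.
interface TriageStore {
  peek(filter: { state?: OutboxState }, limit: number): Promise<DeadRow[]>;
  requeue(id: string): Promise<void>;
}

// Hypothetical helper: requeue dead rows selected by the caller's predicate.
async function requeueDeadMatching(
  store: TriageStore,
  shouldRetry: (row: DeadRow) => boolean,
  limit = 100,
): Promise<string[]> {
  const dead = await store.peek({ state: 'dead' }, limit);
  const requeued: string[] = [];
  for (const row of dead) {
    if (shouldRetry(row)) {
      await store.requeue(row.id);
      requeued.push(row.id);
    }
  }
  return requeued; // ids that went back to pending
}
```

For example, after an upstream outage is resolved, a predicate on lastError can requeue only the rows that died with that error and leave genuine poison messages for manual review.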

Telemetry

@modularityjs/outbox-telemetry wires three OTel spans automatically:

Span | Wraps | Attributes
outbox enqueue | OutboxStore.enqueue | outbox.entry_count, outbox.topic (or outbox.topics)
outbox dispatch | OutboxDispatcher.dispatchPending | outbox.limit, outbox.fetched, outbox.dispatched, outbox.failed, outbox.dead (ERROR status set when failed or dead > 0)
outbox publish | OutboxPublisher.publish | outbox.row_id, outbox.topic, outbox.attempts

Add OutboxTelemetryModule alongside TelemetryOtelModule and the spans appear in your tracing backend with no further wiring.

Failure Modes and Tuning

Concern | Mitigation
Dispatcher crashes mid-publish | Claim becomes stale after claimTimeoutMs (default 5 min); another dispatcher reclaims and retries.
Publisher throws transiently | markFailed increments attempts and clears the claim; the row is retried on the next tick.
Publisher throws permanently (poison message) | Set OutboxConfig.maxAttempts to a sane bound (e.g. 10). After exhaustion the row goes to dead; triage via outbox:list --state dead.
Two dispatcher instances racing | The transactional claim (an UPDATE that only touches unclaimed rows) ensures disjoint batches; safe to scale dispatchers horizontally.
At-least-once duplicates | Listeners must be idempotent (same input → same effect, regardless of replay count).
Outbox table grows | OutboxRetentionModule purges dispatched rows older than retentionMs on a daily sweep by default.

Choosing a Setup

Need | Modules
Tests / single-process toy app | OutboxModule, OutboxMemoryModule, custom publisher
Production with database + EventBus listeners | OutboxModule, OutboxTypeormModule.forRoot({...}), OutboxEventsModule
Production publishing to webhooks / Kafka / queue | OutboxModule, OutboxTypeormModule.forRoot({...}), custom OutboxPublisher driver