Background workers
Three primitives carry async work in ModularityJS, and they're not interchangeable:
| Use | Reach for |
|---|---|
| Hand off a task to be done soon, on some worker (fan-out) | @modularityjs/queue |
| Run something on a schedule (cron) with multi-replica safety | @modularityjs/scheduler |
| Deliver an event transactionally with the DB write that caused it | @modularityjs/outbox |
The most common mistake is using a queue when you actually need an outbox — losing events when a transaction rolls back is much worse than a five-second delivery lag.
Queue
Producers call QueueService.publish(topic, payload). Consumers are class methods marked with @Consume({ name, topic }) and discovered via QueueConsumersPool. The driver picks each pending message up and runs the consumer; each driver implements retries and dead-letter routing according to RetryPolicy.
import { Consume, QueueConsumersPool, QueueService } from '@modularityjs/queue';
@Injectable()
class EmailWorker {
@Consume({
name: 'send-welcome',
topic: 'user.signups',
retryPolicy: { maxAttempts: 5, strategy: 'exponential', delayMs: 1000 },
concurrency: 4,
})
async onSignup(message: QueueMessage<{ userId: string }>) {
// ... do work; throw to retry, return to ack
}
}
@Module({
imports: [QueueModule],
providers: [EmailWorker],
pools: [
{ pool: QueueConsumersPool, key: 'email:welcome', useClass: EmailWorker },
],
})
class EmailModule {}Two drivers:
@modularityjs/queue-memory— single-process. Useful in dev and tests.@modularityjs/queue-redis— multi-replica. Use this in production.
Defaults: maxAttempts: 3, exponential backoff starting at 1 s, capped at 30 s. After the last attempt, the message lands in the topic's dead-letter list (getDeadLetters(topic) / purgeDeadLetters(topic)).
See Queue.
Scheduler
The scheduler does not use a decorator. Jobs are class instances of ScheduledJob contributed to ScheduledJobsPool:
import { ScheduledJob, ScheduledJobsPool } from '@modularityjs/scheduler';
@Injectable()
class RebuildSearchIndex implements ScheduledJob {
readonly name = 'rebuild-search-index';
readonly schedule = '0 3 * * *'; // 03:00 daily
readonly lockTtlMs = 5 * 60 * 1000; // expected runtime upper bound
async execute() {
// ...
}
}
@Module({
imports: [SchedulerModule],
providers: [RebuildSearchIndex],
pools: [
{
pool: ScheduledJobsPool,
key: 'search:rebuild',
useClass: RebuildSearchIndex,
},
],
})
class SearchOpsModule {}The croner driver (@modularityjs/scheduler-croner) acquires a LockService lock before each fire — scheduler:{job-name} with the job's lockTtlMs. Pair it with @modularityjs/lock-redis and a single Redis target across replicas: every replica fires on the cron tick, but only the lock holder runs execute(). The lock auto-expires after lockTtlMs, so set that to your job's worst-case runtime.
See Scheduler.
Outbox
When an event must ride with a DB write — "create user and emit user.created" — the queue is wrong. A queue publish that runs between the commit and the next line is gone if the DB rolls back; a queue publish before the commit is gone if delivery succeeds but the commit fails. The outbox solves this by writing the event row in the same transaction as the business write, then dispatching it asynchronously.
You need three drivers, not one — they sit on different contracts:
- A store driver (
OutboxStore) — where outbox rows live.outbox-memory,outbox-typeorm,outbox-prisma. - A publisher driver (
OutboxPublisher) — whatOutboxDispatcherdoes with each pending row. The contract's default is a no-op;outbox-eventsreplaces it withOutboxEventsPublisher, which turns each row into a typed bus event (OutboxEvent { topic, payload, occurredAt }). The most common choice — let downstream@OnEventhandlers pick what they care about. - A tick driver — when
OutboxDispatcher.dispatchPending()runs.outbox-schedulerregisters aScheduledJob(default*/1 * * * * *, every second) that calls it. The job pairs with any publisher; nothing dispatches without a tick driver.
The pair you almost always want in production:
DatabaseModule.forRoot({ migrationsRun: false }),
DatabaseTypeormModule.forRoot({ type: 'postgres', /* ... */ }),
SchedulerModule, // contract
SchedulerCronerModule, // scheduler driver — without this, the tick job never fires
OutboxModule.forRoot({ batchSize: 100 }),
OutboxTypeormModule, // store: ships an OutboxEntity for migrations
OutboxEventsModule, // publisher: rows → events
OutboxSchedulerModule.forRoot({ schedule: '*/1 * * * * *' }), // tickApplication code writes to the store as part of its existing transaction:
@Injectable()
class UsersService {
constructor(
@Inject(DataSource) private readonly db: DataSource,
@Inject(OutboxStore) private readonly outbox: OutboxStore,
) {}
async create(input: CreateUserInput) {
await this.db.transaction(async (tx) => {
const user = await tx.getRepository(User).save(input);
await this.outbox.enqueue([
{ topic: 'user.created', payload: { id: user.id } },
]);
return user;
});
}
}Downstream code listens on the events bus, not the queue:
@OnEvent(OutboxEvent, { name: 'send-welcome-on-user-created' })
async on(event: OutboxEvent) {
if (event.topic === 'user.created') {
await this.queue.publish('user.signups', event.payload);
}
}Handlers must be idempotent — the dispatcher retries until the row is marked dispatched, so duplicate deliveries during partial failures are expected.
See Outbox.
Events fan-out
@modularityjs/events is the in-process bus by default. @modularityjs/events-redis swaps in a pub/sub-backed bus that delivers each event to every subscribed replica.
Two @OnEvent options are easy to misuse:
local: true— handler only runs on the replica that calleddispatch(). Use for in-memory caches / side-effects that shouldn't propagate.once: true— handler runs exactly once across the cluster (Redis tracks the event id). Use for "send the welcome email" type one-shot effects.once: true+local: trueis a validation error — they want opposite scopes.
See Events.
Process topology
You don't need separate codebases for web and workers — same image, different module list. Keep a shared BaseAppModule (services, DI wiring, no entrypoints) and add the listener modules per process:
// src/main-web.ts
await createApp({
di: inversify,
modules: [
...base,
HttpModule.forRoot({ port: 8080 }),
HttpFastifyModule,
AppHttpModule,
],
});
// src/main-worker.ts
await createApp({
di: inversify,
modules: [
...base,
OutboxSchedulerModule,
AppQueueConsumersModule, // your @Consume() classes registered into QueueConsumersPool
AppWorkersModule,
],
});
// src/main-scheduler.ts (optional split)
await createApp({ di: inversify, modules: [...base, AppScheduledJobsModule] });Notes for the worker entry point:
- It does not need
HttpFastifyModule— drop it to keep the worker's surface area small. - It does need
RedisModuleif any of your distributed drivers (queue, lock, events, cache) use Redis. - Use the boot smoke gate on each entry point in CI.
- Graceful shutdown drains in-flight messages because
onShutdownrunsservice.stop()on the queue driver beforeonDestroycloses Redis. SetterminationGracePeriodSecondshigher thanQueueConfig.shutdownTimeoutMs(default 30 s) — that's the deadline the drain races against.
Choosing between queue and outbox
A short decision flow:
- Is the side-effect tied to a DB write that might roll back? → outbox.
- Does the work need to run on a schedule? → scheduler.
- Does it just need to happen off the request thread? → queue.
- Should every replica react (broadcast)? → events (with
events-redis). - Does it need to run once in the cluster regardless of caller? →
@OnEvent({ once: true })onevents-redis, or a scheduled job with a lock.
Next Steps
- Queue —
@Consume, retry policy, dead letters - Scheduler —
ScheduledJob, croner driver - Outbox — store + publisher split
- Events —
@OnEvent,local/once - Scaling out — flipping memory drivers to Redis