Queue
Contract
@modularityjs/queue defines the abstract QueueService and @Consume decorator:
abstract class QueueService {
  abstract publish<T>(
    topic: string,
    payload: T,
    options?: PublishOptions,
  ): Promise<void>;
  abstract publishBatch<T>(
    messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
  ): Promise<void>;
  abstract getDeadLetters(topic?: string): Promise<QueueMessage[]>;
  abstract purgeDeadLetters(topic?: string): Promise<number>;
}
Drivers
Memory (@modularityjs/queue-memory)
In-memory queue with immediate dispatch. For development and testing.
import { QueueModule } from '@modularityjs/queue';
import { QueueMemoryModule } from '@modularityjs/queue-memory';
const modules = [QueueModule, QueueMemoryModule];
Redis (@modularityjs/queue-redis)
Redis Streams-backed queue with consumer groups, retry with backoff, dead letter queues, and delayed messages. Requires @modularityjs/redis.
import { QueueModule } from '@modularityjs/queue';
import { QueueRedisModule } from '@modularityjs/queue-redis';
import { RedisModule } from '@modularityjs/redis';
const modules = [
  RedisModule.forRoot({ host: 'localhost' }),
  QueueModule,
  QueueRedisModule,
];
Publishing
import { Inject, Injectable } from '@modularityjs/di';
import { QueueService } from '@modularityjs/queue';
@Injectable()
class OrderService {
  constructor(@Inject(QueueService) private readonly queue: QueueService) {}
  async placeOrder(order: Order): Promise<void> {
    await this.queue.publish('order.placed', order);
  }
}
Consuming
Consumers use the @Consume decorator. Register the consumer class in the QueueConsumersPool:
import { Injectable } from '@modularityjs/di';
import { Consume, QueueConsumersPool, QueueModule } from '@modularityjs/queue';
@Injectable()
class OrderConsumer {
  @Consume({ topic: 'order.placed', name: 'send-confirmation' })
  async handleOrderPlaced(message: QueueMessage<Order>): Promise<void> {
    await sendConfirmationEmail(message.payload);
  }
}
@Module({
  name: 'order-consumers',
  imports: [QueueModule],
  providers: [OrderConsumer],
  pools: [
    {
      pool: QueueConsumersPool,
      key: 'order-consumer',
      useClass: OrderConsumer,
    },
  ],
})
class OrderConsumersModule {}
Retry Policy
@Consume supports configurable retry:
@Consume({
  topic: 'order.placed',
  name: 'send-confirmation',
  retryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential', // or 'fixed'
    delayMs: 1000,
    maxDelayMs: 30_000,
  },
  concurrency: 3,
})
Messages that exhaust all retries are moved to the dead letter queue.
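To make the retry schedule concrete, here is a minimal sketch of how a delay could be derived from a retry policy. It assumes that the exponential strategy doubles the base delay on each attempt and applies no jitter; the library's actual growth factor is not stated in this document. `RetryPolicy` and `retryDelayMs` are hypothetical names used for illustration only.

```typescript
// Mirrors the retryPolicy fields shown above (hypothetical local type).
interface RetryPolicy {
  maxAttempts: number;
  strategy: 'fixed' | 'exponential';
  delayMs: number;
  maxDelayMs: number;
}

// Returns the wait before the next delivery after `failedAttempt` (1-based),
// or null once maxAttempts is reached and the message would be dead-lettered.
// ASSUMPTION: exponential backoff doubles the base delay per attempt.
function retryDelayMs(policy: RetryPolicy, failedAttempt: number): number | null {
  if (failedAttempt >= policy.maxAttempts) return null;
  if (policy.strategy === 'fixed') return policy.delayMs;
  const delay = policy.delayMs * Math.pow(2, failedAttempt - 1);
  return Math.min(delay, policy.maxDelayMs); // cap at the configured upper bound
}

const policy: RetryPolicy = {
  maxAttempts: 5,
  strategy: 'exponential',
  delayMs: 1000,
  maxDelayMs: 30000,
};

// Delays after failed attempts 1..5 under the assumption above.
const schedule = [1, 2, 3, 4, 5].map((attempt) => retryDelayMs(policy, attempt));
console.log(schedule); // 1000, 2000, 4000, 8000, then null (dead-lettered)
```

Under this assumption, a message with the policy above is retried after 1 s, 2 s, 4 s, and 8 s, and is dead-lettered when the fifth attempt fails.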
Configuration
QueueModule:
QueueModule.forRoot({
  defaultRetryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential',
    delayMs: 2000,
    maxDelayMs: 60_000,
  },
  shutdownTimeoutMs: 10_000,
});
| Option | Default | Description |
|---|---|---|
| defaultRetryPolicy.maxAttempts | 3 | Maximum delivery attempts before dead-lettering |
| defaultRetryPolicy.strategy | 'exponential' | 'fixed' or 'exponential' backoff |
| defaultRetryPolicy.delayMs | 1000 | Base delay between retries in ms |
| defaultRetryPolicy.maxDelayMs | 30000 | Upper bound for exponential backoff in ms |
| shutdownTimeoutMs | 30000 | Grace period for in-flight messages during shutdown |
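One way to picture the shutdownTimeoutMs grace period is as a race between in-flight handlers settling and a timer. The sketch below is illustrative only: `drainInFlight` is a hypothetical helper, not part of @modularityjs/queue, and the module's actual shutdown logic may differ.

```typescript
// Waits for in-flight handler promises, but no longer than the grace period.
// Resolves to 'drained' if every handler settled in time, else 'timeout'.
// ASSUMPTION: this only illustrates the idea of a shutdown grace period.
async function drainInFlight(
  inFlight: Array<Promise<unknown>>,
  shutdownTimeoutMs: number,
): Promise<'drained' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), shutdownTimeoutMs);
  });
  // A handler that fails still counts as settled, so swallow rejections here.
  const settled = inFlight.map((p) => p.then(() => undefined, () => undefined));
  const drained = Promise.all(settled).then(() => 'drained' as const);
  try {
    return await Promise.race([drained, timeout]);
  } finally {
    clearTimeout(timer); // do not keep the process alive once the race is decided
  }
}
```

A shorter timeout speeds up deployments at the cost of interrupting slow handlers; whatever is interrupted would typically have to be redelivered later.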
QueueRedisModule:
QueueRedisModule.forRoot({
  keyNamespace: 'queue:',
  consumerGroup: 'default',
  blockTimeoutMs: 5000,
  batchSize: 10,
});
| Option | Default | Description |
|---|---|---|
| keyNamespace | 'queue:' | Prefix for Redis Stream keys |
| consumerGroup | 'default' | Redis consumer group name |
| consumerId | auto-generated UUID | Unique consumer identifier |
| blockTimeoutMs | 5000 | XREADGROUP block timeout in ms |
| batchSize | 10 | Max messages per read |
Publish Options
The publish method accepts an optional options object for delayed delivery and metadata headers:
await queue.publish('order.placed', order, {
  delayMs: 30_000, // delay delivery by 30 seconds
  headers: { 'x-correlation-id': traceId }, // metadata
});
Message Shape
Consumers receive a QueueMessage<T> with the following shape:
interface QueueMessage<T = unknown> {
  readonly id: string;
  readonly topic: string;
  readonly payload: T;
  readonly headers: Record<string, string>;
  readonly attempt: number; // starts at 1, increments on retry
  readonly publishedAt: Date;
}
Dead Letter Queue
Messages that exhaust all retries are moved to the dead letter queue. Access them programmatically or via CLI:
const deadLetters = await queue.getDeadLetters('order.placed');
// Re-process or inspect...
await queue.purgeDeadLetters('order.placed');
CLI:
myapp queue:dead-letter:list
Concurrency
The concurrency option on @Consume controls parallel message processing per consumer. The default is 1 (sequential).
@Consume({ topic: 'email.send', name: 'send-email', concurrency: 5 })
async handleSend(message: QueueMessage<Email>): Promise<void> {
  await sendEmail(message.payload);
}
Use 1 for ordering-sensitive work. Higher values (e.g., 3-10) are appropriate for independent I/O-bound tasks like sending emails or calling external APIs.
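The effect of a concurrency limit can be sketched with a small worker pool. This is not how @modularityjs/queue is implemented; `processWithConcurrency` is a hypothetical helper that only illustrates why a limit of 1 preserves order while higher limits overlap I/O.

```typescript
// Runs `handler` over `items` with at most `concurrency` calls in flight.
// ASSUMPTION: illustrative only; a real consumer would also catch handler
// errors and apply its retry policy instead of letting the pool reject.
async function processWithConcurrency<T>(
  items: T[],
  concurrency: number,
  handler: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0;
  // Each worker claims the next unprocessed index until the list is exhausted.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++] as T;
      await handler(item);
    }
  };
  const workerCount = Math.min(concurrency, items.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
}
```

With `concurrency` set to 1 there is a single worker, so each item finishes before the next one starts. With a higher value, items start in order but may complete out of order.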
Serialization
Payloads are serialized with JSON.stringify and deserialized with JSON.parse, so values must be JSON-serializable. Date instances arrive as ISO strings, and class instances arrive as plain objects without their prototype chain (and therefore without their methods).
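The round trip can be reproduced with plain JSON calls, with no queue involved. In the sketch below, `Money` and the payload fields are hypothetical; it shows what a consumer would see and how rich types can be rebuilt explicitly.

```typescript
// Hypothetical value class carried inside a payload.
class Money {
  readonly amount: number;
  readonly currency: string;
  constructor(amount: number, currency: string) {
    this.amount = amount;
    this.currency = currency;
  }
  format(): string {
    return `${this.amount.toFixed(2)} ${this.currency}`;
  }
}

const sent = {
  total: new Money(19.99, 'EUR'),
  placedAt: new Date('2024-01-15T10:30:00.000Z'),
};

// Same transformation the queue applies: stringify on publish, parse on consume.
const received = JSON.parse(JSON.stringify(sent));

console.log(typeof received.placedAt);        // string
console.log(received.placedAt);               // 2024-01-15T10:30:00.000Z
console.log(received.total instanceof Money); // false
console.log(typeof received.total.format);    // undefined

// Rebuild rich types explicitly inside the consumer.
const placedAt = new Date(received.placedAt);
const total = new Money(received.total.amount, received.total.currency);
console.log(total.format()); // 19.99 EUR
```

Typing the payload as the wire format (for example `placedAt: string`) rather than the in-memory type makes this conversion step visible to the compiler.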