Queue
Contract
@modularityjs/queue defines the abstract QueueService and @Consume decorator:
typescript
abstract class QueueService {
abstract publish<T>(
topic: string,
payload: T,
options?: PublishOptions,
): Promise<void>;
abstract publishBatch<T>(
messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
): Promise<void>;
abstract getDeadLetters(topic?: string): Promise<QueueMessage[]>;
abstract purgeDeadLetters(topic?: string): Promise<number>;
}Drivers
Memory (@modularityjs/queue-memory)
In-memory queue with immediate dispatch. For development and testing.
typescript
import { QueueModule } from '@modularityjs/core';
import { QueueMemoryModule } from '@modularityjs/queue-memory';
const modules = [QueueModule, QueueMemoryModule];Redis (@modularityjs/queue-redis)
Redis Streams-backed queue with consumer groups, retry with backoff, dead letter queues, and delayed messages. Requires @modularityjs/redis.
typescript
import { QueueModule } from '@modularityjs/core';
import { QueueRedisModule } from '@modularityjs/queue-redis';
import { RedisModule } from '@modularityjs/redis';
const modules = [
RedisModule.forRoot({ host: 'localhost' }),
QueueModule,
QueueRedisModule,
];Publishing
typescript
import { Inject, Injectable, QueueService } from '@modularityjs/core';
@Injectable()
class OrderService {
constructor(@Inject(QueueService) private readonly queue: QueueService) {}
async placeOrder(order: Order): Promise<void> {
await this.queue.publish('order.placed', order);
}
}Consuming
Consumers use the @Consume decorator. Register the consumer class in the QueueConsumersPool:
typescript
import { Consume, QueueConsumersPool } from '@modularityjs/core';
@Injectable()
class OrderConsumer {
@Consume({ topic: 'order.placed', name: 'send-confirmation' })
async handleOrderPlaced(message: QueueMessage<Order>): Promise<void> {
await sendConfirmationEmail(message.payload);
}
}
@Module({
name: 'order-consumers',
imports: [QueueModule],
providers: [OrderConsumer],
pools: [
{
pool: QueueConsumersPool,
key: 'order-consumer',
useClass: OrderConsumer,
},
],
})
class OrderConsumersModule {}Retry Policy
@Consume supports configurable retry:
typescript
@Consume({
topic: 'order.placed',
name: 'send-confirmation',
retryPolicy: {
maxAttempts: 5,
strategy: 'exponential', // or 'fixed'
delayMs: 1000,
maxDelayMs: 30_000,
},
concurrency: 3,
})Messages that exhaust all retries are moved to the dead letter queue.