
Queue

Contract

@modularityjs/queue defines the abstract QueueService class and the @Consume decorator:

typescript
abstract class QueueService {
  abstract publish<T>(
    topic: string,
    payload: T,
    options?: PublishOptions,
  ): Promise<void>;
  abstract publishBatch<T>(
    messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
  ): Promise<void>;
  abstract getDeadLetters(topic?: string): Promise<QueueMessage[]>;
  abstract purgeDeadLetters(topic?: string): Promise<number>;
}

Drivers

Memory (@modularityjs/queue-memory)

In-memory queue with immediate dispatch. For development and testing.

typescript
import { QueueModule } from '@modularityjs/queue';
import { QueueMemoryModule } from '@modularityjs/queue-memory';

const modules = [QueueModule, QueueMemoryModule];

Redis (@modularityjs/queue-redis)

Redis Streams-backed queue with consumer groups, retry with backoff, dead letter queues, and delayed messages. Requires @modularityjs/redis.

typescript
import { QueueModule } from '@modularityjs/queue';
import { QueueRedisModule } from '@modularityjs/queue-redis';
import { RedisModule } from '@modularityjs/redis';

const modules = [
  RedisModule.forRoot({ host: 'localhost' }),
  QueueModule,
  QueueRedisModule,
];

Publishing

typescript
import { Inject, Injectable } from '@modularityjs/di';
import { QueueService } from '@modularityjs/queue';

@Injectable()
class OrderService {
  constructor(@Inject(QueueService) private readonly queue: QueueService) {}

  async placeOrder(order: Order): Promise<void> {
    await this.queue.publish('order.placed', order);
  }
}
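The contract also exposes publishBatch, which takes an array of { topic, payload, options } entries. The sketch below only shows how such an array might be built. PublishOptions and the entry type are local stand-ins that mirror the shapes on this page, and Order and toBatch are hypothetical names used for illustration.

```typescript
// Local stand-ins mirroring the shapes documented on this page,
// so the sketch runs without the package installed.
interface PublishOptions {
  delayMs?: number;
  headers?: Record<string, string>;
}

interface BatchEntry<T> {
  topic: string;
  payload: T;
  options?: PublishOptions;
}

// Hypothetical domain type for illustration.
interface Order {
  id: string;
  total: number;
}

// Build one batch entry per order, tagging each with the same correlation header.
function toBatch(orders: Order[], traceId: string): BatchEntry<Order>[] {
  return orders.map((order) => ({
    topic: 'order.placed',
    payload: order,
    options: { headers: { 'x-correlation-id': traceId } },
  }));
}

const batch = toBatch(
  [
    { id: 'o-1', total: 20 },
    { id: 'o-2', total: 35 },
  ],
  'trace-123',
);
// With an injected QueueService: await this.queue.publishBatch(batch);
```

Whether a batch is delivered atomically depends on the driver and is not stated here.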

Consuming

Mark a handler method with the @Consume decorator, then register the consumer class in the QueueConsumersPool through the module's pools entry:

typescript
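import { Injectable } from '@modularityjs/di';
import { Consume, QueueConsumersPool, QueueModule } from '@modularityjs/queue';
import type { QueueMessage } from '@modularityjs/queue';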

@Injectable()
class OrderConsumer {
  @Consume({ topic: 'order.placed', name: 'send-confirmation' })
  async handleOrderPlaced(message: QueueMessage<Order>): Promise<void> {
    await sendConfirmationEmail(message.payload);
  }
}

@Module({
  name: 'order-consumers',
  imports: [QueueModule],
  providers: [OrderConsumer],
  pools: [
    {
      pool: QueueConsumersPool,
      key: 'order-consumer',
      useClass: OrderConsumer,
    },
  ],
})
class OrderConsumersModule {}

Retry Policy

@Consume supports configurable retry:

typescript
@Consume({
  topic: 'order.placed',
  name: 'send-confirmation',
  retryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential', // or 'fixed'
    delayMs: 1000,
    maxDelayMs: 30_000,
  },
  concurrency: 3,
})

Messages that exhaust all retries are moved to the dead letter queue.
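To get a feel for how the policy fields interact, here is a minimal sketch of a delay calculation. It assumes that 'exponential' doubles delayMs on each attempt and caps the result at maxDelayMs. The drivers' actual formula (including any jitter) is not documented on this page, and retryDelayMs is a hypothetical helper.

```typescript
type RetryStrategy = 'fixed' | 'exponential';

interface RetryPolicy {
  maxAttempts: number;
  strategy: RetryStrategy;
  delayMs: number;
  maxDelayMs: number;
}

// Delay before the retry that follows a failed `attempt` (1-based).
// Assumption: exponential backoff doubles the base delay per attempt,
// capped at maxDelayMs; fixed backoff always waits delayMs.
function retryDelayMs(policy: RetryPolicy, attempt: number): number {
  if (policy.strategy === 'fixed') {
    return policy.delayMs;
  }
  const uncapped = policy.delayMs * 2 ** (attempt - 1);
  return Math.min(uncapped, policy.maxDelayMs);
}

const policy: RetryPolicy = {
  maxAttempts: 5,
  strategy: 'exponential',
  delayMs: 1000,
  maxDelayMs: 30_000,
};

// Delays after failed attempts 1..4. Attempt 5 is the last one,
// so a failure there leads to dead-lettering instead of another retry.
const delays = [1, 2, 3, 4].map((attempt) => retryDelayMs(policy, attempt));
// delays: [1000, 2000, 4000, 8000]
```

Under this assumption the cap only matters for longer policies: with these settings the sixth attempt would already hit the 30-second ceiling.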

Configuration

QueueModule:

typescript
QueueModule.forRoot({
  defaultRetryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential',
    delayMs: 2000,
    maxDelayMs: 60_000,
  },
  shutdownTimeoutMs: 10_000,
});

| Option | Default | Description |
| --- | --- | --- |
| defaultRetryPolicy.maxAttempts | 3 | Maximum delivery attempts before dead-lettering |
| defaultRetryPolicy.strategy | 'exponential' | 'fixed' or 'exponential' backoff |
| defaultRetryPolicy.delayMs | 1000 | Base delay between retries in ms |
| defaultRetryPolicy.maxDelayMs | 30000 | Upper bound for exponential backoff in ms |
| shutdownTimeoutMs | 30000 | Grace period for in-flight messages during shutdown |

QueueRedisModule:

typescript
QueueRedisModule.forRoot({
  keyNamespace: 'queue:',
  consumerGroup: 'default',
  blockTimeoutMs: 5000,
  batchSize: 10,
});

| Option | Default | Description |
| --- | --- | --- |
| keyNamespace | 'queue:' | Prefix for Redis Stream keys |
| consumerGroup | 'default' | Redis consumer group name |
| consumerId | auto-generated UUID | Unique consumer identifier |
| blockTimeoutMs | 5000 | XREADGROUP block timeout in ms |
| batchSize | 10 | Max messages per read |

Publish Options

The publish method accepts an optional options object for delayed delivery and metadata headers:

typescript
await queue.publish('order.placed', order, {
  delayMs: 30_000, // delay delivery by 30 seconds
  headers: { 'x-correlation-id': traceId }, // metadata
});

Message Shape

Consumers receive a QueueMessage<T> with the following shape:

typescript
interface QueueMessage<T = unknown> {
  readonly id: string;
  readonly topic: string;
  readonly payload: T;
  readonly headers: Record<string, string>;
  readonly attempt: number; // starts at 1, increments on retry
  readonly publishedAt: Date;
}

Dead Letter Queue

Messages that exhaust all retries are moved to the dead letter queue. Access them programmatically or via the CLI:

typescript
const deadLetters = await queue.getDeadLetters('order.placed');
// Re-process or inspect...
await queue.purgeDeadLetters('order.placed');

CLI:

bash
myapp queue:dead-letter:list
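For the programmatic route, one possible way to re-process dead letters is to republish them to their original topic and then purge. The sketch below assumes that republishing is acceptable for your handlers. replayDeadLetters is a hypothetical helper, and the interfaces are local stand-ins that mirror the method shapes from the contract.

```typescript
// Local stand-ins mirroring the shapes documented on this page.
interface PublishOptions {
  delayMs?: number;
  headers?: Record<string, string>;
}

interface QueueMessage<T = unknown> {
  readonly id: string;
  readonly topic: string;
  readonly payload: T;
  readonly headers: Record<string, string>;
  readonly attempt: number;
  readonly publishedAt: Date;
}

// The subset of the QueueService contract this helper relies on.
interface DeadLetterQueue {
  publish<T>(topic: string, payload: T, options?: PublishOptions): Promise<void>;
  getDeadLetters(topic?: string): Promise<QueueMessage[]>;
  purgeDeadLetters(topic?: string): Promise<number>;
}

// Hypothetical helper: republish every dead letter for a topic, keeping its
// headers, then purge the topic's dead letter queue. Returns the replay count.
async function replayDeadLetters(
  queue: DeadLetterQueue,
  topic: string,
): Promise<number> {
  const deadLetters = await queue.getDeadLetters(topic);
  for (const message of deadLetters) {
    await queue.publish(message.topic, message.payload, {
      headers: message.headers,
    });
  }
  await queue.purgeDeadLetters(topic);
  return deadLetters.length;
}
```

Note that a message dead-lettered between the getDeadLetters and purgeDeadLetters calls would be purged without being replayed, so a helper like this is best run while the affected consumers are idle.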

Concurrency

The concurrency option on @Consume controls parallel message processing per consumer. The default is 1 (sequential).

typescript
@Consume({ topic: 'email.send', name: 'send-email', concurrency: 5 })
async handleSend(message: QueueMessage<Email>): Promise<void> {
  await sendEmail(message.payload);
}

Use 1 for ordering-sensitive work. Higher values (e.g., 3-10) are appropriate for independent I/O-bound tasks like sending emails or calling external APIs.
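To illustrate what a concurrency limit means in practice, here is a generic worker-pool sketch that keeps at most N handler calls in flight. It is not the library's implementation, and processWithConcurrency is a hypothetical name.

```typescript
// Run `handler` over `items` with at most `concurrency` calls in flight.
async function processWithConcurrency<T>(
  items: readonly T[],
  concurrency: number,
  handler: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0;

  // Each worker claims the next unprocessed index until the list is exhausted.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++] as T;
      await handler(item);
    }
  };

  // Never start more workers than there are items, and always at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
}
```

With a concurrency of 1 there is a single worker, so each item starts only after the previous one has finished. With higher values, completion order depends on how long each handler call takes, which is why ordering-sensitive work should stay at 1.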

Serialization

Payloads are serialized with JSON.stringify and deserialized with JSON.parse. Values must be JSON-serializable: Date instances become ISO 8601 strings, and class instances arrive as plain objects without their prototype chain, so their methods are gone.
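The round trip can be reproduced with plain JSON calls, with no queue involved. Money and the payload fields below are hypothetical names used for illustration.

```typescript
// Hypothetical value class with a method, to show what is lost in transit.
class Money {
  readonly amount: number;
  readonly currency: string;

  constructor(amount: number, currency: string) {
    this.amount = amount;
    this.currency = currency;
  }

  format(): string {
    return `${this.amount.toFixed(2)} ${this.currency}`;
  }
}

const original = {
  placedAt: new Date('2024-01-15T10:30:00.000Z'),
  total: new Money(19.99, 'EUR'),
};

// The same transformation the payload goes through between publish and consume.
const received = JSON.parse(JSON.stringify(original));

const placedAtType = typeof received.placedAt; // 'string'
const isMoney = received.total instanceof Money; // false
const hasFormat = typeof received.total.format; // 'undefined'

// Rebuild rich types explicitly on the consumer side.
const placedAt = new Date(received.placedAt);
const total = new Money(received.total.amount, received.total.currency);
const label = total.format(); // '19.99 EUR'
```

Typing the consumer's payload as the serialized shape (for example placedAt: string) rather than the publisher's class makes this conversion explicit in the handler.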