Queue

Contract

@modularityjs/queue defines the abstract QueueService and @Consume decorator:

typescript
abstract class QueueService {
  abstract publish<T>(
    topic: string,
    payload: T,
    options?: PublishOptions,
  ): Promise<void>;
  abstract publishBatch<T>(
    messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
  ): Promise<void>;
  abstract getDeadLetters(topic?: string): Promise<QueueMessage[]>;
  abstract purgeDeadLetters(topic?: string): Promise<number>;
}
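
To make the contract concrete, here is a minimal sketch of a toy driver that implements the four methods, followed by a batch publish and a dead-letter check. It is not the code of any shipped driver. The `PublishOptions` and `QueueMessage` shapes, the `ToyQueueService` class, and its `subscribe` helper are all hypothetical stand-ins, since this page does not show the real types, and the toy dead-letters a message on the first handler failure instead of retrying.

```typescript
// Hypothetical shapes: the real PublishOptions and QueueMessage types are not shown on this page.
interface PublishOptions { [key: string]: unknown }
interface QueueMessage<T = unknown> { topic: string; payload: T }

// Copied from the contract above so the sketch compiles without the package.
abstract class QueueService {
  abstract publish<T>(topic: string, payload: T, options?: PublishOptions): Promise<void>;
  abstract publishBatch<T>(
    messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
  ): Promise<void>;
  abstract getDeadLetters(topic?: string): Promise<QueueMessage[]>;
  abstract purgeDeadLetters(topic?: string): Promise<number>;
}

type Handler = (message: QueueMessage) => Promise<void>;

// Toy driver for illustration only.
class ToyQueueService extends QueueService {
  private readonly handlers = new Map<string, Handler[]>();
  private deadLetters: QueueMessage[] = [];

  // Hypothetical helper standing in for @Consume registration.
  subscribe(topic: string, handler: Handler): void {
    this.handlers.set(topic, [...(this.handlers.get(topic) ?? []), handler]);
  }

  async publish<T>(topic: string, payload: T): Promise<void> {
    const message: QueueMessage = { topic, payload };
    for (const handler of this.handlers.get(topic) ?? []) {
      try {
        await handler(message); // dispatch immediately, in publish order
      } catch {
        this.deadLetters.push(message); // no retries in this toy
      }
    }
  }

  async publishBatch<T>(
    messages: Array<{ topic: string; payload: T; options?: PublishOptions }>,
  ): Promise<void> {
    for (const { topic, payload } of messages) {
      await this.publish(topic, payload);
    }
  }

  async getDeadLetters(topic?: string): Promise<QueueMessage[]> {
    return this.deadLetters.filter((m) => topic === undefined || m.topic === topic);
  }

  async purgeDeadLetters(topic?: string): Promise<number> {
    const before = this.deadLetters.length;
    // Keep only messages from other topics; with no topic given, drop everything.
    this.deadLetters = this.deadLetters.filter((m) => topic !== undefined && m.topic !== topic);
    return before - this.deadLetters.length;
  }
}

async function demo(): Promise<number> {
  const queue = new ToyQueueService();
  queue.subscribe('order.placed', async (message) => {
    if ((message.payload as { id: number }).id === 2) throw new Error('boom');
  });
  await queue.publishBatch([
    { topic: 'order.placed', payload: { id: 1 } },
    { topic: 'order.placed', payload: { id: 2 } },
  ]);
  const dead = await queue.getDeadLetters('order.placed');
  console.log(dead.length); // 1 (the message with id 2)
  return queue.purgeDeadLetters('order.placed'); // resolves to the number removed
}

demo().then((purged) => console.log('purged', purged)); // purged 1
```

In this sketch, the optional `topic` argument narrows both dead-letter calls to one topic, and omitting it covers every topic. Whether the real drivers treat the argument the same way is an assumption.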

Drivers

Memory (@modularityjs/queue-memory)

In-memory queue with immediate dispatch. For development and testing.

typescript
import { QueueModule } from '@modularityjs/core';
import { QueueMemoryModule } from '@modularityjs/queue-memory';

const modules = [QueueModule, QueueMemoryModule];

Redis (@modularityjs/queue-redis)

Redis Streams-backed queue with consumer groups, retry with backoff, dead letter queues, and delayed messages. Requires @modularityjs/redis.

typescript
import { QueueModule } from '@modularityjs/core';
import { QueueRedisModule } from '@modularityjs/queue-redis';
import { RedisModule } from '@modularityjs/redis';

const modules = [
  RedisModule.forRoot({ host: 'localhost' }),
  QueueModule,
  QueueRedisModule,
];

Publishing

typescript
import { Inject, Injectable, QueueService } from '@modularityjs/core';

@Injectable()
class OrderService {
  constructor(@Inject(QueueService) private readonly queue: QueueService) {}

  async placeOrder(order: Order): Promise<void> {
    await this.queue.publish('order.placed', order);
  }
}

Consuming

Consumers use the @Consume decorator. Register the consumer class in the QueueConsumersPool:

typescript
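import {
  Consume,
  Injectable,
  Module, // assumed to be exported from core alongside Injectable
  QueueConsumersPool,
  QueueModule,
} from '@modularityjs/core';
import type { QueueMessage } from '@modularityjs/core'; // assumed export location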

@Injectable()
class OrderConsumer {
  @Consume({ topic: 'order.placed', name: 'send-confirmation' })
  async handleOrderPlaced(message: QueueMessage<Order>): Promise<void> {
    await sendConfirmationEmail(message.payload);
  }
}

@Module({
  name: 'order-consumers',
  imports: [QueueModule],
  providers: [OrderConsumer],
  pools: [
    {
      pool: QueueConsumersPool,
      key: 'order-consumer',
      useClass: OrderConsumer,
    },
  ],
})
class OrderConsumersModule {}

Retry Policy

@Consume accepts an optional retryPolicy that configures how failed messages are retried:

typescript
@Consume({
  topic: 'order.placed',
  name: 'send-confirmation',
  retryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential', // or 'fixed'
    delayMs: 1000,
    maxDelayMs: 30_000,
  },
  concurrency: 3,
})

Messages that exhaust all retries are moved to the dead letter queue, where they can be inspected with getDeadLetters and removed with purgeDeadLetters.
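
As a rough illustration of how such a policy could translate into wait times, the sketch below computes the delay before each retry. It is not taken from the library. It assumes that `maxAttempts` counts every delivery attempt including the first, that the `'exponential'` strategy doubles `delayMs` after each failure, and that `maxDelayMs` caps the result. The `RetryPolicy` interface and the `retryDelayMs` function are hypothetical names.

```typescript
// Hypothetical shape mirroring the retryPolicy fields shown above.
interface RetryPolicy {
  maxAttempts: number;
  strategy: 'exponential' | 'fixed';
  delayMs: number;
  maxDelayMs?: number;
}

// Returns the wait before the next attempt, or null once attempts are exhausted.
// Assumption: exponential means delayMs * 2^(failedAttempts - 1), capped at maxDelayMs.
function retryDelayMs(policy: RetryPolicy, failedAttempts: number): number | null {
  if (failedAttempts >= policy.maxAttempts) {
    return null; // no attempts left: the message would go to the dead letter queue
  }
  const raw =
    policy.strategy === 'fixed'
      ? policy.delayMs
      : policy.delayMs * 2 ** (failedAttempts - 1);
  return Math.min(raw, policy.maxDelayMs ?? Number.POSITIVE_INFINITY);
}

const examplePolicy: RetryPolicy = {
  maxAttempts: 5,
  strategy: 'exponential',
  delayMs: 1000,
  maxDelayMs: 30_000,
};

// Delay after the 1st, 2nd, ... 5th failed attempt.
const delays = [1, 2, 3, 4, 5].map((n) => retryDelayMs(examplePolicy, n));
console.log(delays); // [ 1000, 2000, 4000, 8000, null ]
```

Under these assumptions, the policy from the example never reaches the 30-second cap, because the fifth failure exhausts the attempts first. A larger `maxAttempts` would hit the cap from the sixth failure onward.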