Skip to main content

RabbitMQ

Overview

The @opra/rabbitmq package provides RabbitmqAdapter — a platform adapter that integrates an OPRA ApiDocument into a RabbitMQ consumer setup. The adapter connects over AMQP, subscribes to queues derived from your operation channel declarations, decodes incoming messages against your schema (including gzip, deflate, brotli, and base64 encodings), dispatches to your operation handlers, and manages the connection lifecycle — all driven by the type declarations in your schema.


Installation

npm install @opra/rabbitmq

Setup

Create your ApiDocument with transport: 'mq' and platform: 'rabbitmq', pass it to RabbitmqAdapter, then call start().

import { RabbitmqAdapter } from '@opra/rabbitmq';
import { ApiDocumentFactory } from '@opra/common';
import { OrdersController } from './api/orders.controller.js';

const document = await ApiDocumentFactory.createDocument({
info: { title: 'Order API', version: '1.0' },
api: {
transport: 'mq',
platform: 'rabbitmq',
controllers: [OrdersController],
},
});

const adapter = new RabbitmqAdapter(document, {
connection: 'amqp://guest:guest@localhost:5672',
});

await adapter.start();

process.on('SIGTERM', async () => {
await adapter.close();
process.exit(0);
});

RabbitmqAdapter establishes the AMQP connection, creates a consumer for each declared operation channel, and starts dispatching messages. No manual queue wiring is needed.


Adapter Options (RabbitmqAdapter.Config)

OptionTypeDescription
connectionstring | string[] | ConnectionOptionsAMQP connection URL(s) or a ConnectionOptions object.
defaults{ consumer?: ConsumerConfig }Default consumer options applied to all operations unless overridden.
scopestringValidation scope applied during message decoding.
interceptors(InterceptorFunction | IRabbitmqInterceptor)[]Interceptor chain executed on every message.
logExtrabooleanLog additional diagnostic output from the AMQP client.

ConnectionOptions

OptionTypeDescription
urlsstring[]List of AMQP broker URLs.
usernamestringAMQP username.
passwordstringAMQP password.
heartbeatnumberHeartbeat interval in seconds.
connectionTimeoutnumberConnection timeout in milliseconds.
retryLownumberMinimum retry delay in milliseconds.
retryHighnumberMaximum retry delay in milliseconds.
tlsobjectTLS options.

Defining Operations

Use @MQOperation() on a controller method to declare an operation. The channel property maps to a RabbitMQ queue name. Import @opra/rabbitmq to unlock the .RabbitMQ() chained decorator for per-operation consumer configuration.

import { MQController, MQOperation, ApiField, ComplexType } from '@opra/common';
import '@opra/rabbitmq'; // augments MQOperationDecorator with .RabbitMQ()

@ComplexType()
class OrderPayload {
@ApiField() declare orderId: string;
@ApiField() declare amount: number;
}

@MQController()
export class OrdersController {
@MQOperation({ channel: 'orders.created', type: OrderPayload })
async onOrderCreated(ctx: RabbitmqContext) {
const { content, headers, queue } = ctx;
console.log('Received order:', content.orderId);
}
}

Per-operation consumer config (.RabbitMQ())

Chain .RabbitMQ() on @MQOperation() to override consumer options for a specific operation.

@(MQOperation({ channel: 'payments.processed', type: PaymentPayload })
.RabbitMQ({
concurrency: 5,
requeue: false,
qos: { prefetchCount: 10 },
queueOptions: { durable: true },
}))
async onPayment(ctx: RabbitmqContext) { ... }

You can also pass a resolver function for dynamic configuration:

@(MQOperation({ channel: 'orders.created', type: OrderPayload })
.RabbitMQ(async () => {
const cfg = await loadConfig();
return { concurrency: cfg.concurrency };
}))
async onOrderCreated(ctx: RabbitmqContext) { ... }

ConsumerConfig options

OptionTypeDescription
concurrencynumberNumber of messages processed in parallel.
requeuebooleanWhether to requeue messages on failure.
qos{ prefetchCount?: number }Quality-of-service settings.
queueOptionsobjectAMQP queue declaration options (e.g. durable, arguments).
exchangesobject[]Exchange bindings to declare.
exchangeBindingsobject[]Additional exchange-to-queue bindings.
exclusivebooleanDeclare the queue as exclusive to this connection.

RabbitmqContext

Every operation handler receives a RabbitmqContext as its first argument.

import { RabbitmqContext } from '@opra/rabbitmq';

async onOrderCreated(ctx: RabbitmqContext) {
ctx.queue // queue name the message arrived on
ctx.content // decoded message payload
ctx.headers // decoded message headers as a plain object
ctx.message // raw AsyncMessage from the AMQP client
ctx.consumer // the Consumer instance
await ctx.reply({ status: 'ok' }) // send a reply (RPC pattern)
}
PropertyTypeDescription
queuestringThe queue the message arrived on.
contentanyDecoded message payload (validated against the operation's type). Content-encoding (gzip, deflate, brotli, base64) and content-type (JSON, text) are handled automatically.
headersRecord<string, any>Decoded message headers.
messagerabbit.AsyncMessageThe raw AMQP message object.
consumerrabbit.ConsumerThe consumer that received the message.
replyReplyFunctionSends a reply message — used for RPC-style request/reply flows.

Request / Reply (RPC pattern)

If an operation returns a value, the adapter automatically calls reply() with the return value. You can also call ctx.reply() directly for more control.

@MQOperation({ channel: 'orders.get', type: OrderPayload })
async getOrder(ctx: RabbitmqContext) {
const order = await this.service.findById(ctx.content.orderId);
return order; // automatically sent as reply
}

Lifecycle

MethodDescription
adapter.start()Establishes the AMQP connection and subscribes to all queues.
adapter.close()Closes all consumers and the connection, resets to idle.
adapter.statusCurrent status: 'idle' | 'starting' | 'started'.
adapter.connectionThe active rabbit.Connection instance, or undefined if not started.

Interceptors

Interceptors run before the operation handler on every incoming message.

import { RabbitmqContext } from '@opra/rabbitmq';

const adapter = new RabbitmqAdapter(document, {
connection: 'amqp://localhost',
interceptors: [
async (ctx: RabbitmqContext, next) => {
console.log('Message on queue:', ctx.queue);
await next();
},
],
});

Class form:

import { IRabbitmqInterceptor, RabbitmqContext } from '@opra/rabbitmq';

class AuthInterceptor implements IRabbitmqInterceptor {
async intercept(ctx: RabbitmqContext, next: () => Promise<any>) {
if (!ctx.headers['x-api-key']) throw new Error('Unauthorized');
await next();
}
}

const adapter = new RabbitmqAdapter(document, {
connection: 'amqp://localhost',
interceptors: [new AuthInterceptor()],
});

Error Handling

Throw any OpraException from a handler — the adapter catches it, emits an error event, and logs it without crashing the consumer.

import { OpraException } from '@opra/common';

async onOrderCreated(ctx: RabbitmqContext) {
if (!ctx.content.orderId) {
throw new OpraException('Missing orderId');
}
}

Listen to the adapter's error event to handle errors centrally:

adapter.on('error', (error, ctx) => {
console.error('RabbitMQ error:', error.message, ctx?.queue);
});

Events

EventPayloadDescription
messagerabbit.AsyncMessage, queue: stringEmitted when a raw message is received, before decoding.
executeRabbitmqContextEmitted just before the operation handler is called.
finishRabbitmqContext, resultEmitted after the operation handler completes successfully.
errorError, RabbitmqContext | undefinedEmitted when an error occurs in a handler or interceptor.