Skip to main content

MQ Api

Overview

OPRA's MQ API provides a transport-agnostic layer on top of brokers such as Apache Kafka, RabbitMQ, Amazon SQS, and others. You define your message consumers once using TypeScript decorators; the adapter package for your chosen broker handles connection management, serialization, acknowledgement, and error handling under the hood. Incoming payload and headers are automatically validated and decoded against their declared types before your handler is called, and any response payload is encoded and validated before it is published to the reply channel.

Message Receivedbroker → consumerChannel Matchingcontroller · operationHeadersdecode & validatePayloaddecode & validateHandlerasync methodResponseencode & publishoptionalerrornackreject · requeueerrornackreject · requeueerrordead-lettererror topic / DLQ

A Message Queue API in OPRA is built from two decorator types:

  • @MQController() — groups related message operations. Controllers can share headers across all their operations.
  • @MQOperation() — defines a single message consumer on a controller method. It declares the payload type, the channel(s) to subscribe to, and an optional response.

ApiDocument

import { ApiDocumentFactory } from '@opra/common';
import { MailConsumer } from './api/mail-consumer.js';
import { SendMailDto } from './dto/send-mail.dto.js';

const document = await ApiDocumentFactory.createDocument({
info: { title: 'Notification API', version: '1.0' },
types: [SendMailDto],
api: {
transport: 'mq',
platform: 'Kafka',
name: 'NotificationService',
controllers: [MailConsumer],
},
});

MQController

Decorate a class with @MQController() to register it as a message consumer controller.

import { MQController } from '@opra/common';

@MQController({
description: 'Mail consumer controller',
})
export class MailConsumer { }

Options

OptionTypeDescription
namestringRegistry name. Defaults to the class name without the Controller suffix.
descriptionstringHuman-readable description.

Controller Headers

Use .Header() on the @MQController() decorator to declare message headers shared by all operations in the controller. Header names are matched case-insensitively; RegExp patterns are also supported.

import { MQController } from '@opra/common';

@(MQController({ description: 'Mail consumer controller' })
.Header('access-token', 'string')
.Header('x-tenant-id', { type: 'string', required: true }))
export class MailConsumer { }
note

Headers defined on a controller are inherited by all its operations. An operation can declare additional headers on top of the controller-level ones.

Header options

OptionTypeDescription
typestring | TypeHeader value type.
requiredbooleanHeader must be present.
deprecatedboolean | stringMarks the header deprecated.
descriptionstringHuman-readable description.

MQOperation

Decorate a controller method with @MQOperation() to define a message consumer. The first argument is the payload type; the second is an options object.

import { ComplexType, ApiField, MQController, MQOperation } from '@opra/common';

@ComplexType({ description: 'Send Mail DTO' })
export class SendMailDto {
@ApiField() declare from: string;
@ApiField() declare target: string;
@ApiField() declare message: string;
}

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {

@MQOperation(SendMailDto, {
channel: 'send-email',
})
async sendMail() { }
}

Channel

The channel option declares which topic or queue the operation subscribes to. It accepts a string, a RegExp, or an array of either:

@MQController()
export class EventConsumer {

// Exact channel name
@MQOperation(OrderCreatedEvent, { channel: 'order.created' })
async onOrderCreated() { }

// RegExp pattern — matches any channel starting with 'order.'
@MQOperation(OrderEvent, { channel: /^order\..*/ })
async onOrderEvent() { }

// Multiple channels
@MQOperation(CustomerEvent, {
channel: ['customer.created', 'customer.updated'],
})
async onCustomerEvent() { }
}

Message key type

For platforms that support keyed messages (e.g. Kafka), declare the key type with keyType:

@MQOperation(SendMailDto, {
channel: 'send-email',
keyType: 'uuid',
})
async sendMail() { }

Options

OptionTypeDescription
channelstring | RegExp | (string | RegExp)[]Channel name(s) or pattern(s) to subscribe to.
keyTypestring | TypeMessage key type (e.g. for Kafka partition keys).
descriptionstringHuman-readable description.

Operation Headers

Use .Header() on @MQOperation() to declare headers specific to a single operation.

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {

@(MQOperation(SendMailDto, { channel: 'send-email' })
.Header('x-priority', 'integer')
.Header('x-retry-count', { type: 'integer', required: false }))
async sendMail() { }
}

Response

Use .Response() to declare the reply message sent after the operation processes the incoming message. The first argument is the response payload type; the second is an options object with the reply channel.

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {

@(MQOperation(SendMailDto, { channel: 'send-email' })
.Response('string', {
channel: 'send-email-response',
}))
async sendMail() { }
}

Chain .Header() after .Response() to declare headers on the response message:

@(MQOperation(SendMailDto, { channel: 'send-email' })
.Response('string', { channel: 'send-email-response' })
.Header('x-request-id', 'uuid'))
async sendMail() { }

Response options

OptionTypeDescription
channelstring | RegExp | (string | RegExp)[]Channel to send the reply message to.
keyTypestring | TypeMessage key type for the response.
descriptionstringHuman-readable description.

Platform Extensions

Platform-specific behaviour is configured via chainable extension methods added by the respective adapter packages.

Kafka

Import @opra/kafka to enable the .Kafka() chain method on @MQOperation():

import '@opra/kafka';
import { MQController, MQOperation } from '@opra/common';

@MQController({ description: 'Order event consumer' })
export class OrderConsumer {

@(MQOperation(OrderCreatedEvent, { channel: 'order.created' })
.Kafka({
consumer: {
groupId: 'order-service',
},
subscribe: {
fromBeginning: false,
},
}))
async onOrderCreated() { }
}

RabbitMQ

Import @opra/rabbitmq to enable the .RabbitMQ() chain method on @MQOperation():

import '@opra/rabbitmq';
import { MQController, MQOperation } from '@opra/common';

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {

@(MQOperation(SendMailDto, { channel: 'send-email' })
.RabbitMQ({
concurrency: 5,
requeue: true,
qos: { prefetchCount: 10 },
}))
async sendMail() { }
}