MQ API
Overview
OPRA's MQ API provides a transport-agnostic layer on top of message brokers such as Apache Kafka, RabbitMQ, and Amazon SQS. You define your message consumers once using TypeScript decorators; the adapter package for your chosen broker handles connection management, serialization, acknowledgement, and error handling. The incoming payload and headers are validated and decoded against their declared types before your handler is called, and any response payload is encoded and validated before it is published to the reply channel.
A Message Queue API in OPRA is built from two decorator types:
- @MQController(): groups related message operations. Controllers can share headers across all their operations.
- @MQOperation(): defines a single message consumer on a controller method. It declares the payload type, the channel(s) to subscribe to, and an optional response.
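Controllers are registered in the API document by setting the transport to 'mq' and listing them under controllers:

import { ApiDocumentFactory } from '@opra/common';
import { MailConsumer } from './api/mail-consumer.js';
import { SendMailDto } from './dto/send-mail.dto.js';

const document = await ApiDocumentFactory.createDocument({
  info: { title: 'Notification API', version: '1.0' },
  // Data types used by the operations
  types: [SendMailDto],
  api: {
    transport: 'mq',
    platform: 'Kafka',
    name: 'NotificationService',
    // Message consumer controllers exposed by this API
    controllers: [MailConsumer],
  },
});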
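The validate-then-handle flow described in the overview can be pictured with a small, framework-free sketch. Everything here (`HeaderMap`, `InboundMessage`, `OperationSketch`, `dispatch`, `sendMailOp`) is a hypothetical name invented for illustration; it is not OPRA's internal implementation, only one way such a pipeline might be arranged.

```typescript
// Hypothetical shapes for illustration only; not OPRA's internal types.
type HeaderMap = Record<string, string>;

interface InboundMessage {
  channel: string;
  payload: unknown;
  headers: HeaderMap;
}

interface OperationSketch<TIn, TOut> {
  // Header names that must be present on every message.
  requiredHeaders: string[];
  // Throws if the raw payload does not match the declared type.
  decodePayload(raw: unknown): TIn;
  handler(payload: TIn, headers: HeaderMap): TOut;
}

// Validate and decode first; the handler only runs on well-formed input.
function dispatch<TIn, TOut>(op: OperationSketch<TIn, TOut>, msg: InboundMessage): TOut {
  // Normalize header names so lookups are case-insensitive.
  const headers: HeaderMap = {};
  for (const key of Object.keys(msg.headers)) {
    const value = msg.headers[key];
    if (value !== undefined) headers[key.toLowerCase()] = value;
  }
  for (const required of op.requiredHeaders) {
    if (!(required.toLowerCase() in headers)) {
      throw new Error(`Missing required header "${required}"`);
    }
  }
  return op.handler(op.decodePayload(msg.payload), headers);
}

interface SendMail {
  from: string;
  target: string;
  message: string;
}

const sendMailOp: OperationSketch<SendMail, string> = {
  requiredHeaders: ['x-tenant-id'],
  decodePayload(raw) {
    const v = raw as Partial<SendMail> | null;
    if (!v || typeof v.from !== 'string' || typeof v.target !== 'string' || typeof v.message !== 'string') {
      throw new Error('Invalid SendMail payload');
    }
    return { from: v.from, target: v.target, message: v.message };
  },
  handler: (mail, headers) => `queued mail to ${mail.target} for tenant ${headers['x-tenant-id']}`,
};
```

Calling `dispatch(sendMailOp, message)` with a malformed payload or a missing `x-tenant-id` header throws before the handler runs, which is the property the overview describes.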
MQController
Decorate a class with @MQController() to register it as a message consumer controller.
import { MQController } from '@opra/common';

@MQController({
  description: 'Mail consumer controller',
})
export class MailConsumer { }
Options
| Option | Type | Description |
|---|---|---|
| name | string | Registry name. Defaults to the class name without the Controller suffix. |
| description | string | Human-readable description. |
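As a rough illustration of the default for the name option, the helper below strips a trailing Controller suffix from a class name. `defaultControllerName` is a hypothetical function written for this example; OPRA's own naming logic may differ in detail.

```typescript
// Hypothetical helper; OPRA's own naming logic may differ in detail.
function defaultControllerName(ctor: { name: string }): string {
  // Remove the suffix only when it appears at the end of the class name.
  return ctor.name.replace(/Controller$/, '');
}

class MailController {}
class MailConsumer {}

// A class without the suffix keeps its full name.
const derivedNames = [
  defaultControllerName(MailController), // 'Mail'
  defaultControllerName(MailConsumer), // 'MailConsumer'
];
```

Under this assumption, the MailConsumer class used throughout this page would be registered as MailConsumer unless name is set explicitly.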
Controller Headers
Use .Header() on the @MQController() decorator to declare message headers shared by all operations in the controller. Header names are matched case-insensitively; RegExp patterns are also supported.
import { MQController } from '@opra/common';

@(MQController({ description: 'Mail consumer controller' })
  .Header('access-token', 'string')
  .Header('x-tenant-id', { type: 'string', required: true }))
export class MailConsumer { }
Headers defined on a controller are inherited by all its operations. An operation can declare additional headers on top of the controller-level ones.
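The matching rule for header names (case-insensitive strings, or RegExp patterns) can be sketched as follows. `HeaderRule`, `findHeaderRule`, and the `x-trace-` pattern are hypothetical and exist only for this example; how OPRA stores and resolves header definitions internally is not shown here.

```typescript
// Hypothetical shape of a declared header; not OPRA's internal representation.
interface HeaderRule {
  name: string | RegExp;
  required?: boolean;
}

// Finds the rule for an incoming header name. String names compare
// case-insensitively; RegExp patterns are tested against the name as written.
function findHeaderRule(rules: HeaderRule[], headerName: string): HeaderRule | undefined {
  return rules.find(rule =>
    typeof rule.name === 'string'
      ? rule.name.toLowerCase() === headerName.toLowerCase()
      : rule.name.test(headerName),
  );
}

const mailHeaderRules: HeaderRule[] = [
  { name: 'access-token' },
  { name: 'x-tenant-id', required: true },
  { name: /^x-trace-/i }, // hypothetical pattern-based header
];

const tenantRule = findHeaderRule(mailHeaderRules, 'X-Tenant-ID'); // matches 'x-tenant-id'
```

A pattern without the `g` flag is used on purpose: `RegExp.prototype.test` is stateful for global patterns, which would make repeated lookups unreliable.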
Header options
| Option | Type | Description |
|---|---|---|
| type | string \| Type | Header value type. |
| required | boolean | Header must be present. |
| deprecated | boolean \| string | Marks the header deprecated. |
| description | string | Human-readable description. |
MQOperation
Decorate a controller method with @MQOperation() to define a message consumer. The first argument is the payload type; the second is an options object.
import { ComplexType, ApiField, MQController, MQOperation } from '@opra/common';

@ComplexType({ description: 'Send Mail DTO' })
export class SendMailDto {
  @ApiField() declare from: string;
  @ApiField() declare target: string;
  @ApiField() declare message: string;
}

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {
  @MQOperation(SendMailDto, {
    channel: 'send-email',
  })
  async sendMail() { }
}
Channel
The channel option declares which topic or queue the operation subscribes to. It accepts a string, a RegExp, or an array of either:
@MQController()
export class EventConsumer {
  // Exact channel name
  @MQOperation(OrderCreatedEvent, { channel: 'order.created' })
  async onOrderCreated() { }

  // RegExp pattern: matches any channel starting with 'order.'
  @MQOperation(OrderEvent, { channel: /^order\..*/ })
  async onOrderEvent() { }

  // Multiple channels
  @MQOperation(CustomerEvent, {
    channel: ['customer.created', 'customer.updated'],
  })
  async onCustomerEvent() { }
}
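The three accepted forms of the channel option can be reduced to one matching function. The sketch below is an assumption about the semantics (exact match for strings, `test` for patterns, any-of for arrays); `ChannelPattern` and `matchesChannel` are hypothetical names, and the way a given broker adapter actually resolves pattern subscriptions may differ.

```typescript
// Mirrors the accepted forms of the channel option.
type ChannelPattern = string | RegExp | (string | RegExp)[];

// Returns true when the channel satisfies at least one pattern.
function matchesChannel(pattern: ChannelPattern, channel: string): boolean {
  const patterns = Array.isArray(pattern) ? pattern : [pattern];
  return patterns.some(p => (typeof p === 'string' ? p === channel : p.test(channel)));
}

const exactMatch = matchesChannel('order.created', 'order.created');
const patternMatch = matchesChannel(/^order\..*/, 'order.shipped');
const listMatch = matchesChannel(['customer.created', 'customer.updated'], 'customer.updated');
const noMatch = matchesChannel(/^order\..*/, 'orders');
```

Note that the pattern `/^order\..*/` requires the literal dot, so a channel named `orders` does not match.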
Message key type
For platforms that support keyed messages (e.g. Kafka), declare the key type with keyType:
@MQOperation(SendMailDto, {
  channel: 'send-email',
  keyType: 'uuid',
})
async sendMail() { }
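To make the effect of a declared key type concrete, here is a minimal sketch of what validating a uuid key could look like. `UUID_PATTERN` and `decodeUuidKey` are hypothetical, and the pattern only checks the canonical 8-4-4-4-12 hexadecimal layout; the rules OPRA's own uuid type applies may be stricter or looser.

```typescript
// Canonical 8-4-4-4-12 hexadecimal layout; OPRA's 'uuid' type may apply different rules.
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical helper: rejects missing or malformed keys before a handler would run.
function decodeUuidKey(rawKey: string | null): string {
  if (rawKey === null || !UUID_PATTERN.test(rawKey)) {
    throw new TypeError(`Invalid uuid message key: ${String(rawKey)}`);
  }
  // Normalize to lower case so equal keys compare equal.
  return rawKey.toLowerCase();
}

const decodedKey = decodeUuidKey('123E4567-E89B-12D3-A456-426614174000');
```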
Options
| Option | Type | Description |
|---|---|---|
| channel | string \| RegExp \| (string \| RegExp)[] | Channel name(s) or pattern(s) to subscribe to. |
| keyType | string \| Type | Message key type (e.g. for Kafka partition keys). |
| description | string | Human-readable description. |
Operation Headers
Use .Header() on @MQOperation() to declare headers specific to a single operation.
@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {
  @(MQOperation(SendMailDto, { channel: 'send-email' })
    .Header('x-priority', 'integer')
    .Header('x-retry-count', { type: 'integer', required: false }))
  async sendMail() { }
}
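Because operations inherit controller headers and can add their own, the set of headers that applies to one operation is the combination of both lists. The sketch below shows one way to compute it. `HeaderDecl` and `effectiveHeaders` are hypothetical names, and letting an operation-level declaration replace a controller-level one with the same name is an assumption of this example, not documented behaviour.

```typescript
// Hypothetical shape of a declared header.
interface HeaderDecl {
  name: string;
  type: string;
  required?: boolean;
}

// Combines controller-level and operation-level headers.
// Assumption: on a name clash (case-insensitive), the operation-level entry wins.
function effectiveHeaders(controllerHeaders: HeaderDecl[], operationHeaders: HeaderDecl[]): HeaderDecl[] {
  const byName = new Map<string, HeaderDecl>();
  for (const h of controllerHeaders) byName.set(h.name.toLowerCase(), h);
  for (const h of operationHeaders) byName.set(h.name.toLowerCase(), h);
  return Array.from(byName.values());
}

const sendMailHeaders = effectiveHeaders(
  [
    { name: 'access-token', type: 'string' },
    { name: 'x-tenant-id', type: 'string', required: true },
  ],
  [
    { name: 'x-priority', type: 'integer' },
    { name: 'x-retry-count', type: 'integer', required: false },
  ],
);
```

With the controller and operation declarations used on this page, `sendMailHeaders` contains four entries: the two shared headers followed by the two operation-specific ones.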
Response
Use .Response() to declare the reply message sent after the operation processes the incoming message. The first argument is the response payload type; the second is an options object with the reply channel.
@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {
  @(MQOperation(SendMailDto, { channel: 'send-email' })
    .Response('string', {
      channel: 'send-email-response',
    }))
  async sendMail() { }
}
Chain .Header() after .Response() to declare headers on the response message:
@(MQOperation(SendMailDto, { channel: 'send-email' })
  .Response('string', { channel: 'send-email-response' })
  .Header('x-request-id', 'uuid'))
async sendMail() { }
Response options
| Option | Type | Description |
|---|---|---|
| channel | string \| RegExp \| (string \| RegExp)[] | Channel to send the reply message to. |
| keyType | string \| Type | Message key type for the response. |
| description | string | Human-readable description. |
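The reply side can be sketched in the same framework-free way: the result is checked against the declared response type and then addressed to the reply channel. `ReplyRule`, `OutboundMessage`, `buildReply`, and `stringReply` are hypothetical names, and the sketch assumes that the value produced by the handler becomes the reply payload.

```typescript
// Hypothetical shapes for illustration only; not OPRA's internal types.
interface ReplyRule {
  channel: string;
  // Throws if the handler result does not match the declared response type.
  encode(result: unknown): string;
}

interface OutboundMessage {
  channel: string;
  payload: string;
  headers: Record<string, string>;
}

// Validates the handler result, then addresses it to the reply channel.
function buildReply(rule: ReplyRule, result: unknown, headers: Record<string, string> = {}): OutboundMessage {
  return { channel: rule.channel, payload: rule.encode(result), headers: { ...headers } };
}

// Mirrors .Response('string', { channel: 'send-email-response' }).
const stringReply: ReplyRule = {
  channel: 'send-email-response',
  encode(result) {
    if (typeof result !== 'string') throw new TypeError('Response payload must be a string');
    return result;
  },
};

const reply = buildReply(stringReply, 'accepted', {
  'x-request-id': '123e4567-e89b-12d3-a456-426614174000',
});
```

Validating on the way out means a handler that returns the wrong type fails at the boundary instead of publishing a malformed reply.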
Platform Extensions
Platform-specific behaviour is configured via chainable extension methods added by the respective adapter packages.
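One common TypeScript pattern for this kind of extension is to widen a builder's type through declaration merging and attach the method to its prototype when the adapter module is imported. The sketch below models that pattern in a single file. `OperationMeta`, `OperationBuilder`, and `KafkaSketchOptions` are hypothetical stand-ins; whether the adapter packages are implemented exactly this way is an assumption, and across real packages the type widening would normally be done with a `declare module` augmentation.

```typescript
// Hypothetical, simplified model of a chainable decorator builder.
interface OperationMeta {
  channel: string;
  extensions: Record<string, unknown>;
}

// Stand-in for what a core package might export.
class OperationBuilder {
  meta: OperationMeta;
  constructor(meta: OperationMeta) {
    this.meta = meta;
  }
}

// Stand-in for an adapter package. Importing it for its side effects would
// (1) widen the builder type and (2) attach the method at runtime.
interface KafkaSketchOptions {
  consumer: { groupId: string };
}
interface OperationBuilder {
  Kafka(options: KafkaSketchOptions): OperationBuilder;
}
OperationBuilder.prototype.Kafka = function (
  this: OperationBuilder,
  options: KafkaSketchOptions,
): OperationBuilder {
  this.meta.extensions['kafka'] = options;
  return this; // returning the builder keeps the chain going
};

const orderCreated = new OperationBuilder({ channel: 'order.created', extensions: {} })
  .Kafka({ consumer: { groupId: 'order-service' } });
```

This is why a bare side-effect import is enough to make an extension method available: without the import, the method exists neither in the types nor at runtime.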
Kafka
Import @opra/kafka to enable the .Kafka() chain method on @MQOperation():
import '@opra/kafka';
import { MQController, MQOperation } from '@opra/common';

@MQController({ description: 'Order event consumer' })
export class OrderConsumer {
  @(MQOperation(OrderCreatedEvent, { channel: 'order.created' })
    .Kafka({
      consumer: {
        groupId: 'order-service',
      },
      subscribe: {
        fromBeginning: false,
      },
    }))
  async onOrderCreated() { }
}
RabbitMQ
Import @opra/rabbitmq to enable the .RabbitMQ() chain method on @MQOperation():
import '@opra/rabbitmq';
import { MQController, MQOperation } from '@opra/common';

@MQController({ description: 'Mail consumer controller' })
export class MailConsumer {
  @(MQOperation(SendMailDto, { channel: 'send-email' })
    .RabbitMQ({
      concurrency: 5,
      requeue: true,
      qos: { prefetchCount: 10 },
    }))
  async sendMail() { }
}