MQ Controllers
MQ controllers handle message queue operations for transports such as Kafka and RabbitMQ. Each method maps to a channel or topic via @MQOperation decorators. OPRA validates incoming message payloads and encodes outgoing responses automatically.
Defining a Controller
Decorate the class with @MQController(). The controller name defaults to the class name with the Controller suffix stripped.
import { MQController } from '@opra/common';
@MQController()
export class CustomersController {}
In a NestJS application, @MQController() also applies @Injectable(), so the class needs no separate @Injectable() decorator.
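The default naming rule described above can be illustrated with a small standalone function. This is only a sketch of the rule as stated (strip a trailing Controller from the class name); `defaultControllerName` is a hypothetical helper, not part of @opra/common, and how a bare class named Controller is treated is an assumption.

```typescript
// Hypothetical helper, not part of @opra/common: derives a default
// controller name by removing a trailing "Controller" from the class name.
function defaultControllerName(className: string): string {
  const suffix = 'Controller';
  // Assumption of this sketch: only strip when something remains,
  // so a class literally named "Controller" keeps its name.
  if (className.endsWith(suffix) && className.length > suffix.length) {
    return className.slice(0, -suffix.length);
  }
  return className;
}

class CustomersController {}

// A class's own `name` property holds the identifier it was declared with.
const resolvedName = defaultControllerName(CustomersController.name);
console.log(resolvedName); // "Customers"
```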
@MQController() returns a chainable builder with the following methods:
.Header()
Declares a message header shared across all operations in the controller.
@MQController()
.Header('x-tenant-id', { type: 'string', required: true })
export class CustomersController {}
.UseType()
Registers types scoped to this controller without exposing them globally.
import { CustomerStatus } from '../models/enum/customer-status.js';
@MQController()
.UseType(CustomerStatus)
export class CustomersController {}
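For readers unfamiliar with chainable decorators, the following sketch shows one way such a builder can be written in plain TypeScript: a decorator function that also carries methods, each of which records metadata and returns the same decorator. It does not reproduce OPRA's implementation. `SketchController`, `controllerMeta`, and the metadata shape are hypothetical names chosen for illustration.

```typescript
// Hypothetical metadata shape; OPRA's real storage is not shown in this document.
interface HeaderOptions {
  type: string;
  required?: boolean;
}

interface ControllerMeta {
  headers: Record<string, HeaderOptions>;
  types: unknown[];
}

const controllerMeta = new Map<Function, ControllerMeta>();

// A class decorator that also exposes chainable builder methods.
interface ControllerDecorator {
  (target: Function): void;
  Header(headerName: string, options: HeaderOptions): ControllerDecorator;
  UseType(...types: unknown[]): ControllerDecorator;
}

function SketchController(): ControllerDecorator {
  const meta: ControllerMeta = { headers: {}, types: [] };

  // Applying the decorator stores the collected metadata for the class.
  const decorate = (target: Function): void => {
    controllerMeta.set(target, meta);
  };

  // Each builder method records its arguments and returns the same
  // decorator, which is what makes the calls chainable.
  const decorator: ControllerDecorator = Object.assign(decorate, {
    Header(headerName: string, options: HeaderOptions): ControllerDecorator {
      meta.headers[headerName] = options;
      return decorator;
    },
    UseType(...types: unknown[]): ControllerDecorator {
      meta.types.push(...types);
      return decorator;
    },
  });
  return decorator;
}

// Stand-in for an enum type such as CustomerStatus.
const CustomerStatus = { ACTIVE: 'active', INACTIVE: 'inactive' } as const;

class CustomersController {}

// Same effect as writing the chain with `@` above the class; applied by
// hand here so the snippet runs without decorator compiler settings.
const applyMeta = SketchController()
  .Header('x-tenant-id', { type: 'string', required: true })
  .UseType(CustomerStatus);
applyMeta(CustomersController);

console.log(controllerMeta.get(CustomersController));
```

Because every method returns the decorator itself, the order of `.Header()` and `.UseType()` calls does not matter in this sketch; the metadata is only attached once the final result is applied to the class.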
Defining Operations
Each operation is declared with @MQOperation. The first argument is the payload type. The channel name defaults to the method name.
import { MQController, MQOperation } from '@opra/common';
import { MQContext } from '@opra/kafka'; // or @opra/rabbitmq
import { Customer, CustomerCreate } from '../models/customer.js';
@MQController()
export class CustomersController {
  @MQOperation(CustomerCreate)
  async create(ctx: MQContext) { ... }

  @MQOperation(Customer)
    .Response(Customer)
  async update(ctx: MQContext) { ... }
}
.Header()
Declares a header specific to this operation.
@MQOperation(CustomerCreate)
.Header('x-idempotency-key', { type: 'string', required: true })
async create(ctx: MQContext) { ... }
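Since controller-level headers are shared by all operations and operation-level headers apply to one operation, the headers in effect for an operation are the combination of the two. The sketch below illustrates that idea with a simple presence and type check. It is not OPRA's validator: `mergeHeaderSpecs`, `checkHeaders`, the rule that operation-level declarations win on a name clash, and the assumption that header values arrive already decoded are all hypothetical.

```typescript
interface HeaderSpec {
  type: 'string' | 'number' | 'boolean';
  required?: boolean;
}

type HeaderSpecs = Record<string, HeaderSpec>;

// Assumed merge rule: operation-level declarations are added to the
// controller-level ones and take precedence when names collide.
function mergeHeaderSpecs(controllerLevel: HeaderSpecs, operationLevel: HeaderSpecs): HeaderSpecs {
  return { ...controllerLevel, ...operationLevel };
}

// Returns a list of problems; an empty list means the headers pass.
function checkHeaders(specs: HeaderSpecs, received: Record<string, unknown>): string[] {
  const problems: string[] = [];
  for (const [headerName, spec] of Object.entries(specs)) {
    const value = received[headerName];
    if (value === undefined) {
      // Absent headers are only a problem when declared as required.
      if (spec.required) problems.push(`missing required header "${headerName}"`);
      continue;
    }
    if (typeof value !== spec.type) {
      problems.push(`header "${headerName}" should be of type ${spec.type}`);
    }
  }
  return problems;
}

const effectiveSpecs = mergeHeaderSpecs(
  { 'x-tenant-id': { type: 'string', required: true } },       // controller level
  { 'x-idempotency-key': { type: 'string', required: true } }, // operation level
);

// Reports one problem: the idempotency key is required but absent.
console.log(checkHeaders(effectiveSpecs, { 'x-tenant-id': 'acme' }));
```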
.Response()
Declares the response payload type sent back to the caller (RPC pattern). After .Response(), .Header() can be chained to declare response-specific headers.
@MQOperation(CustomerCreate)
.Response(Customer)
.Header('x-request-id', { type: 'string' })
async create(ctx: MQContext) { ... }
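The RPC pattern mentioned above usually means that the caller publishes a request carrying a correlation id and a reply channel, and the responder publishes its result to that channel with the same id. The following sketch shows that flow against a tiny in-memory broker. It is an illustration of the general pattern only: `InMemoryBroker`, `Envelope`, `sendRequest`, and the `.reply` channel naming are hypothetical, and real Kafka or RabbitMQ transports (and OPRA's adapters) work differently in detail.

```typescript
interface Envelope {
  correlationId: string;
  replyTo?: string;
  payload: unknown;
}

type Handler = (message: Envelope) => void;

// Minimal in-memory stand-in for a message broker (synchronous delivery).
class InMemoryBroker {
  private handlers = new Map<string, Handler[]>();

  subscribe(channel: string, handler: Handler): void {
    const list = this.handlers.get(channel) ?? [];
    list.push(handler);
    this.handlers.set(channel, list);
  }

  publish(channel: string, message: Envelope): void {
    for (const handler of this.handlers.get(channel) ?? []) handler(message);
  }
}

const broker = new InMemoryBroker();
let nextCorrelationId = 0;

// Caller side: publish a request and resolve once the matching reply arrives.
// For brevity, the reply subscription is never removed in this sketch.
function sendRequest(channel: string, payload: unknown): Promise<unknown> {
  const correlationId = String(++nextCorrelationId);
  const replyTo = `${channel}.reply`;
  return new Promise(resolve => {
    broker.subscribe(replyTo, reply => {
      // Ignore replies that belong to other requests.
      if (reply.correlationId === correlationId) resolve(reply.payload);
    });
    broker.publish(channel, { correlationId, replyTo, payload });
  });
}

// Responder side: handle "create" and send the result to the reply channel.
broker.subscribe('create', message => {
  const input = message.payload as { givenName: string };
  const customer = { id: 1, givenName: input.givenName };
  if (message.replyTo) {
    broker.publish(message.replyTo, {
      correlationId: message.correlationId,
      payload: customer,
    });
  }
});

sendRequest('create', { givenName: 'Ada' }).then(reply => console.log(reply));
```

In this picture, the type passed to `.Response()` corresponds to the `payload` of the reply envelope, and response-specific headers would travel alongside it.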
.UseType()
Registers types scoped to this operation without exposing them globally.
@MQOperation(CustomerCreate)
.UseType(CustomerStatus)
.Response(Customer)
async create(ctx: MQContext) { ... }
NestJS Integration
Register the controller and its dependencies in the transport module:
// Kafka
import { OpraKafkaModule } from '@opra/nestjs-kafka';
OpraKafkaModule.forRoot({
  controllers: [CustomersController],
  providers: [CustomersController, CustomersService],
})

// RabbitMQ
import { OpraRabbitmqModule } from '@opra/nestjs-rabbitmq';
OpraRabbitmqModule.forRoot({
  controllers: [CustomersController],
  providers: [CustomersController, CustomersService],
})