Skip to main content

Kafka Module

The @opra/nestjs-kafka package integrates the OPRA Kafka adapter into a NestJS application. It discovers your OPRA message queue controllers automatically from the NestJS provider tree, builds the ApiDocument, wires up the Kafka consumer lifecycle through NestJS hooks, and resolves class-based interceptors through the NestJS DI container — all transparently.


Installation

npm install @opra/nestjs-kafka @opra/kafka @opra/common

Setup

Import OpraKafkaModule in your root application module:

import { Module } from '@nestjs/common';
import { OpraKafkaModule } from '@opra/nestjs-kafka';
import { OrdersController } from './orders/orders.controller.js';
import { OrdersService } from './orders/orders.service.js';
import * as models from './models/models.js';

@Module({
imports: [
OpraKafkaModule.forRoot({
providers: [OrdersController, OrdersService],
name: 'OrderApi',
info: { title: 'Order API', version: '1.0' },
types: [...Object.values(models)],
client: {
bootstrapBrokers: ['localhost:9092'],
},
}),
],
})
export class AppModule {}

Controllers decorated with @MQController are discovered automatically from the NestJS provider tree — no explicit controllers list is needed in the module options.

Options

OptionTypeDescription
namestringAPI name.
infoobjectDocument metadata — title, version, description.
typesany[]Data types to register (decorated classes, EnumType results).
referencesRecord<string, ReferenceThunk>Namespaced references to other ApiDocument instances or async thunks.
clientClientOptionsKafka client connection options. Must include bootstrapBrokers.
consumersRecord<string, ConsumerOptions>Named consumer group configurations.
defaults{ consumer?, subscribe? }Default consumer and subscription options applied to all operations unless overridden.
scopestringValidation scope applied during message decoding.
interceptors(InterceptorFunction | IKafkaInterceptor | Type<IKafkaInterceptor>)[]Interceptor chain executed on every message. Class types are resolved through the NestJS DI container.
loggerLoggerCustom NestJS logger instance. Defaults to a logger named after the API.
logExtrabooleanLog additional diagnostic output from the Kafka client.
importsany[]NestJS modules to import into the OPRA module context.
providersProvider[]NestJS providers available for injection inside controllers.
exportsany[]Providers to export from the OPRA module to the rest of the application.
globalbooleanRegister the module as a NestJS global module.

Async configuration

Use forRootAsync() when client, info, types, or other options depend on injected services:

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { OpraKafkaModule } from '@opra/nestjs-kafka';
import * as models from './models/models.js';

@Module({
imports: [
ConfigModule.forRoot(),
OpraKafkaModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
providers: [OrdersController, OrdersService],
useFactory: (config: ConfigService) => ({
name: 'OrderApi',
info: {
title: 'Order API',
version: config.get('API_VERSION'),
},
types: [...Object.values(models)],
client: {
bootstrapBrokers: config.get<string[]>('KAFKA_BROKERS'),
},
}),
}),
],
})
export class AppModule {}

How it works

OpraKafkaModule scans all NestJS providers in its module context for classes decorated with @MQController and builds the ApiDocument from them automatically. No explicit controller list is required.

Once the document is ready, the module creates a KafkaAdapter and manages its lifecycle through NestJS hooks:

  • onApplicationBootstrap — calls adapter.start(), which connects to brokers, fetches available topics, and begins consuming messages.
  • onApplicationShutdown — calls adapter.close(true) to force-close all consumers gracefully.

Dependency injection in controllers

OPRA MQ controllers are registered as NestJS providers, so you can inject services directly into them:

import { MQController, MQOperation } from '@opra/common';
import { KafkaContext } from '@opra/kafka';
import { Injectable } from '@nestjs/common';
import { OrdersService } from './orders.service.js';

@Injectable()
@MQController()
export class OrdersController {
constructor(private readonly service: OrdersService) {}

@MQOperation({ channel: 'orders.created', type: OrderPayload })
async onOrderCreated(ctx: KafkaContext) {
await this.service.process(ctx.payload);
}
}

Register the controller as a NestJS provider in the module options:

OpraKafkaModule.forRoot({
providers: [OrdersController, OrdersService],
// ...
})

Class-based interceptors

Class-based interceptors passed to the interceptors option are resolved through the NestJS DI container, so they can have injected dependencies:

import { IKafkaInterceptor, KafkaContext } from '@opra/kafka';
import { Injectable } from '@nestjs/common';
import { MetricsService } from './metrics.service.js';

@Injectable()
class MetricsInterceptor implements IKafkaInterceptor {
constructor(private readonly metrics: MetricsService) {}

async intercept(ctx: KafkaContext, next: () => Promise<any>) {
const start = Date.now();
await next();
this.metrics.record(ctx.topic, Date.now() - start);
}
}

OpraKafkaModule.forRoot({
providers: [OrdersController, OrdersService, MetricsService, MetricsInterceptor],
interceptors: [MetricsInterceptor], // resolved via DI
// ...
})

Injecting the adapter

The KafkaAdapter instance is exported from the module and can be injected elsewhere in your application:

import { Injectable } from '@nestjs/common';
import { KafkaAdapter } from '@opra/kafka';

@Injectable()
export class HealthService {
constructor(private readonly adapter: KafkaAdapter) {}

isReady() {
return this.adapter.status === 'started';
}
}