How to implement Kafka in NestJS?

iamdeepakdev

Deepak Dev

Posted on January 3, 2023


Dear devs, this is not a beginner's guide. I want to show you how I implemented Kafka in my NestJS project.
The project has three microservices built with NestJS:

  1. ev-api-gateway
  2. ev-users
  3. ev-vehicles

Kafka, MongoDB, ZooKeeper, and Redis all run in Docker. The project structure is shown below.

[Image: folder structure]

docker-compose.yml

version: '3.8'
networks:
  ev:
    name: "ev"
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 10.5.0.0/16
          ip_range: 10.5.0.0/24
          gateway: 10.5.0.1
          aux_addresses:
            kafka: 10.5.0.2
            zookeeper: 10.5.0.3
services:
  mongodb:
        image : mongo:latest
        container_name: mongodb
        environment:
        - MONGO_INITDB_ROOT_PASSWORD=root
        - MONGO_INITDB_ROOT_USERNAME=root
        volumes:
        - ./mongodb:/data/db
        ports:
        - 27017:27017
        restart: unless-stopped
  zookeeper:
        image: 'bitnami/zookeeper:latest'
        container_name: "zookeeper"
        ports:
        - 2181:2181
        # volumes:
        # - ./zookeeper:/var/lib/zookeeper
        networks:
        - "ev"
        environment:
        - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
        image: bitnami/kafka:latest
        container_name: "kafka"
        ports:
            - 9092:9092
        networks:
        - "ev"
        # volumes:
        # - ./kafka:/opt/bitnami/kafka
        restart: unless-stopped
        depends_on:
        - zookeeper
        environment:
        - KAFKA_BROKER_ID=1
        - KAFKA_LISTENERS=PLAINTEXT://:9092
        - ALLOW_PLAINTEXT_LISTENER=yes
        - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
        - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
        - KAFKA_RESTART_ATTEMPTS=10
        - KAFKA_RESTART_DELAY=5
        - ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL=0
        # - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  redis:
        image: redis:3
        restart: always
        container_name: "redis"
        ports:
        - 6379:6379
        environment:
        - ALLOW_EMPTY_PASSWORD=yes
        volumes: 
        - ./redis:/data
        networks:
        - "ev"

Run docker-compose up -d --build.
Once the containers are up, confirm it with docker ps.

[Image: docker ps output]

ev-api-gateway\src\main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import * as compression from 'compression';
import { ValidationPipe } from '@nestjs/common';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.use(compression());
  app.enableCors({
    origin: ['http://localhost:4200'],
    methods: ['POST'],
    credentials: true,
  });
  app.setGlobalPrefix('api/v1');
  app.useGlobalPipes(
    new ValidationPipe({
      transform: false,
      whitelist: true,
      //enableDebugMessages: true,
    }),
  );
  await app.startAllMicroservices();
  await app.listen(8081);
}
bootstrap();


The API gateway listens on port 8081.

Generate the modules in the API gateway with:

nest g mo auth
nest g mo kafka
nest g mo redis
nest g mo users
nest g mo vehicles

In the Kafka module, add the following code:

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { Partitioners } from 'kafkajs';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USERS_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'users-consumer',
          },
          producer: {
            createPartitioner: Partitioners.LegacyPartitioner,
          },
        },
      },
      {
        name: 'VEHICLES_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'vehicles-consumer',
          },
          producer: {
            createPartitioner: Partitioners.LegacyPartitioner,
          },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class KafkaModule {}

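The broker address 'localhost:9092' is hard-coded in every Kafka registration in this post. As a rough sketch that is not part of the original project, the list could instead be parsed from a comma-separated string (for example the value of an environment variable) and shared between registrations. The helper names parseBrokers and kafkaOptions are hypothetical, and the producer.createPartitioner setting is left out so the snippet runs without kafkajs.

```typescript
// Hypothetical helper: turns "host1:9092, host2:9092" into the array expected
// by the `client.brokers` option. Falls back to the address used in this post.
function parseBrokers(raw: string | undefined, fallback = 'localhost:9092'): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  return brokers.length > 0 ? brokers : [fallback];
}

// Hypothetical helper: builds the part of `options` that both client
// registrations have in common.
function kafkaOptions(groupId: string, brokers: string[]) {
  return {
    client: { brokers },
    consumer: { groupId },
  };
}

// Example: no value configured, so the fallback broker is used.
const usersOptions = kafkaOptions('users-consumer', parseBrokers(undefined));
console.log(usersOptions);
```

The returned object could then be spread into each options entry next to the producer settings shown above.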

In the users module, add the following code:

import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { KafkaModule } from 'src/kafka/kafka.module';
import { RedisModule } from 'src/redis/redis.module';
@Module({
  imports: [KafkaModule, RedisModule],
  controllers: [UsersController],
  providers: [UsersService],
})
export class UsersModule {}


The users controller looks like this:

import { Controller, Post, Body, Header, Res } from '@nestjs/common';
import { UsersService } from './users.service';

@Controller('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Post('/signup')
  @Header('Content-Type', 'application/json')
  async create(@Body() signupData, @Res() res) {
    const result = await this.usersService.userCreation(signupData);
    res.send(result);
  }
  @Post('/login')
  @Header('Content-Type', 'application/json')
  async login(@Body() data, @Res() res) {
    const result = await this.usersService.login(data);
    res.send(result);
  }
  @Post('/list')
  @Header('Content-Type', 'application/json')
  async list(@Body() data, @Res() res) {
    const result = await this.usersService.redisSet();
    console.log('result');
    console.log(result);
    res.send({ result: result });
  }
}

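The final URL of each handler is built from three pieces that appear in the code above: the global prefix from main.ts ('api/v1'), the controller path ('users'), and the handler path ('/signup', '/login', '/list'). A minimal sketch of that composition, using a hypothetical joinRoute helper that is only an illustration and not a NestJS function:

```typescript
// Hypothetical helper: joins path segments and removes duplicate slashes,
// mimicking how prefix + controller path + handler path form one route.
function joinRoute(...segments: string[]): string {
  const parts = segments
    .map((s) => s.replace(/^\/+|\/+$/g, '')) // trim leading/trailing slashes
    .filter((s) => s.length > 0);
  return '/' + parts.join('/');
}

const loginRoute = joinRoute('api/v1', 'users', '/login');
console.log(loginRoute); // /api/v1/users/login
```

With the gateway listening on port 8081, the login handler would therefore be reachable at http://localhost:8081/api/v1/users/login.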

And the users service looks like this:

import {
  Inject,
  Injectable,
  OnApplicationShutdown,
  OnModuleInit,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { RedisService } from 'src/redis/redis.service';

@Injectable()
export class UsersService implements OnModuleInit, OnApplicationShutdown {
  constructor(
    @Inject('USERS_SERVICE') private readonly usersService: ClientKafka,
    private cacheManager: RedisService,
  ) {}
  async onApplicationShutdown() {
    await this.usersService.close();
  }
  async onModuleInit() {
    const requestPatterns = ['users.userCreation', 'users.login'];
    requestPatterns.forEach((pattern) => {
      this.usersService.subscribeToResponseOf(pattern);
    });
    await this.usersService.connect();
  }
  async userCreation(signupData: any) {
    return await new Promise<any>((resolve) =>
      this.usersService
        .send('users.userCreation', signupData)
        .subscribe((data) => {
          console.log(data);
          resolve(data);
        }),
    );
  }
  async login(data: any) {
    return await new Promise<any>((resolve) =>
      this.usersService.send('users.login', data).subscribe(async (data) => {
        console.log(data);
        if (data?.id) {
          const pattern = 'loginData' + data.id;
          await this.cacheManager
            .get(pattern)
            .then((loginData) => {
              console.log(loginData);
              if (loginData) {
                loginData.count += 1;
                this.cacheManager.set(pattern, loginData);
              } else {
                const setLoginData = {
                  count: 1,
                };
                this.cacheManager.set(pattern, setLoginData);
              }
            })
            .catch((err) => {
              console.log(err);
            });
        }
        resolve(data);
      }),
    );
  }
  async redisSet() {
    this.cacheManager.set('address', 'deepakdev');
    return this.cacheManager.get('name');
  }
}

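The login method above keeps a per-user login counter in Redis under the key 'loginData' + id. The same read-increment-write logic can be sketched on its own with an in-memory stand-in for the cache. MemoryCache and recordLogin are hypothetical names; the real RedisService is not shown in this post, so only get and set are assumed.

```typescript
interface LoginStats {
  count: number;
}

// In-memory stand-in for the RedisService used in the post.
// Values are stored as JSON strings, as a Redis-backed cache typically would.
class MemoryCache {
  private store = new Map<string, string>();

  async get<T>(key: string): Promise<T | null> {
    const raw = this.store.get(key);
    return raw === undefined ? null : (JSON.parse(raw) as T);
  }

  async set<T>(key: string, value: T): Promise<void> {
    this.store.set(key, JSON.stringify(value));
  }
}

// Increments the login counter for one user and returns the new value.
// A missing entry is treated as a count of 0, so the first login stores 1.
async function recordLogin(cache: MemoryCache, userId: string): Promise<number> {
  const key = 'loginData' + userId;
  const current = await cache.get<LoginStats>(key);
  const next: LoginStats = { count: (current?.count ?? 0) + 1 };
  await cache.set(key, next);
  return next.count;
}
```

Note that a separate get followed by set is not atomic, so two concurrent logins for the same user could both read the same old count.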

Now let's focus on the ev-users project.
main.ts file

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { Partitioners } from 'kafkajs';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'users-consumer',
        },
        producer: {
          createPartitioner: Partitioners.LegacyPartitioner,
        },
      },
    },
  );
  await app.listen();
}
bootstrap();


app.module file

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UsersModule } from './users/users.module';
import { MongodbModule } from './mongodb/mongodb.module';
@Module({
  imports: [MongodbModule, UsersModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}


Generate the users module in ev-users with nest g mo users.
Inside the users module:

import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { User, UsersSchema } from '../mongodb/schemas/user.schema';
import { MongooseModule } from '@nestjs/mongoose';

@Module({
  imports: [
    MongooseModule.forFeature([{ name: User.name, schema: UsersSchema }]),
  ],
  controllers: [UsersController],
  providers: [UsersService],
})
export class UsersModule {}


users controller

import { Controller, HttpStatus } from '@nestjs/common';
import { UsersService } from './users.service';
import * as bcrypt from 'bcrypt';
import { config } from '../config/configuration';
import * as jwt from 'jsonwebtoken';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @MessagePattern('users.userCreation')
  async create(@Payload() signupData: any) {
    const isUserExits = await this.usersService.isUserExits(signupData.emailId);
    if (isUserExits !== null) {
      return {
        status: HttpStatus.CONFLICT,
        message: 'You have already registered.',
      };
    } else {
      signupData.password = await bcrypt.hash(
        signupData.password,
        config.saltOrRounds,
      );

      try {
        return await this.usersService
          .createUser(signupData)
          .then(() => {
            return {
              status: HttpStatus.OK,
              message: 'You have successfully registered.',
            };
          })
          .catch(() => {
            return {
              status: HttpStatus.BAD_REQUEST,
              message: 'Invalid details provided!',
            };
          });
      } catch {
        return {
          status: HttpStatus.BAD_REQUEST,
          message: 'Invalid details provided!',
        };
      }
    }
  }

  @MessagePattern('users.login')
  async login(@Payload() loginData) {
    console.log('MessagePattern:users.login');
    const dbData = await this.usersService.isUserExits(loginData.emailId);
    if (dbData !== null) {
      const isMatch = await bcrypt.compare(loginData.password, dbData.password);
      if (isMatch) {
        const token = await this.tokenGenerator(dbData);
        return {
          status: HttpStatus.OK,
          token: token,
          id: dbData._id,
        };
      } else {
        return {
          status: HttpStatus.UNAUTHORIZED,
          message: 'Invalid credentials!',
        };
      }
    } else {
      return {
        status: HttpStatus.NOT_ACCEPTABLE,
        message: 'Invalid credentials!',
      };
    }
  }
  async tokenGenerator(data: any) {
    const payload = {
      userId: data._id.toString(),
      userIdObject: data._id,
      emailId: data.emailId,
      firstName: data.firstName,
    };
    const secret = config.JWTsecret;
    const options = config.JWToptios;

    const token = jwt.sign(payload, secret, options);
    return token;
  }
  async verifyToken(token: string): Promise<any> {
    return jwt.verify(token, config.JWTsecret, function (err, data) {
      if (err) {
        return err;
      }
      return data;
    });
  }
}
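Note that the handlers above do not throw on failure. They always return an object, and the outcome travels inside it as a status field. The branching of the users.userCreation handler can be sketched without MongoDB or bcrypt like this. The users map and the signup function are hypothetical stand-ins for the Mongoose model and the controller method, and the numeric codes are the values of HttpStatus.CONFLICT and HttpStatus.OK.

```typescript
type SignupResult = { status: number; message: string };

// In-memory stand-in for the Mongoose-backed user collection, keyed by email.
const users = new Map<string, { emailId: string; passwordHash: string }>();

// Mirrors the branch order of the handler: an already known email gives 409,
// otherwise the user is stored and 200 is returned.
function signup(emailId: string, passwordHash: string): SignupResult {
  if (users.has(emailId)) {
    return { status: 409, message: 'You have already registered.' };
  }
  users.set(emailId, { emailId, passwordHash });
  return { status: 200, message: 'You have successfully registered.' };
}

console.log(signup('a@example.com', 'hash-1')); // first call: status 200
console.log(signup('a@example.com', 'hash-2')); // second call: status 409
```

Because the status is part of the reply payload, the gateway has to read result.status itself if it wants to react to a failed signup.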

users service

import { Model } from 'mongoose';
import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { CreateUserDto } from './dto/create-user.dto';
import { User, UserDocument } from '../mongodb/schemas/user.schema';

@Injectable()
export class UsersService {
  constructor(
    @InjectModel(User.name) private usersModel: Model<UserDocument>,
  ) {}

  async createUser(createUserDto: CreateUserDto) {
    console.log('Users::createUser');
    try {
      const createdUser = new this.usersModel(createUserDto);
      return createdUser.save();
    } catch (err) {
      console.log('createUser Mongodb error!');
      console.log(err);
      return false;
    }
  }
  async isUserExits(email: string) {
    return this.usersModel.findOne({ emailId: email });
  }
}

Once you start the users service, call the login API at http://localhost:8081/api/v1/users/login.
What happens:

  1. The request reaches ev-api-gateway. The login handler produces a message on the users.login topic for ev-users. The gateway already subscribed to the matching reply topic (users.login.reply) in onModuleInit via subscribeToResponseOf.

  2. ev-users consumes the message and returns the result. Because the gateway is subscribed to the reply topic, it receives the result and returns it to the API caller.
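The two steps above can be sketched as a small in-memory simulation of the request-reply pattern. This is only an illustration under stated assumptions: Bus, pending, and send are hypothetical names and no Kafka is involved. The '.reply' suffix follows the naming NestJS uses for Kafka reply topics, and the correlation ID is what lets the gateway match a reply to the request that caused it.

```typescript
type Message = { correlationId: string; payload: unknown };
type Handler = (msg: Message) => void;

// Tiny in-memory stand-in for a broker: one list of handlers per topic.
class Bus {
  private handlers = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  publish(topic: string, msg: Message): void {
    for (const handler of this.handlers.get(topic) ?? []) handler(msg);
  }
}

const bus = new Bus();
const pending = new Map<string, (reply: unknown) => void>();
let nextId = 0;

// Gateway side: subscribe to the reply topic before sending anything.
bus.subscribe('users.login.reply', (msg) => {
  const onReply = pending.get(msg.correlationId);
  if (onReply) {
    pending.delete(msg.correlationId);
    onReply(msg.payload);
  }
});

// Service side: consume the request and answer on the reply topic,
// echoing the correlation ID of the request.
bus.subscribe('users.login', (msg) => {
  const { emailId } = msg.payload as { emailId: string };
  bus.publish('users.login.reply', {
    correlationId: msg.correlationId,
    payload: { status: 200, emailId },
  });
});

// Gateway side: register a callback for the reply, then publish the request.
function send(topic: string, payload: unknown, onReply: (reply: unknown) => void): void {
  const correlationId = String(++nextId);
  pending.set(correlationId, onReply);
  bus.publish(topic, { correlationId, payload });
}

send('users.login', { emailId: 'dev@example.com' }, (reply) => console.log(reply));
```

If the gateway subscribed to the reply topic only after sending, a fast reply could be missed, which is why the subscription in onModuleInit happens before connect().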

This is only a fragment of the project, but the producer and consumer code shown above should be helpful to some devs. If you have any doubts, please comment and I will help you out.
