Deepak Dev
Posted on January 3, 2023
Dear Devs, this is not a beginner's guide. I want to show you how I have implemented Kafka in my NestJS project.
In this project we have three microservices developed in NestJS:
- ev-api-gateway
- ev-users
- ev-vehicles

I am running Kafka, MongoDB, ZooKeeper, and Redis in Docker. The project structure is shown below.
docker-compose.yml
# Local development stack: MongoDB, ZooKeeper, Kafka and Redis.
version: '3.8'

networks:
  # Dedicated bridge network shared by ZooKeeper, Kafka and Redis.
  ev:
    name: "ev"
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 10.5.0.0/16
          ip_range: 10.5.0.0/24
          gateway: 10.5.0.1
          aux_addresses:
            kafka: 10.5.0.2
            zookeeper: 10.5.0.3

services:
  mongodb:
    image: mongo:latest
    container_name: mongodb
    environment:
      # No space after '=': in list-form entries everything after '='
      # (including a leading space) becomes part of the value.
      - MONGO_INITDB_ROOT_PASSWORD=root
      - MONGO_INITDB_ROOT_USERNAME=root
    volumes:
      - ./mongodb:/data/db
    ports:
      - 27017:27017
    restart: unless-stopped

  zookeeper:
    image: 'bitnami/zookeeper:latest'
    container_name: "zookeeper"
    ports:
      - 2181:2181
    # volumes:
    #   - ./zookeeper:/var/lib/zookeeper
    networks:
      - "ev"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:latest
    container_name: "kafka"
    ports:
      - 9092:9092
    networks:
      - "ev"
    # volumes:
    #   - ./kafka:/opt/bitnami/kafka
    restart: unless-stopped
    depends_on:
      - zookeeper
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # Advertised as localhost because the Nest services in this post
      # connect to the broker at localhost:9092.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_RESTART_ATTEMPTS=10
      - KAFKA_RESTART_DELAY=5
      - ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL=0
      # - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  redis:
    image: redis:3
    restart: always
    container_name: "redis"
    ports:
      - 6379:6379
    environment:
      # NOTE(review): ALLOW_EMPTY_PASSWORD looks like a Bitnami-image
      # variable; confirm the official redis image actually reads it.
      - ALLOW_EMPTY_PASSWORD=yes
    volumes:
      - ./redis:/data
    networks:
      - "ev"
Run `docker-compose up -d --build`.
Once the containers are up, confirm they are running with `docker ps`.
ev-api-gateway\src\main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import * as compression from 'compression';
import { ValidationPipe } from '@nestjs/common';
/**
 * Boots the API gateway HTTP application.
 *
 * Enables response compression, restricts CORS to POST requests from
 * http://localhost:4200, prefixes every route with `api/v1`, installs a
 * whitelisting ValidationPipe and then listens on port 8081.
 */
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.use(compression());
  app.enableCors({
    origin: ['http://localhost:4200'],
    methods: ['POST'],
    credentials: true,
  });
  app.setGlobalPrefix('api/v1');
  app.useGlobalPipes(
    new ValidationPipe({
      transform: false,
      whitelist: true,
      //enableDebugMessages: true,
    }),
  );
  // Awaited so a microservice start-up failure rejects bootstrap() instead
  // of becoming a detached rejection, and so listen() only runs afterwards.
  await app.startAllMicroservices();
  await app.listen(8081);
}
bootstrap();
The API gateway will start listening on port 8081.
Generate the modules in the API gateway with:
nest g mo auth
nest g mo kafka
nest g mo redis
nest g mo users
nest g mo vehicles
In the kafka module, add the following code:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { Partitioners } from 'kafkajs';
// Broker address used by every Kafka client registered below.
const KAFKA_BROKER = 'localhost:9092';

// Producer settings common to both clients.
const LEGACY_PRODUCER = {
  createPartitioner: Partitioners.LegacyPartitioner,
};

/**
 * Registers the two Kafka client providers used by the gateway,
 * 'USERS_SERVICE' and 'VEHICLES_SERVICE', and re-exports ClientsModule so
 * importing modules can inject them.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USERS_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: { brokers: [KAFKA_BROKER] },
          consumer: { groupId: 'users-consumer' },
          producer: { ...LEGACY_PRODUCER },
        },
      },
      {
        name: 'VEHICLES_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: { brokers: [KAFKA_BROKER] },
          consumer: { groupId: 'vehicles-consumer' },
          producer: { ...LEGACY_PRODUCER },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class KafkaModule {}
In the users module, add the following code:
import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { KafkaModule } from 'src/kafka/kafka.module';
import { RedisModule } from 'src/redis/redis.module';
/**
 * Gateway-side users module: wires UsersController and UsersService and
 * imports KafkaModule and RedisModule for their exported providers.
 */
@Module({
  providers: [UsersService],
  controllers: [UsersController],
  imports: [
    KafkaModule,
    RedisModule,
  ],
})
export class UsersModule {}
The users controller looks like this:
import { Controller, Post, Body, Header, Res } from '@nestjs/common';
import { UsersService } from './users.service';
/**
 * HTTP entry points under `users/`. Each handler delegates to UsersService
 * and writes the outcome straight to the injected response object.
 */
@Controller('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  /** POST users/signup: replies with what `userCreation` resolves to. */
  @Post('/signup')
  @Header('Content-Type', 'application/json')
  async create(@Body() signupData, @Res() res) {
    const outcome = await this.usersService.userCreation(signupData);
    res.send(outcome);
  }

  /** POST users/login: replies with what `login` resolves to. */
  @Post('/login')
  @Header('Content-Type', 'application/json')
  async login(@Body() data, @Res() res) {
    const outcome = await this.usersService.login(data);
    res.send(outcome);
  }

  /** POST users/list: logs the value from `redisSet` and replies `{ result }`. */
  @Post('/list')
  @Header('Content-Type', 'application/json')
  async list(@Body() data, @Res() res) {
    const cached = await this.usersService.redisSet();
    console.log('result');
    console.log(cached);
    res.send({ result: cached });
  }
}
And the users service looks like this:
import {
Inject,
Injectable,
OnApplicationShutdown,
OnModuleInit,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices/client';
import { RedisService } from 'src/redis/redis.service';
/**
 * Gateway-side users service. Sends request/reply messages to the users
 * microservice through the injected 'USERS_SERVICE' Kafka client and keeps
 * a per-user login counter in the injected RedisService.
 */
@Injectable()
export class UsersService implements OnModuleInit, OnApplicationShutdown {
  constructor(
    @Inject('USERS_SERVICE') private readonly usersService: ClientKafka,
    private cacheManager: RedisService,
  ) {}

  /** Closes the Kafka client when the application shuts down. */
  async onApplicationShutdown() {
    await this.usersService.close();
  }

  /**
   * Registers every request pattern this service sends for response
   * handling, then connects the Kafka client.
   */
  async onModuleInit() {
    const requestPatterns = ['users.userCreation', 'users.login'];
    requestPatterns.forEach((pattern) => {
      this.usersService.subscribeToResponseOf(pattern);
    });
    await this.usersService.connect();
  }

  /**
   * Sends the signup payload on 'users.userCreation'.
   * @returns the first reply emitted for the request.
   * @throws whatever error the reply stream emits.
   */
  async userCreation(signupData: any) {
    return this.request('users.userCreation', signupData);
  }

  /**
   * Sends the login payload on 'users.login'. When the reply carries an
   * `id`, increments (or creates) the `loginData<id>` counter in the cache.
   * Cache failures are logged and do not affect the returned reply.
   * @returns the first reply emitted for the request.
   * @throws whatever error the reply stream emits.
   */
  async login(data: any) {
    const reply = await this.request('users.login', data);
    if (reply?.id) {
      const pattern = 'loginData' + reply.id;
      try {
        const loginData = await this.cacheManager.get(pattern);
        console.log(loginData);
        if (loginData) {
          loginData.count += 1;
          await this.cacheManager.set(pattern, loginData);
        } else {
          await this.cacheManager.set(pattern, { count: 1 });
        }
      } catch (err) {
        // Best effort: the login result is still returned to the caller.
        console.log(err);
      }
    }
    return reply;
  }

  /**
   * Writes the 'address' key and returns the value stored under 'name'.
   * NOTE(review): the key written differs from the key read - confirm this
   * is intentional.
   */
  async redisSet() {
    this.cacheManager.set('address', 'deepakdev');
    return this.cacheManager.get('name');
  }

  /**
   * Sends one request and settles with the first reply. The error handler
   * rejects the promise so a failed request does not leave the caller
   * waiting forever.
   */
  private request(pattern: string, payload: any): Promise<any> {
    return new Promise<any>((resolve, reject) => {
      this.usersService.send(pattern, payload).subscribe({
        next: (reply) => {
          console.log(reply);
          resolve(reply);
        },
        error: (err) => reject(err),
      });
    });
  }
}
Now let's focus on the ev-users project.
The main.ts file:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { Partitioners } from 'kafkajs';
/**
 * Boots the users service as a Kafka microservice connected to the broker
 * at localhost:9092 with consumer group 'users-consumer'.
 */
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'users-consumer',
        },
        producer: {
          createPartitioner: Partitioners.LegacyPartitioner,
        },
      },
    },
  );
  // Awaited so a failure to start listening rejects bootstrap() rather than
  // being dropped as a detached promise.
  await app.listen();
}
bootstrap();
app.module file
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UsersModule } from './users/users.module';
import { MongodbModule } from './mongodb/mongodb.module';
/**
 * Root module of the users service: imports MongodbModule and UsersModule
 * and registers AppController and AppService.
 */
@Module({
  providers: [AppService],
  controllers: [AppController],
  imports: [
    MongodbModule,
    UsersModule,
  ],
})
export class AppModule {}
Generate the users module in ev-users with `nest g mo users`.
Inside the users module:
import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { User, UsersSchema } from '../mongodb/schemas/user.schema';
import { MongooseModule } from '@nestjs/mongoose';
// Mongoose model registration for the User schema.
const userModel = { name: User.name, schema: UsersSchema };

/**
 * Users feature module of the users service: registers the User model with
 * Mongoose and wires UsersController and UsersService.
 */
@Module({
  providers: [UsersService],
  controllers: [UsersController],
  imports: [MongooseModule.forFeature([userModel])],
})
export class UsersModule {}
users controller
import { Controller, Post, Body, Header, HttpStatus } from '@nestjs/common';
import { UsersService } from './users.service';
import * as bcrypt from 'bcrypt';
import { config } from '../config/configuration';
import * as jwt from 'jsonwebtoken';
import { Res } from '@nestjs/common/decorators';
import { MessagePattern, Payload } from '@nestjs/microservices';
/**
 * Kafka message handlers of the users service: signup ('users.userCreation')
 * and login ('users.login'), plus JWT helpers.
 */
@Controller()
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  /**
   * Registers a new user.
   *
   * @param signupData payload of the message; must carry string `emailId`
   *   and `password` fields.
   * @returns `{ status, message }` with CONFLICT when the e-mail is already
   *   registered, OK on success and BAD_REQUEST when the payload is invalid
   *   or the user could not be stored.
   */
  @MessagePattern('users.userCreation')
  async create(@Payload() signupData: any) {
    const invalidDetails = {
      status: HttpStatus.BAD_REQUEST,
      message: 'Inavlid details provided!',
    };
    // Reject non-string fields up front: they must not reach the database
    // query or bcrypt, which throws on a missing password.
    if (
      typeof signupData?.emailId !== 'string' ||
      typeof signupData?.password !== 'string'
    ) {
      return invalidDetails;
    }
    const isUserExits = await this.usersService.isUserExits(signupData.emailId);
    if (isUserExits !== null) {
      return {
        status: HttpStatus.CONFLICT,
        message: 'You have already registered.',
      };
    }
    try {
      // Hashing happens inside the try so a bcrypt failure is reported as
      // BAD_REQUEST instead of escaping the handler.
      signupData.password = await bcrypt.hash(
        signupData.password,
        config.saltOrRounds,
      );
      const created = await this.usersService.createUser(signupData);
      // A falsy result means the service failed to store the user.
      if (!created) {
        return invalidDetails;
      }
      return {
        status: HttpStatus.OK,
        message: 'You have successfully registered.',
      };
    } catch {
      return invalidDetails;
    }
  }

  /**
   * Checks the credentials in the payload.
   *
   * @param loginData payload of the message; must carry string `emailId`
   *   and `password` fields.
   * @returns `{ status: OK, token, id }` on success; otherwise
   *   `{ status: UNAUTHORIZED, message }`. Unknown e-mail and wrong password
   *   produce the same reply so the response does not reveal which
   *   accounts exist.
   */
  @MessagePattern('users.login')
  async login(@Payload() loginData) {
    console.log('MessagePattern:users.login');
    const invalidCredentials = {
      status: HttpStatus.UNAUTHORIZED,
      message: 'Inavlid credentials!',
    };
    if (
      typeof loginData?.emailId !== 'string' ||
      typeof loginData?.password !== 'string'
    ) {
      return invalidCredentials;
    }
    const dbData = await this.usersService.isUserExits(loginData.emailId);
    if (dbData === null) {
      return invalidCredentials;
    }
    const isMatch = await bcrypt.compare(loginData.password, dbData.password);
    if (!isMatch) {
      return invalidCredentials;
    }
    const token = await this.tokenGenerator(dbData);
    return {
      status: HttpStatus.OK,
      token: token,
      id: dbData._id,
    };
  }

  /**
   * Signs a JWT for the given user record using the secret and options from
   * the project configuration.
   *
   * @param data user record; `_id`, `emailId` and `firstName` are read.
   * @returns the signed token.
   */
  async tokenGenerator(data: any) {
    const payload = {
      userId: data._id.toString(),
      userIdObject: data._id,
      emailId: data.emailId,
      firstName: data.firstName,
    };
    const secret = config.JWTsecret;
    const options = config.JWToptios;
    const token = jwt.sign(payload, secret, options);
    return token;
  }

  /**
   * Verifies a JWT against the configured secret.
   *
   * The callback returns the error object on failure rather than throwing.
   * NOTE(review): the resolved value is whatever jwt.verify returns in its
   * callback form - confirm callers receive the decoded payload.
   */
  async verifyToken(token: string): Promise<any> {
    return jwt.verify(token, config.JWTsecret, function (err, data) {
      if (err) {
        return err;
      }
      return data;
    });
  }
}
users service
import { Model } from 'mongoose';
import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { CreateUserDto } from './dto/create-user.dto';
import { User, UserDocument } from '../mongodb/schemas/user.schema';
/**
 * Persistence layer for users, backed by the injected Mongoose User model.
 */
@Injectable()
export class UsersService {
  constructor(
    @InjectModel(User.name) private usersModel: Model<UserDocument>,
  ) {}

  /**
   * Creates and saves a user document.
   *
   * @param createUserDto fields of the new user.
   * @returns the saved document.
   * @throws the underlying error when the document cannot be built or
   *   saved; it is logged first and then rethrown so the caller can report
   *   the failure instead of treating it as success.
   */
  async createUser(createUserDto: CreateUserDto) {
    console.log('Users::createUser');
    try {
      const createdUser = new this.usersModel(createUserDto);
      // `await` is required here: without it a rejected save() bypasses
      // the catch block below.
      return await createdUser.save();
    } catch (err) {
      console.log('createUser Mongodb error!');
      console.log(err);
      throw err;
    }
  }

  /**
   * Looks up a user by e-mail address.
   *
   * @param email value matched against the `emailId` field.
   * @returns the result of `findOne`; callers compare it against `null`
   *   to detect a missing user.
   */
  async isUserExits(email: string) {
    return this.usersModel.findOne({ emailId: email });
  }
}
Once you start the users service, call the login API with a POST request to `http://localhost:8081/api/v1/users/login`.
What will happen:
The request hits ev-api-gateway. The login controller produces a message on the `users.login` topic for ev-users; the gateway has also subscribed to the reply topic for that pattern (via `subscribeToResponseOf`).
Once the message is sent, ev-users consumes it and returns the result to ev-api-gateway. Since the gateway is already subscribed to the reply topic, it receives the result and returns it as the API response.
This is just a fragment of the project, but the producer and consumer code shown above should be helpful for some of you. If you have any doubts, please leave a comment and I will help you out.
Posted on January 3, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.