Node Backend Server - Youtube GIF Maker Using Next.js, Node and RabbitMQ
Raggi
Posted on February 14, 2021
Hello everyone,
This Article is the second part of the series Youtube GIF Maker Using Next.js, Node and RabbitMQ.
In this article we will dive into building the backend server of our Youtube to GIF converter. This Article will contain some code snippets but the whole project can be accessed on github which contains the full source code as well as additional integration tests and swagger api docs. You can also view the app demo. The following topics will be covered here
- Functionalities
- Project Architecture
- Implementation
Functionalities
As seen in the sequence diagram above, the backend server has 3 main functionalities which are:
- Handling GIF conversion requests by creating a new job record in the database
- Dispatching events to RabbitMQ indicating that a new conversion job has been created (task queueing)
- Handling job fetching requests by querying the job by its id from the database and returning the appropriate response.
Project Architecture
Our express app architecture contains three main components
- Route Handler
- Controller
- Service
Each one of them has certain functionalities that we will discuses in some detail as well as explain why its structured this way
- Route Handler
- Responsible for routing paths to their route handlers. Typically these route handlers are composed of an array of handlers which we call the "Middleware Chain", the final handler in that chain is the Route Controller
- The Middleware Chain typically is responsible for doing "checks" on the incoming request as well as modifying the request object in some cases. In our case we will be doing validation using a custom validation middleware.
- Controller
- Extracting data from the request as well as sanitizing this data if necessary
- Delegating the control to the relevant Service
- Handling responses
- Delegating errors to a custom error handling middleware
- Service
- Has all the business logic
- Access data using the Data Access Layer (ORM/ODM)
Controllers should be dumb meaning they shouldn't have any details about the business logic, all they know is "which service can handle this request" , "what data this service needs" , "what the response should look like". This avoids having Fat Controllers
Implementation
Database Schema
In this project, we are using TypeORM which is a TypeScript ready ORM that supports many databases (we are going to be using MongoDB as mentioned in the first part of the series).
We are going to represent each GIF Conversion as a Job, which will be our only Collection.
The Job Collection in TypeORM looks like this
import { BaseEntity, Entity, ObjectID, Column, CreateDateColumn, UpdateDateColumn, ObjectIdColumn } from 'typeorm';
@Entity('jobs')
export class Job extends BaseEntity {
@ObjectIdColumn()
id: ObjectID;
@Column({
nullable: false,
})
youtubeUrl: string;
@Column({
nullable: false,
})
youtubeId: string;
@Column({
nullable: true,
})
gifUrl: string;
@Column({
nullable: false,
})
startTime: number;
@Column({
nullable: false,
})
endTime: number;
@Column({
type: 'enum',
enum: ['pending', 'processing', 'done', 'error'],
})
status: 'pending' | 'processing' | 'done' | 'error';
@Column()
@CreateDateColumn()
createdAt: Date;
@Column()
@UpdateDateColumn()
updatedAt: Date;
}
The important thing to notice here is the field status, this essentially acts as an enum to indicate the current status of the GIF conversion. All the other fields are standard data needed to do the conversion job
Route Handling
As mentioned before , we are going to only have two routes.
- Route for creating a new GIF conversion job
- Route for fetching data about a conversion job from its id which will be used for polling later by the client side
This is how our route handler looks like
//routes.interface
import { Router } from 'express';
interface Route {
path?: string;
router: Router;
}
export default Route;
//jobs.route.ts
import { Router } from 'express';
import { CreateJobDto } from '../../common/dtos/createJob.dto';
import Route from '../../common/interfaces/routes.interface';
import JobsController from '../../controllers/jobs.controller';
import validationMiddleware from '../middlewares/validation.middleware';
class JobsRoute implements Route {
public path = '/jobs';
public router = Router();
constructor(private jobsController = new JobsController()) {
this.initializeRoutes();
}
private initializeRoutes() {
this.router.get(`${this.path}/:id`, this.jobsController.getJobById);
this.router.post(`${this.path}`, validationMiddleware(CreateJobDto, 'body'), this.jobsController.createJob);
}
}
export default JobsRoute;
For validation we use a custom validation middleware that validates a DTO using class-validator and class-transformer
//createJob.dto
import { Expose } from 'class-transformer';
import { IsNotEmpty, IsNumber, IsString, Matches } from 'class-validator';
import { IsGreaterThan } from './validators/isGreaterThan';
import { MaximumDifference } from './validators/maximumDifference';
export class CreateJobDto {
@IsNotEmpty()
@IsString()
@Matches(/^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/, {
message: 'Invalid youtube url',
})
@Expose()
public youtubeUrl: string;
@IsNotEmpty()
@IsNumber()
@Expose()
public startTime: number;
@IsNotEmpty()
@IsNumber()
@IsGreaterThan('startTime', {
message: 'end time must be greater than start time',
})
@MaximumDifference('startTime', {
message: 'maximum gif duration is 30 seconds',
})
@Expose()
public endTime: number;
}
Notice that IsGreaterThan and MaximumDifference are custom class-validator validation decorators, essentially they look like this (more information on this can be found in class-validator docs)
//isGreaterThan.ts
import { registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';
export function IsGreaterThan(property: string, validationOptions?: ValidationOptions) {
return function (object: Object, propertyName: string) {
registerDecorator({
name: 'isGreaterThan',
target: object.constructor,
propertyName: propertyName,
constraints: [property],
options: validationOptions,
validator: {
validate(value: any, args: ValidationArguments) {
const [relatedPropertyName] = args.constraints;
const relatedValue = (args.object as any)[relatedPropertyName];
return typeof value === 'number' && typeof relatedValue === 'number' && value > relatedValue;
},
},
});
};
}
MaximumDifference looks similar to this but its return looks like this instead
return typeof value === 'number' && typeof relatedValue === 'number' && value - relatedValue <= difference;
And now our validation middleware looks like this
validation.middleware.ts
import { plainToClass } from 'class-transformer';
import { validate, ValidationError } from 'class-validator';
import { RequestHandler } from 'express';
const validationMiddleware = (type: any, value: string | 'body' | 'query' | 'params' = 'body', skipMissingProperties = false): RequestHandler => {
return (req, res, next) => {
validate(plainToClass(type, req[value]), { skipMissingProperties }).then((errors: ValidationError[]) => {
if (errors.length > 0) {
const message = errors.map((error: ValidationError) => Object.values(error.constraints)).join(', ');
res.status(400).send(message);
} else {
next();
}
});
};
};
export default validationMiddleware;
Controller
Our controller looks pretty standard, the only takeaways are extracting the CreateJobDto object from the body using plainToClass from class-transformer with excludeExtraneousValues: true, which destructures only the exposed fields (having @Expose() decorator in the CreateJobDto class) more about this in class-transformer docs
//jobs.controllers.ts
import { plainToClass } from 'class-transformer';
import { NextFunction, Request, Response } from 'express';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import { Job } from '../entities/jobs.entity';
import JobsService from '../services/jobs.service';
class JobsController {
constructor(private jobService = new JobsService()) {}
public createJob = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobDto: CreateJobDto = plainToClass(CreateJobDto, req.body, { excludeExtraneousValues: true });
const createdJob: Job = await this.jobService.createJob(jobDto);
res.status(201).json(createdJob);
} catch (error) {
next(error);
}
};
public getJobById = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobId = req.params.id;
const job: Job = await this.jobService.findJobById(jobId);
const responseStatus = job.status === 'done' ? 200 : 202;
res.status(responseStatus).json(job);
} catch (error) {
next(error);
}
};
}
export default JobsController;
Also its worth noting that the response status code of [GET] /job/{id} is 202 when the conversion job is still in processing. See Asynchronous Request-Response Pattern for more information on this
In case of an error, the error is passed to the error middleware, which is the last middleware in our express middleware chain and it looks like this:
//error.middleware.ts
import { NextFunction, Request, Response } from 'express';
import { isBoom, Boom } from '@hapi/boom';
import { logger } from '../../common/utils/logger';
function errorMiddleware(error: Boom | Error, req: Request, res: Response, next: NextFunction) {
const statusCode: number = isBoom(error) ? error.output.statusCode : 500;
const errorMessage: string = isBoom(error) ? error.message : 'Something went wrong';
logger.error(`StatusCode : ${statusCode}, Message : ${error}`);
return res.status(statusCode).send(errorMessage);
}
export default errorMiddleware;
You might notice we imported a package called Boom, we will talk about it later in the Services section
Services
Job Service
The JobService has all the business logic and access to the Data Access layer as well as communicating with the RabbitMQ Service to dispatch events to the queue
//jobs.service.ts
import * as Boom from '@hapi/boom';
import Container from 'typedi';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import EventEmitter from '../common/utils/eventEmitter';
import { Job } from '../entities/jobs.entity';
import RabbitMQService from './rabbitmq.service';
class JobsService {
private events = {
JobCreated: 'JobCreated',
};
constructor() {
this.intiializeEvents();
}
private intiializeEvents() {
EventEmitter.on(this.events.JobCreated, (job: Job) => {
const rabbitMQInstance = Container.get(RabbitMQService);
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
public async findJobById(jobId: string): Promise<Job> {
const job: Job = await Job.findOne(jobId);
if (!job) throw Boom.notFound();
return job;
}
public async createJob(jobDto: CreateJobDto): Promise<Job> {
const createdJob: Job = await Job.save({ ...jobDto, youtubeId: jobDto.youtubeUrl.split('v=')[1]?.slice(0, 11), status: 'pending' } as Job);
EventEmitter.emit(this.events.JobCreated, createdJob);
return createdJob;
}
}
export default JobsService;
Right off the bat you can see two imports you may be unfamiliar with, we will quickly go through them and then explain each function in this class in details.
-
Boom
- Used to create http objects with a powerful, simple and friendly interface. you can see how easy it was throwing a 404 Not Found Error object
-
typedi
- TypeDI is a powerful dependency injection package that has many features. One of this features is having Singleton Services which is how we use it in our case.
Now lets go into more details into some functions in the class
intiializeEvents()
This function uses a global EventEmitter we use globally across the project to add a pub/sub layer. its as simple as
//eventEmitter.ts
import { EventEmitter } from 'events';
export default new EventEmitter();
and now we can start listening on events specifically an event we will emit later when creating a new job called 'JobCreated'
// Defines all the events in our service
private events = {
JobCreated: 'JobCreated',
};
private intiializeEvents() {
// Start listening for the event 'JobCreated'
EventEmitter.on(this.events.JobCreated, (job: Job) => {
// Get a singleton instance of our RabbitMQService
const rabbitMQInstance = Container.get(RabbitMQService);
// Dispatch an event containing the data of the created job
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
See more information on Adding A Pub/Sub layer To Your Express Backend
createJob()
This function does exactly two things.
- Creating a new job document in the database
- Dispatching an event 'JobCreated' that a new job has been created, that way the event listener will handle the logic of dispatching that event to the RabbitMQ Service
RabbitMQ Service
This service is responsible for connecting to the RabbitMQ Server, creating a channel and initializing the queue which will be used for producing tasks (will be consumed by our service worker).
amqplib is used to as the client for our RabbitMQ Server
//rabbitmq.service.ts
import { Service } from 'typedi';
import amqp, { Channel, Connection } from 'amqplib';
import { logger } from '../common/utils/logger';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
} catch (err) {
logger.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
logger.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
logger.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
logger.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async sendToQueue(message: string) {
this.channel.sendToQueue(this.queueName, Buffer.from(message), {
persistent: true,
});
logger.info(`sent: ${message} to queue ${this.queueName}`);
}
}
The code for bootstrapping the connection/channels/queues is pretty standard and you can find references to these functons on RabbitMQ Docs or anqplib docs. The one function that we will need to use from outside this class is sendToQueue() which is used to dispatch a message to our task queue as seen in the JobService by dispatching a stringified Job object.
rabbitMQInstance.sendToQueue(JSON.stringify(job));
We now only need to initialize the RabbitMQ Service at the start of our app like this
import Container from 'typedi';
// Call initializeRabbitMQ() somewhere when starting the app
private initializeRabbitMQ() {
Container.get(RabbitMqService);
}
Now the job of our backend service is done and all that is left is for the node service worker to consume the task queue and do the actual GIF conversion.
Remember that the full source code can be viewed on the github repository
In the next part of the series we will see how we can implement a node service worker that will consume the task queue and do the actual Youtube to GIF conversion.
Posted on February 14, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.