How to Instantly Track Message Broker Job Status Within the Same Request With NestJS and RabbitMQ
Lucas Silva
Posted on June 4, 2024
Suppose you have a distributed job system. When you create a job to be processed by workers, you want to return the result status if the job completes within an interval of 500 milliseconds or up to 10 seconds. This can be a challenging task. However, NestJS Microservices offer an easy way to achieve this.
Let's say you have a basic NestJS Microservice setup with an API and a RabbitMQ consumer. The project setup configuration would look like this:
Setting Up NestJS Microservices
// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrapHttp() {
const app = await NestFactory.create(AppModule);
await app.listen(3000);
}
async function bootstrapConsumer() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
},
},
},
);
await app.listen();
}
if (require.main === module) {
if (process.env.MODE === 'consumer') {
bootstrapConsumer();
} else if (process.env.MODE === 'http') {
bootstrapHttp();
} else {
throw new Error('Invalid mode');
}
}
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: true,
queueOptions: {
durable: false,
},
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
Implementing the Controller
Initially, the controller might look like this:
// app.controller.ts
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy, MessagePattern, Payload } from '@nestjs/microservices';
type EnqueuedJob = {
status: 'enqueued';
jobId: string;
};
type ProcessedJob = {
status: 'processed';
jobId: string;
};
type JobState = EnqueuedJob | ProcessedJob;
const sleep = async (ms: number) => {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
};
const generateRandomId = (): string => {
return Math.random().toString(36).substring(7);
};
@Controller()
export class AppController {
constructor(
@Inject('JOB_QUEUE_SERVICE') private readonly client: ClientProxy,
) {}
@Get()
createJob(): JobState {
const jobId = generateRandomId();
this.client.send('process_job', { jobId }).subscribe();
return {
status: 'enqueued',
jobId,
};
}
@Get('state/:jobId')
getJobState(jobId: string): JobState {
return {
status: 'processed',
jobId,
};
}
@MessagePattern('process_job')
async getNotifications(@Payload() data: { jobId: string }) {
console.log('Processing job', data.jobId);
await sleep(10000);
console.log('Processed job', data.jobId);
}
}
Running the Project
To run the project in consumer mode:
MODE=consumer yarn start
To run the project in HTTP mode:
MODE=http yarn start
Now you can create a job by sending a GET request to the / endpoint.
curl http://localhost:3000
With the subscribe function, the producer
service awaits the consumer to process the message and reply to the messages auto generated id, not the one we created early. Look:
The message at rabbitmq has a property called reply_to
. This property is set on runtime
at the producer, waiting for a signal
. NestJS then manages to use this unique string to send a message (the consumer functions's serialized return data) back to the producer, using the callback queue
which is named by the reply_to
property and the correlation_id
. The return message is then sent to the producer, and the producer can then handle the return message.
Modifying the Code for Synchronous Job Status Tracking
The createJob method is responsible for creating a job ID and sending it to the queue. The client then needs to poll the /state/ endpoint to check if the job is completed. However, using NestJS Microservices' ClientProxy interface, which is compatible with RxJS, we can handle callbacks more effectively.
Update the message sending code as follows:
this.client
.send('process_job', { jobId })
.subscribe({
next(value) {
console.log('Processed. Result:', value);
},
});
Tracking Job Status Within the Same Request
To track the job status within the same request, set a timeout. If the job is not completed within the timeout, return a status of enqueued. If the job is completed within the timeout, return a status of processed.
Update the createJob method:
import { Subject, lastValueFrom } from 'rxjs';
//...
@Get()
createJob() {
const jobId = generateRandomId();
const maxWaitTimeMs = 1000;
const jobStateSubject = new Subject<JobState>();
// Set a timeout to return 'enqueued' if the job is not completed within the timeout
const timeoutId = setTimeout(() => {
jobStateSubject.next({
status: 'enqueued',
jobId,
});
jobStateSubject.complete();
}, maxWaitTimeMs);
this.client
.send('process_job', { jobId })
.subscribe({
next(value) {
// Clear the timeout and return 'processed' once the job is completed
console.log('Received value', value);
clearTimeout(timeoutId);
jobStateSubject.next({
status: 'processed',
jobId,
});
jobStateSubject.complete();
},
});
return lastValueFrom(jobStateSubject);
}
You may need to adjust the timeout value based on your requirements.
Troubleshooting
The Producer Is Not Receiving the Response from the Consumer
Ensure that the consumer function returns a value. undefined is not valid. If you need to return nothing, return an empty object {} or null.
Posted on June 4, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.