mkadirtan
Posted on January 2, 2023
Cover Photo by Enrico Mantegazza
In NestJS, HTTP requests are traced by req.id
parameter. You can write your custom id generator, or use default id generated by NestJS or Express or Fastify. However, there is no such concept of request id, or request tracing when it comes to Websockets / Socket.IO.
Though NestJS allows use of interceptors, filters or guards in Gateways
, one feature I've felt missing is request id. Without request id, you can't trace websocket events and that is not very good for observability.
In this post, I will explain how to trace websocket events in NestJS. For this, we will create a custom WebSocketAdapter
. In doing so, we will dive deep into source code of NestJS and solve some challenges!
What is a WebSocketAdapter?
NestJS makes use of Adapter Pattern to abstract underlying libraries from your code. For example NestJS doesn't care if you are using Fastify or Express as an HTTP library. If you want to use another library, you just need to implement an adapter defined by NestJS.
There are various websocket libraries in Node.js, and also there is Socket.IO, which makes things easier for developers. In this post, we will modify IoAdapter
, from the existing official NestJS package: @nestjs/platform-socket.io.
Socket.IO is a bidirectional communication protocol, than can work over HTTP or Websockets. We will use it as an example websocket library in this post, but actual details of the Socket.IO protocol is out of scope for this post. I'm planning to publish an article about how Socket.IO works, if you don't want to miss out, subscribe to Noop Today!
How to Write a WebSocketAdapter in NestJS
To create a custom WebSocketAdapter
in NestJS, you need to implement WebSocketAdapter
interface from @nestjs/common.
// https://github.com/nestjs/nest/blob/master/packages/common/interfaces/websockets/web-socket-adapter.interface.ts
export interface WebSocketAdapter<TServer = any, TClient = any, TOptions = any> {
create(port: number, options?: TOptions): TServer;
bindClientConnect(server: TServer, callback: Function): any;
bindClientDisconnect?(client: TClient, callback: Function): any;
bindMessageHandlers(client: TClient, handlers: WsMessageHandler[], transform: (data: any) => Observable<any>): any;
close(server: TServer): any;
}
In a typical Node.js application involving websockets / socket.io, you probably have the same functionality in a different way. Since we are using NestJS as our framework, lets see how we should structure our code.
A typical socket.io server in Node.js
const express = require('express');
const app = express();
const http = require('http');
const server = http.createServer(app);
const { Server } = require("socket.io");
// create in WebSocketAdapter
const io = new Server(server);
// bindClientConnect in WebSocketAdapter
io.on('connection', (socket) => {
console.log('handleConnection');
// bindClientDisconnect in WebSocketAdapter
socket.on('disconnect', () => {
console.log('handleDisconnect');
})
// bindMessageHandlers in WebSocketAdapter
socket.on('foo', (data, callback) => {
console.log('handleFoo');
callback('OK');
})
});
server.listen(3000, () => {
console.log('listening on *:3000');
});
// close in WebSocketAdapter
setTimeout(() => { io.close() }, 10000);
NestJS provides an AbstractWsAdapter
class to make things easier for us. So instead of this:
import { WebSocketAdapter } from '@nestjs/common';
export class MyWebSocketAdapter implements WebSocketAdapter {}
We can do this:
import { AbstractWsAdapter } from '@nestjs/websockets';
export class MyWebsocketAdapter extends AbstractWsAdapter {}
The difference is that, AbstractWsAdapter
provides default implementations for some methods, but you still need to implement create
and bindMessageHandlers
methods.
We will act smart and use existing code from the official package, and make modifications only in required parts.
Use Existing IoAdapter from @nestjs/platform-socket.io
This is the IoAdapter
from NestJS official packages, that acts as a bridge between NestJS and Socket.IO.
// https://github.com/nestjs/nest/blob/master/packages/platform-socket.io/adapters/io-adapter.ts
import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
AbstractWsAdapter,
MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server, ServerOptions, Socket } from 'socket.io';
export class IoAdapter extends AbstractWsAdapter {
public create(
port: number,
options?: ServerOptions & { namespace?: string; server?: any },
): Server {
if (!options) {
return this.createIOServer(port);
}
const { namespace, server, ...opt } = options;
return server && isFunction(server.of)
? server.of(namespace)
: namespace
? this.createIOServer(port, opt).of(namespace)
: this.createIOServer(port, opt);
}
public createIOServer(port: number, options?: any): any {
if (this.httpServer && port === 0) {
return new Server(this.httpServer, options);
}
return new Server(port, options);
}
public bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
share(),
first(),
);
handlers.forEach(({ message, callback }) => {
const source$ = fromEvent(socket, message).pipe(
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
return transform(callback(data, ack)).pipe(
filter((response: any) => !isNil(response)),
map((response: any) => [response, ack]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack]) => {
if (response.event) {
return socket.emit(response.event, response.data);
}
isFunction(ack) && ack(response);
});
});
}
public mapPayload(payload: unknown): { data: any; ack?: Function } {
if (!Array.isArray(payload)) {
if (isFunction(payload)) {
return { data: undefined, ack: payload as Function };
}
return { data: payload };
}
const lastElement = payload[payload.length - 1];
const isAck = isFunction(lastElement);
if (isAck) {
const size = payload.length - 1;
return {
data: size === 1 ? payload[0] : payload.slice(0, size),
ack: lastElement,
};
}
return { data: payload };
}
}
Let's see what does all the methods do:
-
create
method is responsible for creating the socket.io server, we don't need to modify here. -
createIoServer
is an helper method forcreate
-
bindMessageHandlers
is responsible for delivering socket.io messages to our handlers. We will modify this for sending extra arguments to our handlers. -
mapPayload
parses payload from socket.io to{ ack, data }
, we don't need to modify here.
So, we just need to focus on bindMessageHandlers
.
Modifying The bindMessageHandlers
public bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
share(),
first(),
);
handlers.forEach(({ message, callback }) => {
const source$ = fromEvent(socket, message).pipe(
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
return transform(callback(data, ack)).pipe(
filter((response: any) => !isNil(response)),
map((response: any) => [response, ack]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack]) => {
if (response.event) {
return socket.emit(response.event, response.data);
}
isFunction(ack) && ack(response);
});
});
}
First, lets understand what is handlers
in this. It has the type: MessageMappingProperties
:
export interface MessageMappingProperties {
message: any;
methodName: string;
callback: (...args: any[]) => Observable<any> | Promise<any> | any;
}
In socket.io terms: socket.on(message, callback)
, message will match the event name, and callback will be the actual handler function.
So, in order to pass parameters to our handler methods, we must pass parameters to the **callback**
.
Lets take a closer look at code with additional commentary:
// message --> eventName
// callback --> eventHandler
handlers.forEach(({ message, callback }) => {
// RxJS equivelant for --> socket.on(message)
const source$ = fromEvent(socket, message).pipe(
// payload is socket.io payload
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
// transform, transforms our callback handler to observable
// callback function is called with TWO parameters
return transform(callback(data, ack)).pipe(
filter((response: any) => !isNil(response)),
// If callback returns any response, wrap it with ack
map((response: any) => [response, ack]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack]) => {
// If you return { event: 'foo', data: 'bar' } from handler
// See: https://docs.nestjs.com/websockets/gateways#multiple-responses
if (response.event) {
return socket.emit(response.event, response.data);
}
// If client accepts response, send response
isFunction(ack) && ack(response);
});
});
A Little Problem in NestJS
If you've ever used IoAdapter in your project, you might realize even though the adapter passes two arguments to the callback function, your handler methods don't receive them in full. Let me explain:
Say you have a message handler:
class MySocketGateway {
@SubscribeMessage('foo')
fooHandler(argument1, argument2, argument3){
console.log({
argument1, // Client
argument2, // Body
argument3, // undefined
})
}
}
If you go ahead and try something like this, you will see that 1st argument is Socket / Client object, 2nd argument is body BUT 3rd argument is undefined. Two questions come to mind:
- Where did 1st argument came from?
- Where did my ack function disappear?
Lets go with the first question, if we dive deep into underlying NestJS code, we can find the exact place where adapter.bindMessageHandlers
is called.
// https://github.com/nestjs/nest/blob/21bd8c37364a2a2591e3de9bfb88d32d09431438/packages/websockets/web-sockets-controller.ts#L147
public subscribeMessages<T = any>(
subscribersMap: MessageMappingProperties[],
client: T,
instance: NestGateway,
) {
const adapter = this.config.getIoAdapter();
const handlers = subscribersMap.map(({ callback, message }) => ({
message,
callback: callback.bind(instance, client),
}));
adapter.bindMessageHandlers(client, handlers, data =>
fromPromise(this.pickResult(data)).pipe(mergeAll()),
);
}
If you look closely, callback is bound with two parameters. bind
method has one required argument for this
, and rest of the arguments are supplied as arguments to the function - *callback. *More info about this in mozilla docs. So, our 1st argument is fixed by NestJS to be client object.
How about ack function, why does it disapper?
This was very hard for me to debug but I've found why does that happen, and how you can fix it!
The problem is not about the ack function. I've tried sending different parameters, or sending more than two parameters to callback function but only the first parameter ends up in handler, and rest is gone.
There is a helper class [WsContextCreator](https://github.com/nestjs/nest/blob/aa3ad07c1023b71edda6b6ea53374787d3712231/packages/websockets/context/ws-context-creator.ts#L56)
in @nestjs/websockets
, that is responsible for attaching Interceptors
, Guards
, Pipes
and ExceptionHandlers
to handlers, AND managing parameters sent to handler methods!
That helper class is responsible for disappearing parameters! The reason it makes other parameters disappear is related to this use case:
class MyWebsocketGateway {
@SubscribeMessage('foo')
handleFoo(client: Client, body){
console.log({ body });
}
@SubscribeMessage('bar')
handleBar(@MessageBody() body){
console.log({ body })
}
}
If you want to decorate your handler parameters with @MessageBody
or @ConnectedSocket
, the WsContextCreator
assigns those parameters for you. You can even do experiments like this:
class MyWebsocketGateway {
@SubscribeMessage('foo')
handleFoo(arg1, arg2, arg3, @MessageBody() body){
console.log({ arg1, arg2, arg3 }) // All undefined
console.log({ body })
}
}
I won't go into line by line details of how does this happen, but I will provide you a conceptual explanation and if you want to investigate more, you can always look at source code of @nestjs/websockets for a good exercise.
The key point is WsContextCreator
extracts data from initial arguments to the callback function, rearranges them and sends those arguments to your handler function. During the extraction process, WsContextCreator
looks up metadata of the handler function:
// https://github.com/nestjs/nest/blob/aa3ad07c1023b71edda6b6ea53374787d3712231/packages/websockets/context/ws-context-creator.ts#L163
const metadata = this.contextUtils.reflectCallbackMetadata<TMetadata>(
instance,
methodName,
PARAM_ARGS_METADATA,
) || DEFAULT_CALLBACK_METADATA;
If your function doesn't have PARAM_ARGS_METADATA
, NestJS assigns default callback metadata. Guess what, DEFAULT_CALLBACK_METADATA
assigns only two parameters to our handler parameters - client and payload.
// https://github.com/nestjs/nest/blob/master/packages/websockets/context/ws-metadata-constants.ts
import { WsParamtype } from '../enums/ws-paramtype.enum';
export const DEFAULT_CALLBACK_METADATA = {
[`${WsParamtype.PAYLOAD}:1`]: { index: 1, data: undefined, pipes: [] },
[`${WsParamtype.SOCKET}:0`]: { index: 0, data: undefined, pipes: [] },
};
Solution for Providing Additional Parameters to Handlers
NestJS exports an undocumented helper function called assignCustomParameterMetadata
, which allows us to override DEFAULT_CALLBACK_METADATA
and use our own metadata.
// https://github.com/nestjs/nest/blob/5aeb40b/packages/common/utils/assign-custom-metadata.util.ts#L9
export function assignCustomParameterMetadata(
args: Record<number, RouteParamMetadata>,
paramtype: number | string,
index: number,
factory: CustomParamFactory,
data?: ParamData,
...pipes: (Type<PipeTransform> | PipeTransform)[]
) {
return {
...args,
[`${paramtype}${CUSTOM_ROUTE_AGRS_METADATA}:${index}`]: {
index,
factory,
data,
pipes,
},
};
}
The function is not self-explanatory, but I think it will be much more clear after example below. Important thing is "where should we use this function?".
This function is meant to modify PARAM_ARGS_METADATA
, and normally that metadata is defined by decorators such as @ConnectedSocket
or @MessageBody
. So, lets comply with the conventions and create ourselves a new parameter decorator: @Ack
.
Creating the @Ack Decorator
If you are not familiar with the concept of decorators, or decorators in NestJS, you can read my previous article: Using Custom Decorators in NestJS.
import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';
export function Ack(): ParameterDecorator {
// index is the index of parameter we decorated
// It is not related to getArgByIndex below!
return function(target, key, index) {
// Get existing metadata of the handler
const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
// Extend with new metadata
const meta = assignCustomParameterMetadata(args, 'Ack', index, (data, input: ExecutionContext) => {
// This allows NestJS to extract required parameter from initial arguments supplied to 'callback' function
// Index here needs to match index of the callback parameters
// 0 --> Client
// 1 --> Payload from IoAdapter
// 2 --> Ack from IoAdapter
// 0 is always client, but rest of the parameters depend on the underlying adapter.
return input.getArgByIndex<Function>(2);
});
Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
};
}
Now we can reach our ack
function from the handler as follows:
class MyWebSocketGateway {
@SubscribeMessage('foo')
fooHandler(@MessageBody() body, @Ack() ack){
ack('foo_response');
}
}
Lets Move On With RequestId
That really was a case study, isn't it? Lets add a request id to our handler parameters in case we want to trace our requests.
First we need to modify our adapter, if you remember from above, the adapter supplies arguments to the callback function. We can create a request id in our adapter and supply it to the callback.
// You can make your own implementation for this.
private createRequestId(){
return crypto.randomUUID();
}
public bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
share(),
first(),
);
handlers.forEach(({ message, callback }) => {
const source$ = fromEvent(socket, message).pipe(
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
// <-- Modified Section Start
const requestId = this.createRequestId();
return transform(callback(data, ack, requestId))
// Modified Section End -->
.pipe(
filter((response: any) => !isNil(response)),
map((response: any) => [response, ack]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack]) => {
if (response.event) {
return socket.emit(response.event, response.data);
}
isFunction(ack) && ack(response);
});
});
}
That is all the modification our adapter needs. Now we can create another decorator: @RequestId
import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';
export function RequestId(): ParameterDecorator {
return function(target, key, index) {
const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
const meta = assignCustomParameterMetadata(args, 'RequestId', index, (data, input: ExecutionContext) => {
return input.getArgByIndex<string>(3);
});
Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
};
}
That is exactly the same decorator with @Ack
with only difference being return input.getArgByIndex(3);
. Now our handler function looks like this:
class MyWebSocketGateway {
@SubscribeMessage('foo')
fooHandler(@MessageBody() body, @Ack() ack, @RequestId() requestId){
console.log('Received request with id: ', requestId);
ack('foo_response');
}
}
Next Steps
If you don't want to deal with all of these, I'm currently working on an npm package that has better developer ergonomics compared to official @nestjs/platform-socket.io package. I'm aware that decorating every parameter in handler functions can be ugly and tiring to eyes. I believe there are more beautiful solutions to this problem. I'm trying to figure out which solution would result in better developer experience.
I would like to hear your opinions about this. Whether you suggest a usage example, or you face other problems with official package, please let me know what you think in the comments.
Also, if you don't want to miss out on articles about NestJS and programming in general subscribe to Noop Today. I hope you learned something new today, see you in the upcoming posts!
Here is the repo that contains TraceableIoAdapter
with example code: https://github.com/nooptoday/nestjs-trace-websocket-events
Posted on January 2, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.