Node JS Custom Streams Part 2

theBridge2

Posted on March 28, 2023


If you haven't already read it, see Part 1.

Advanced example - custom protocol

For small file transfers, you can simply send a single packet or object of data and parse it in one chunk. Generally this means sending some data with metadata attached, and the whole thing stays small.

The challenge comes when you are sending massive amounts of data with small amounts of metadata attached. In that case you cannot hold all of the received data in RAM and process it at once; the processing must be broken into chunks.

The key question is: how do I know where to split up this almost endless stream of data to properly process it? The answer is: by implementing a custom protocol.

NodeJS streams can carry either objects or binary data/strings. When dealing with large amounts of data, binary is the way to go because it is the most compact and efficient representation.

See my blog post on binary.
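If the object-versus-binary distinction is new to you, here is a minimal standalone sketch (not part of the file transfer code) showing both modes using stream.Readable.from:

const stream = require('stream');

// Object mode: each chunk is a whole JavaScript object (good for small, structured data).
const objectStream = stream.Readable.from([{ id: 1, note: 'small metadata' }]);
objectStream.on('data', function(obj){ console.log('object chunk:', obj); });

// Binary mode: chunks are Buffers, the compact choice for bulk data.
const binaryStream = stream.Readable.from([Buffer.from('lots of raw bytes')], { objectMode: false });
binaryStream.on('data', function(buf){ console.log('binary chunk of ' + buf.length + ' bytes'); });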

Setting it up

For this example we are going to continue with the setup from my previous post: sending a large file as packets of data using a custom protocol that we define. The protocol will include a hash of each data chunk, verified automatically as each chunk arrives, instead of the manual verification we did before.

The typical approach to a custom protocol is to packetize the data into chunks of bytes, each containing a header, body, and footer.


The header and footer are fixed-length metadata sections; the header typically contains a field with the packet length and/or the data length.

Since we want this protocol to be able to handle any size of data and the metadata length field has to be fixed, we have to predetermine the maximum length of data sent in any packet and thus predetermine the maximum packet length.

To identify the start of a packet, a unique key is usually chosen that is unlikely to appear by chance in the data.
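For example, later in this post we will use the ASCII string DEADBEEF as our start key. Just to illustrate the idea, a receiver could scan incoming bytes for that key with Buffer.indexOf - this is a standalone sketch, not the exact approach the decoder below takes (it only checks the first 8 bytes of each chunk):

const START_KEY = Buffer.from('DEADBEEF'); // 8 ASCII bytes

// Hypothetical received bytes: some noise followed by the start of a packet.
const received = Buffer.concat([Buffer.from('noise'), START_KEY, Buffer.from('...rest of packet')]);

const keyIdx = received.indexOf(START_KEY);
if (keyIdx !== -1) {
    console.log('packet starts at byte', keyIdx);
} else {
    console.log('no start key yet - keep buffering');
}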


In the footer we place the hash value that we will use to verify each chunk arrived intact, plus an end key for good measure.

Implementing the custom protocol

To implement the custom protocol, we will create a transform stream that takes in chunks of file data and spits out packets to be sent over a socket. On the other side, a net server object receives the packets, parses them with another transform stream, and writes the data to a file via a writable stream.


Since the default high water mark for a NodeJS file read stream (fs.createReadStream) is 65536 bytes, let's cap our maximum packet length at 65536. That way even our largest packets travel through the streams as a single chunk.

To do this, we need to set the high water mark for our readable stream to 65536 minus the total length of our fixed metadata fields.

Here are the lengths we are going to use for our fields:

Packet Section | Item        | Length
---------------+-------------+----------------
Header         | Start Key   | 8 bytes
Header         | Data length | 8 bytes
Data           | Data        | variable length
Footer         | Hash        | 40 bytes
Footer         | End Key     | 8 bytes

The fixed fields add up to 8 + 8 + 40 + 8 = 64 bytes, so 65536 - 64 = 65472. We will limit each file chunk read to 65472 bytes with this line of code:

const input = fs.createReadStream('./test.mp4',{highWaterMark: 65472})

By adjusting this high water mark, the NodeJS readable stream will never emit more than 65472 bytes of the test.mp4 file in a single chunk.
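A quick way to convince yourself of this is to log the chunk sizes - a throwaway sketch, assuming a local test.mp4 file:

const fs = require('fs');

const input = fs.createReadStream('./test.mp4', { highWaterMark: 65472 });

input.on('data', function(chunk){
    // Every chunk except possibly the last one should be exactly 65472 bytes.
    console.log('chunk size: ' + chunk.length);
});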

Here it is integrated into the client code.

client.js

// client

//setup read file
var fs = require('fs');

const MY_HIGH_WATER_MARK= 65472;

const input = fs.createReadStream('./testMp4.mp4',{highWaterMark: MY_HIGH_WATER_MARK});

//setup transform stream
var CustomTransform = require('./toPacketTransformStream.js');
var transformStream = new CustomTransform();

//Setup connection to server
var net = require('net');
var socket = new net.Socket();
socket.connect(8082);

//Connect everything together!
input.pipe(transformStream).pipe(socket)


input.on('close',function(){    
    console.log('end');    
    socket.end();
})


Now we need to create the packet in our toPacketTransformStream.js file. Like before, we will use the stream and crypto libraries. The only thing really new in this file compared to my previous post is the use of the Buffer object to build and manipulate the packet.

Let's use the string DEADBEEF for the packet start key, which is 8 bytes, and FEEBDAED for the end key, which is also 8 bytes.

Lastly, a hash from the NodeJS crypto module has a fixed length that depends on the algorithm. For the sha1 algorithm we are using here, the hex-encoded output is 40 characters, so the hash field is 40 bytes.
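If you want to confirm that field size for yourself, a one-off check like this (using digest() directly rather than the streaming read() we use below) shows the hex-encoded sha1 digest is always 40 characters long:

const crypto = require('crypto');

const hexDigest = crypto.createHash('sha1').update('any chunk of data').digest('hex');
console.log(hexDigest.length); // 40 hex characters -> 40 bytes when written into the packet as ASCII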

toPacketTransformStream.js

const stream = require('stream');
const crypto = require('crypto');

const PACKET_START_KEY = 'DEADBEEF';
const PACKET_END_KEY = 'FEEBDAED';
const FIELD_SIZE_BYTES_MAX_DATA_LENGTH = 8;
const FIELD_SIZE_BYTES_HASH = 40;

/* PACKET STRUCTURE
8 bytes - start key
8 bytes - data length
variable - data
40 bytes - hash
8 bytes - end key
*/


class CustomTransform extends stream.Transform{
    constructor(){
        super();
        this.hash = {};
        this.header = {};
        this.data = {};
        this.footer = {};  
        this.totalByteCtr = 0;     
        this.chunkTxCount = 0; 
    }
    _transform(chunk,encoding,callback){
        var obj = this;       
        obj.chunkTxCount++;    

        obj.totalByteCtr += chunk.length;
        obj.data = chunk;
        obj.header = obj.makeHeader();            
        obj.footer = obj.makeFooter();

        var packet = obj.makePacket();        
        obj.push(packet);
        callback();
    }
    makePacket(){
        var obj = this;
        var bufferArr = [obj.header,obj.data,obj.footer];        
        var packet = Buffer.concat(bufferArr);
        return packet;
    }

    makeHeader(){
        var obj = this;

        var startKeyBuffer = Buffer.from(PACKET_START_KEY); //8 ASCII bytes
        var dataLenBuffer = Buffer.alloc(FIELD_SIZE_BYTES_MAX_DATA_LENGTH); //8 byte field, zero filled
        dataLenBuffer.writeUInt32BE(obj.data.length); //the length fits in the first 4 bytes of the field

        var bufferArr = [startKeyBuffer, dataLenBuffer];

        var header = Buffer.concat(bufferArr);        

        return header;
    }

    getFileChunkHash(){
        var obj = this;

        obj.hash = crypto.createHash('sha1');
        obj.hash.setEncoding('hex');

        obj.hash.update(obj.data); //this is synchronous.     
        obj.hash.end();
        var hashVal = obj.hash.read();           
        return hashVal;
    }       

    makeFooter(){
        var obj = this;
        var hashVal = obj.getFileChunkHash();
        var hashBuffer = Buffer.alloc(FIELD_SIZE_BYTES_HASH);        
        hashBuffer.write(hashVal.toString());

        var endKeyBuffer = Buffer.from(PACKET_END_KEY);

        var bufferArr = [hashBuffer,endKeyBuffer];
        var footer = Buffer.concat(bufferArr);

        return footer;
    }
}


module.exports = CustomTransform;


Every time the transform stream receives a new chunk of data, its _transform method is called. We build the header, footer, and packet, and push the finished packet out to whatever consumes the stream next. In this case that consumer is the socket, which sends the data from the client to the server.

Our server code is fairly similar to the previous post. The key difference is that we print some debug information after the file has been received.

server.js

var outFile = 'testOut.mp4';
var fs = require('fs');
const net = require('net');


//setup transform stream
var CustomTransform = require('./serverDecodePacketTransform.js');
var transformStream = new CustomTransform();



var writeStream = fs.createWriteStream(outFile);

writeStream.on('error',function(err){
  console.log(err);
})

net.createServer(function (socket) {
  console.log("connected");

  socket.pipe(transformStream).pipe(writeStream);

  socket.on('data',function(data){
    console.log(data.length)
  })



  transformStream.on('close',function(){
    console.log("transform stream closed!");
    writeStream.destroy()
    console.log("chunks rx'd: " + transformStream.chunkRxCount + " received: " + transformStream.fileDataReceivedBytes + " bytes");
  })

  transformStream.on('error',function(err){
    console.log(err);
  })

  socket.on('close',function(){
    console.log('socket has closed!')
  })

})
.listen(8082);



Note on events - Initially I had my writeStream.destroy() call triggered by the socket's 'finish' event. When you run it that way, you will see that the socket gets marked as finished before the transform stream emits 'close'. As a result, if you destroy the write stream as soon as the socket finishes, the last chunk received never gets processed and written. That is why the code above doesn't destroy the file write stream until the transform stream emits 'close' - at that point we know the transform stream is done processing and it is safe to clean up.
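If you want to see that ordering for yourself, one option is to temporarily drop a few log-only listeners into the connection handler in server.js. This is purely a debugging sketch using the variable names from the code above:

// Debug-only listeners to watch the order these events fire in.
socket.on('end', function(){ console.log('socket end'); });
socket.on('close', function(){ console.log('socket close'); });
transformStream.on('finish', function(){ console.log('transform stream finish'); });
transformStream.on('close', function(){ console.log('transform stream close'); });
writeStream.on('finish', function(){ console.log('write stream finish'); });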

Now for the last part, we need to decode the bytes received by the server into the packet.

This code creates a new hash for each chunk received, looks for the start key (not a very robust approach yet), and then parses the packet. From the parsed packet we compute the hash of the received file data and compare it with the value sent in the footer. If they don't match, we emit an error and everything stops. Otherwise, we push the data out of this transform stream and it gets written to the file by the write stream in server.js.

serverDecodePacketTransform.js

const stream = require('stream');
const crypto = require('crypto');

const PACKET_START_KEY = 'DEADBEEF';
const PACKET_END_KEY = 'FEEBDAED';
const FIELD_SIZE_BYTES_MAX_DATA_LENGTH = 8;
const FIELD_SIZE_BYTES_HASH = 40;

class DecodePacketTransform extends stream.Transform{
    constructor(){
        super();
        this.rxData = Buffer.alloc(0);
        this.leftoversBuffer = Buffer.alloc(0); //bytes received but not parsed yet
        this.fileDataReceivedBytes = 0;
        this.chunkRxCount = 0;
    }
    initializeNewHash(){
        //each chunk is verified with its own fresh hash object
        this.hash = crypto.createHash('sha1');
        this.hash.setEncoding('hex');
    }
    _transform(chunk,encoding,callback){
        var obj = this;      
        obj.chunkRxCount++;

        obj.initializeNewHash(); //computing new hash for each chunk.



        obj.rxData = Buffer.concat([obj.leftoversBuffer,chunk]);        

        if (obj.foundStartKey()){

            var parsedData = obj.parse();

            obj.hash.update(parsedData.dataChunk); 

            obj.hash.end();

            var ourComputedHash = obj.hash.read();        
            var sentHash = parsedData.sentFileChunkHashVal.toString();

            if(ourComputedHash !== sentHash ){
                obj.emit('error',new Error('Data mismatch in hash! ' + ourComputedHash + ' != ' + sentHash))                
            }else{
                obj.fileDataReceivedBytes += parsedData.dataChunk.length;
                obj.push(parsedData.dataChunk);
            }  
            obj.leftoversBuffer = Buffer.alloc(0); //packet handled, clear any carried-over bytes

        }else{ 
            console.log("start key not found!");
            obj.leftoversBuffer = obj.rxData; //hold onto these bytes and retry when the next chunk arrives
        }

        callback();
    }    
    foundStartKey(){
        var obj = this;
        var startIdx = 0;
        var endIdx = PACKET_START_KEY.length;
        var tmp = obj.rxData.toString('utf8',startIdx,endIdx)

        if(tmp === PACKET_START_KEY){
            return true;
        }else{
            return false;
        }
    }  
    parse(){
        var obj = this;

        var numBytesInData = obj.rxData.readUInt32BE(PACKET_START_KEY.length); //matches the writeUInt32BE on the client side

        var dataStartByteIdx = PACKET_START_KEY.length + FIELD_SIZE_BYTES_MAX_DATA_LENGTH;
        var dataEndByteIdx = dataStartByteIdx + numBytesInData;

        var dataChunk = obj.rxData.subarray(dataStartByteIdx,dataEndByteIdx);

        var startByteIdx = dataEndByteIdx ; 
        var endByteIdx = startByteIdx + FIELD_SIZE_BYTES_HASH;
        var sentFileChunkHashVal = obj.rxData.subarray(startByteIdx,endByteIdx);
        var parsed={
            numBytesInData: numBytesInData,
            dataChunk: dataChunk,
            sentFileChunkHashVal: sentFileChunkHashVal
        }
        return parsed;
    }  
}

module.exports = DecodePacketTransform;

Probably the least straightforward pieces of this relate to the NodeJS Buffer object. We are using:

-Buffer.writeUInt32BE / Buffer.readUInt32BE
-Buffer.subarray
-Buffer.toString
-Buffer.concat

Take a look at my binary blog if you aren't sure how this works.
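In case those are unfamiliar, here is a small toy example of the same operations, separate from the transfer code:

// Toy packet using the same Buffer operations as the transfer code.
const key = Buffer.from('DEADBEEF');                 // 8 ASCII bytes
const lenField = Buffer.alloc(8);
lenField.writeUInt32BE(5);                           // store the data length in the first 4 bytes
const body = Buffer.from('hello');

const packet = Buffer.concat([key, lenField, body]); // Buffer.concat joins the pieces

console.log(packet.toString('utf8', 0, 8));          // 'DEADBEEF' - Buffer.toString over a byte range
console.log(packet.readUInt32BE(8));                 // 5 - read the length field back out
console.log(packet.subarray(16, 21).toString());     // 'hello' - Buffer.subarray slices out the data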

Note - we didn't actually check for the end key. This is not quite production ready, but it worked to prove the concept by successfully sending a ~1 GB file.
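If you did want that check, one possible sketch (not in the code above) would be a few extra lines inside parse(), right after the hash is sliced out, comparing the next 8 bytes against the known end key:

// Possible end key check inside parse() - endByteIdx is where the hash field stops.
var endKey = obj.rxData.toString('utf8', endByteIdx, endByteIdx + PACKET_END_KEY.length);
if (endKey !== PACKET_END_KEY) {
    obj.emit('error', new Error('End key not found where expected!'));
}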

Summary

In this part 2, we demonstrated a much more practical example of how to use NodeJS streams with read streams, write streams, transform streams, events, and a custom protocol that utilizes the nodeJS Buffer object heavily.

Hopefully this gives you a useful practical reference and exposes more of the power of NodeJS custom streams for your future use!
