Recover from Connection Errors and Timeouts in Rev AI Streaming Transcription Sessions

Vikram Vaswani

Posted on May 9, 2022

By Kyle Bridburg, Engineering Manager and Vikram Vaswani, Developer Advocate

This tutorial was originally published at https://docs.rev.ai/resources/tutorials/recover-connection-streaming-api/ on May 09, 2022.

Introduction

Rev AI's Streaming Speech-to-Text API enables real-time transcription for streaming audio. It works with all major English accents and includes key features such as punctuation, capitalization, speaker diarization, custom vocabulary and profanity filtering.

The Streaming Speech-to-Text API can be used with both WebSocket and RTMP streams, with a time limit of 3 hours per stream. While this is more than sufficient for most scenarios, there are cases where live streams can run longer than 3 hours - for example, live transcription of commentary for a day-long sporting event.

With Rev AI, the recommended practice when a stream approaches the 3-hour limit is to initialize a new concurrent WebSocket connection and switch to it. This sounds simple but in practice, application developers often struggle with implementing solutions that handle connection disruption correctly (whether due to session length timeouts or other network connectivity interruptions).

This tutorial proposes some solutions for the above challenge, with a view to helping developers implement better real-time transcription solutions for long-running audio streams.

Assumptions

This tutorial assumes that:

Key challenges

When integrating Rev AI live transcription with long-running audio streams, developers have to be cognizant of the following issues:

Connection disruption

Rev AI's Streaming Speech-to-Text API sets a time limit per stream of 3 hours. When a stream's 3-hour limit is reached, the streaming connection will be terminated by the API. Apart from this, the streaming connection may also be disrupted due to external factors, such as network failures or bandwidth limitations.

In both these cases, the application will need to initialize a new WebSocket connection and start a new streaming session. Once the new WebSocket connection is accepted and the connected message is received, the application can begin streaming audio to it.

Data loss

When reconnecting to the API for any of the reasons described above, there is invariably a period of time when audio data is produced, but not transferred to the API for transcription. It is important for the application developer to have a strategy in place to avoid losing this audio data during the connection recovery process.

In this case, the application will need to store the audio data in a buffer until such time as the connection to the API is re-established and the data can be sent for (delayed) transcription.
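The buffering strategy described above can be sketched as follows. This is a minimal illustration under stated assumptions, not part of the Rev AI SDK: the ChunkBackup class and its method names are hypothetical, and audio is assumed to arrive as discrete chunks.

```javascript
// Hypothetical helper (not part of revai-node-sdk): keeps every chunk handed
// to the application so that it can be replayed after a reconnect.
class ChunkBackup {
    constructor() {
        this.chunks = [];
    }

    // Record a chunk as it is produced, whether or not a connection is open.
    push(chunk) {
        this.chunks.push(chunk);
    }

    // Return the chunks from a given index onward, for resending.
    replayFrom(index) {
        return this.chunks.slice(index);
    }
}

const backup = new ChunkBackup();
backup.push(Buffer.from([1, 2]));
backup.push(Buffer.from([3, 4]));
backup.push(Buffer.from([5, 6]));

// Suppose the first chunk was already transcribed before the connection dropped:
const pending = backup.replayFrom(1);
console.log(pending.length); // 2
```

Keeping the backup separate from the connection object means audio can continue to accumulate while a new connection is being established.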

Timestamp corruption

Rev AI's transcripts include timestamps for every transcribed word. Timestamps correspond to when the words are spoken within the audio. Every (re)connection to the API is treated as a new connection, and audio is therefore timestamped starting from 00:00:00. As a result, re-aligning the timestamps correctly to the audio stream is a critical task when restarting an interrupted streaming session.

In this case, the application will need to provide a starting timestamp by adding start_ts as a query parameter to the connection request. This ensures that all output hypotheses have their timestamps offset by the number of seconds provided in the start_ts parameter.
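To make the effect of the offset concrete, the following sketch mimics the arithmetic that start_ts implies. The API applies the offset itself, so an application would not normally need this code. The applyStartTs function is a hypothetical name, and the element shape (type, value, ts, end_ts) is assumed for illustration.

```javascript
// Illustration only: mimics the arithmetic implied by the start_ts offset.
// applyStartTs is a hypothetical name, not an SDK function.
function applyStartTs(elements, startTs) {
    return elements.map(el =>
        el.type === 'text'
            ? { ...el, ts: el.ts + startTs, end_ts: el.end_ts + startTs }
            : el
    );
}

// A new session reports times relative to its own start (00:00:00)...
const raw = [
    { type: 'text', value: 'hello', ts: 0.5, end_ts: 1.0 },
    { type: 'punct', value: '.' }
];

// ...so with start_ts = 120, the word lands at 120.5 s to 121 s in the full stream.
const shifted = applyStartTs(raw, 120);
console.log(shifted[0].ts, shifted[0].end_ts); // 120.5 121
```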

Technical approach

The following example can be used to configure a streaming client to transcribe a long-duration stream using a RAW-format audio file. It handles reconnects (whether due to session length timeouts or other connectivity interruption) without losing audio. It also re-aligns timestamp offsets to the new streaming session when reconnecting.

To use this example, replace the <FILEPATH> placeholder with the path to the audio file (RAW format) you wish to stream and the <REVAI_ACCESS_TOKEN> placeholder with your Rev AI account's access token.

const fs = require('fs');
const revai = require('revai-node-sdk');
const { Writable } = require('stream');

const token = '<REVAI_ACCESS_TOKEN>';
const filePath = '<FILEPATH>';
const bytesPerSample = 2;
const samplesPerSecond = 16000;
const chunkSize = 8000;

// initialize client with audio configuration and access token
const audioConfig = new revai.AudioConfig(
    /* contentType */ 'audio/x-raw',
    /* layout */      'interleaved',
    /* sample rate */ samplesPerSecond,
    /* format */      'S16LE',
    /* channels */    1
);

// optional session configuration (arguments are positional)
const sessionConfig = new revai.SessionConfig(
    /* metadata */             'example metadata',
    /* custom_vocabulary_id */ null,
    /* filter_profanity */     false,
    /* remove_disfluencies */  false,
    /* delete_after_seconds */ 0,
    /* start_ts */             0,
    /* transcriber */          'machine',
    /* detailed_partials */    false
);

// begin streaming session
let client = null;
let revaiStream = null;

let audioBackup = [];
let audioBackupCopy = [];
let newStream = true;
let lastResultEndTsReceived = 0.0;

function handleData(data) {
    switch (data.type){
        case 'connected':
            console.log("Received connected");
            break;
        case 'partial':
            console.log(`Partial: ${data.elements.map(x => x.value).join(' ')}`);
            break;
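        case 'final': {
            console.log(`Final: ${data.elements.map(x => x.value).join('')}`);
            // remember the end timestamp of the last transcribed word, if any
            const textElements = data.elements.filter(x => x.type === 'text');
            if (textElements.length > 0) {
                lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            }
            // approximate amount of audio transcribed so far, in KB
            console.log(lastResultEndTsReceived * samplesPerSecond * bytesPerSample / 1024);
            break;
        }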
        default:
            // all messages from the API are expected to be one of the previous types
            console.error('Received unexpected message');
            break;
    }
}

function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    // create event responses
    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason == 'Reached max session lifetime'){
            console.log('Restarting stream');
            restartStream();
        }
        console.log(bytesWritten);
    });
    client.on('httpResponse', code => {
        console.log(`Streaming client received HTTP response with code: ${code}`);
    });
    client.on('connectFailed', error => {
        console.log(`Connection failed with error: ${error}`);
    });
    client.on('connect', connectionMessage => {
        console.log(`Connected with job ID: ${connectionMessage.id}`);
    });

    audioBackup = [];
    sessionConfig.startTs = lastResultEndTsReceived;

    revaiStream = client.start(sessionConfig);
    // register handleData directly so that removeListener() in restartStream() can detach it
    revaiStream.on('data', handleData);
    revaiStream.on('end', function () {
        console.log('End of stream');
    });
}

let bytesWritten = 0;

const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {
            // approximate math to estimate how many chunks were already transcribed
            const bytesSent = lastResultEndTsReceived * samplesPerSecond * bytesPerSample;
            const chunksSent = Math.floor(bytesSent / chunkSize);
            if (chunksSent !== 0) {
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }

        audioBackup.push([chunk, encoding]);

        if (revaiStream) {
            revaiStream.write(chunk, encoding);
            bytesWritten += chunk.length;
        }

        next();
    },

    final(callback) {
        if (client && revaiStream) {
            client.end();
            revaiStream.end();
        }
        // signal that finalization is complete so that 'finish' can be emitted
        callback();
    }
});

function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }

    audioBackupCopy = [];
    audioBackupCopy = audioBackup;

    newStream = true;

    startStream();
}

// read file from disk
let file = fs.createReadStream(filePath);

startStream();

file.on('end', () => {
    chunkInputTransform.end();
})

// array for data left over from chunking writes into chunks of 8000
let leftOverData = null;

const chunkInputTransform = new Writable({
    write(chunk, encoding, next) {
        if (encoding !== 'buffer'){
            console.log(`${encoding} is not buffer, writing directly`);
            audioInputStreamTransform.write(chunk, encoding);
        }
        else {
            let position = 0;

            if (leftOverData != null) {
                let audioChunk = Buffer.alloc(chunkSize);
                const copiedAmount = leftOverData.length;
                console.log(`${copiedAmount} left over, writing with next chunk`);
                leftOverData.copy(audioChunk);
                leftOverData = null;
                chunk.copy(audioChunk, chunkSize - copiedAmount);
                position += chunkSize - copiedAmount;
                audioInputStreamTransform.write(audioChunk, encoding);
            }

            while(chunk.length - position > chunkSize) {
                console.log(`${chunk.length - position} bytes left in chunk, writing with next audioChunk`);
                let audioChunk = Buffer.alloc(chunkSize);
                chunk.copy(audioChunk, 0, position, position+chunkSize);
                position += chunkSize;
                audioInputStreamTransform.write(audioChunk, encoding);
            }

            // keep any remaining bytes for the next write
            if (chunk.length - position > 0) {
                leftOverData = Buffer.alloc(chunk.length - position);
                chunk.copy(leftOverData, 0, position);
            }
        }

        next();
    },

    final(callback) {
        // flush any remaining bytes, then close the downstream writer
        if (leftOverData != null) {
            audioInputStreamTransform.write(leftOverData);
        }
        audioInputStreamTransform.end();
        callback();
    }
});

// stream the file
file.pipe(chunkInputTransform);

NOTE: This code sample is illustrative and not intended for production use.

The following sections explain this code listing with reference to the specific problems described earlier.

Connection disruption

Refer to the following code segments:

function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason == 'Reached max session lifetime'){
            console.log('Restarting stream');
            restartStream();
        }
    });

    // ...

    revaiStream = client.start(sessionConfig);

    // ...
}

function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }

    // ...

    newStream = true;

    startStream();
}

The startStream() function creates a new Rev AI streaming client and initializes a streaming session as revaiStream. It also defines an event handler for the WebSocket close event, which may be raised by a connectivity failure or by a stream timeout. When the close code is anything other than 1000, or the reason is 'Reached max session lifetime', this handler invokes the restartStream() function, which ends the previous revaiStream session if it is still present and then calls startStream() to begin a new one.
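The restart decision can be isolated into a small function to make it easier to reason about and test. The sketch below mirrors the condition used in the close handler above. The shouldRestart name is hypothetical, and 1000 is the standard WebSocket code for a normal closure.

```javascript
// Hypothetical helper mirroring the condition in the 'close' handler above.
// 1000 is the standard WebSocket "normal closure" code.
function shouldRestart(code, reason) {
    return code !== 1000 || reason === 'Reached max session lifetime';
}

console.log(shouldRestart(1000, ''));                             // false
console.log(shouldRestart(1000, 'Reached max session lifetime')); // true
console.log(shouldRestart(1006, ''));                             // true
```

A normal closure restarts the stream only when it was caused by the session time limit. Any other close code is treated as a disruption.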

Data loss

Refer to the following code segments:

let audioBackup = [];
let audioBackupCopy = [];

const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {

            // ...

            if (chunksSent !== 0) {
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }

        audioBackup.push([chunk, encoding]);

        // ...
    },

    // ...
});

function restartStream() {

    // ...

    audioBackupCopy = [];
    audioBackupCopy = audioBackup;

    newStream = true;

    startStream();
}

Here, audioBackup acts as a data store backup for the streamed audio. If a streaming session ends unexpectedly, two things are needed to restart and continue without data loss:

  • The backup of the audio to resend from, to ensure no data is lost
  • A new backup for the restarted stream

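When a stream is restarted with the restartStream() function, the contents of audioBackup are handed over to audioBackupCopy, and audioBackup is then reset in startStream() in readiness for the new backup. On the next write, data is resent to the revaiStream streaming session from audioBackupCopy, starting at the chunk estimated from the timestamp of the last final hypothesis.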
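The point from which to resend can be estimated from the audio format. The following sketch works through that arithmetic using the same constants as the listing. The resumeChunkIndex name is hypothetical, and the calculation assumes that the timestamp is measured from the start of the buffered audio.

```javascript
const bytesPerSample = 2;
const samplesPerSecond = 16000;
const chunkSize = 8000;

// Hypothetical helper: index of the first backed-up chunk that still needs to
// be sent, given the end timestamp (in seconds) of the last final hypothesis.
function resumeChunkIndex(lastEndTs) {
    const bytesTranscribed = lastEndTs * samplesPerSecond * bytesPerSample;
    return Math.floor(bytesTranscribed / chunkSize);
}

// 10.5 s * 16000 samples/s * 2 bytes = 336000 bytes = 42 chunks of 8000 bytes
console.log(resumeChunkIndex(10.5)); // 42
```

With these settings each chunk holds 0.25 seconds of audio. Rounding down means that up to one chunk of already-transcribed audio may be resent, so nothing is skipped.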

Timestamp corruption

Refer to the following code segments:

let lastResultEndTsReceived = 0.0;

function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    // ...

    sessionConfig.startTs = lastResultEndTsReceived;
    revaiStream = client.start(sessionConfig);
    revaiStream.on('data', handleData);

    // ...
}

function handleData(data) {
    switch (data.type){

        // ...

        case 'final':
            const textElements = data.elements.filter(x => x.type === "text");
            lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            break;

        // ...
    }
}

Here, the lastResultEndTsReceived variable holds the end timestamp of the most recent final hypothesis and is updated with each final hypothesis received. When the streaming session restarts, the start_ts parameter is set to the value of lastResultEndTsReceived, to re-align timestamps to the stream audio.

NOTE: This approach can result in some audio being resent to the API. Since only final hypotheses have timestamps, all audio since the last final hypothesis will be resent, which could lead to a small number of words being duplicated.
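A final hypothesis may contain no text elements, in which case there is no end timestamp to read. The sketch below shows one way to guard against that. The lastEndTs function is a hypothetical name, and the sample hypothesis is made up for illustration.

```javascript
// Hypothetical helper: returns the end_ts of the last text element in a final
// hypothesis, or the previous value when the hypothesis has no text elements.
function lastEndTs(hypothesis, previous) {
    const textElements = hypothesis.elements.filter(x => x.type === 'text');
    if (textElements.length === 0) {
        return previous;
    }
    return textElements[textElements.length - 1].end_ts;
}

const finalHypothesis = {
    type: 'final',
    elements: [
        { type: 'text', value: 'Hello', ts: 0.5, end_ts: 1.0 },
        { type: 'punct', value: ' ' },
        { type: 'text', value: 'world', ts: 1.1, end_ts: 1.6 },
        { type: 'punct', value: '.' }
    ]
};

console.log(lastEndTs(finalHypothesis, 0));                   // 1.6
console.log(lastEndTs({ type: 'final', elements: [] }, 1.6)); // 1.6
```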

Next steps

Transcribing live audio comes with numerous challenges around connection recovery, data protection and timestamp alignment. For developers working with Rev AI's Streaming Speech-to-Text API, this tutorial provided a technical approach and sample implementation to resolve these challenges.

Learn more about the topics discussed in this tutorial by visiting the following links:
