Orchestrate AWS Lambdas using MongoDB - Part 2
Sateesh Madagoni
Posted on November 15, 2023
This is a continuation of the first part. This post assumes you are a developer with a working knowledge of AWS, Lambda, EventBridge, MongoDB and Node.js.
Technical Implementation:
1. Design the State Machine:
In this step you decide how you want to orchestrate your jobs, then describe that plan in a state-machine object like the one below.
const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      // When the job named by a key succeeds, start the jobs listed under it
      steps: {
        thinking: {
          jobs: ['buy_fishing_gear'],
        },
        buy_fishing_gear: {
          jobs: ['rent_a_boat'],
        },
        rent_a_boat: {
          jobs: ['go_to_fishing_ground'],
        },
        go_to_fishing_ground: {
          jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'],
        },
        catch_fish: {
          jobs: ['go_home'],
        },
      },
      // Jobs recorded as finished so far in this phase
      finished: [],
      // Phase to move to once every job in waitFor has finished
      next: 'end',
      // Jobs that must all finish before this phase is complete
      waitFor: [
        'buy_fishing_gear',
        'rent_a_boat',
        'go_to_fishing_ground',
        'cast_fishing_pole',
        'drink_beer',
        'catch_fish',
        'go_home',
      ],
    },
  },
};
In the example above, buy_fishing_gear runs after the thinking job finishes, and the following steps run sequentially in the same way, whereas cast_fishing_pole, drink_beer and catch_fish run in parallel once go_to_fishing_ground succeeds. The phase is complete when every job listed in waitFor appears in finished.
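To make the wiring concrete, here is a small sketch of the two lookups the later handlers perform on a phase: which jobs follow a finished job (steps[job].jobs), and which jobs in waitFor are still outstanding. The helper names nextJobs and remainingJobs are hypothetical, and the phase object is a copy of the thinking phase above.

```javascript
// A copy of the "thinking" phase from the state machine above
const thinkingPhase = {
  steps: {
    thinking: { jobs: ['buy_fishing_gear'] },
    buy_fishing_gear: { jobs: ['rent_a_boat'] },
    rent_a_boat: { jobs: ['go_to_fishing_ground'] },
    go_to_fishing_ground: { jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'] },
    catch_fish: { jobs: ['go_home'] },
  },
  finished: [],
  next: 'end',
  waitFor: [
    'buy_fishing_gear', 'rent_a_boat', 'go_to_fishing_ground',
    'cast_fishing_pole', 'drink_beer', 'catch_fish', 'go_home',
  ],
};

// Hypothetical helper: jobs to start once `finishedJob` succeeds.
// Jobs without a step entry (e.g. drink_beer) are leaves and start nothing.
function nextJobs(phase, finishedJob) {
  const step = phase.steps[finishedJob];
  return step && Array.isArray(step.jobs) ? step.jobs : [];
}

// Hypothetical helper: jobs in waitFor that have not finished yet.
// The phase is complete when this returns an empty array.
function remainingJobs(phase) {
  return phase.waitFor.filter((job) => !phase.finished.includes(job));
}

console.log(nextJobs(thinkingPhase, 'thinking').join(', ')); // buy_fishing_gear
console.log(nextJobs(thinkingPhase, 'go_to_fishing_ground').join(', ')); // cast_fishing_pole, drink_beer, catch_fish
console.log(nextJobs(thinkingPhase, 'drink_beer').length); // 0
console.log(remainingJobs(thinkingPhase).length); // 7
```

Sequential versus parallel execution falls out of the data alone: a step that lists one job forms a chain, and a step that lists several fans out.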
2. Start the Process:
The state machine can be created by anything from an API call to a cron job. Let's take an API endpoint as the example: POST /process/catch-the-fish.
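// Assumes an Express `router`, a connected MongoDB `db`, and the stateMachine object above
router.post('/process/catch-the-fish', async (req, res, next) => {
  try {
    // Copy the template: insertOne adds an _id to the object it is given,
    // which would cause a duplicate key error on the next request
    const state = { ...stateMachine };
    // Insert into state-machines; insertOne returns the new id as insertedId
    const { insertedId } = await db.collection('state-machines').insertOne(state);
    // Start the process by sending the first event. As thinking is a success,
    // this starts the process of catching the fish.
    await db.collection('statuses').insertOne({
      stateId: insertedId,
      status: 'success',
      job: 'thinking',
      date: new Date(),
      // additional parameters as needed
    });
    res.status(201).json({ stateId: insertedId });
  } catch (err) {
    // Express 4 does not catch rejected promises, so forward errors explicitly
    next(err);
  }
});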
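The Lambdas in the next sections read the inserted status document from event.detail.fullDocument. For local testing, a sketch like the following builds a status document and wraps it in that shape. It assumes the trigger forwards the full document under detail.fullDocument, as the handlers expect; the helper names and the string stateId are hypothetical stand-ins (in production, stateId is the inserted id).

```javascript
// Hypothetical helper: a status document, mirroring the insert in the route above
function buildStartStatus(stateId, job = 'thinking') {
  return { stateId, status: 'success', job, date: new Date() };
}

// Hypothetical helper: wrap a status document in the minimal shape the handlers
// read (event.detail.fullDocument). Real trigger events carry more fields.
function toTriggerEvent(statusDocument) {
  return { detail: { fullDocument: statusDocument } };
}

const event = toTriggerEvent(buildStartStatus('state-123'));
console.log(event.detail.fullDocument.job); // thinking
console.log(event.detail.fullDocument.status); // success
```

With a test database configured, an event built this way can be passed directly to one of the handlers to exercise it without going through the trigger.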
3. State Machine Job:
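const { MongoClient } = require('mongodb');

// Assumption: the connection string is in the MONGODB_URI environment variable.
// Creating the client outside the handler lets warm invocations reuse it.
const client = new MongoClient(process.env.MONGODB_URI);
const db = client.db();

exports.handler = async (event) => {
  // MongoDB document from the trigger
  const document = event.detail.fullDocument;
  // Only successful jobs move the state machine forward
  if (document.status !== 'success') {
    return;
  }
  // Find the state machine
  const stateMachines = db.collection('state-machines');
  const state = await stateMachines.findOne({ _id: document.stateId });
  // If it doesn't exist or has already ended, do nothing
  if (!state || state.currentPhase === 'end') {
    return;
  }
  const phaseName = state.currentPhase;
  // Mark the job as finished. $addToSet is atomic, so parallel jobs finishing
  // at the same time cannot overwrite each other's entries.
  await stateMachines.updateOne(
    { _id: document.stateId },
    { $addToSet: { [`phases.${phaseName}.finished`]: document.job } }
  );
  // Re-read the phase to include jobs recorded by concurrent invocations
  const updated = await stateMachines.findOne({ _id: document.stateId });
  const currentPhase = updated.phases[phaseName];
  const jobsRemaining = currentPhase.waitFor.filter(
    (s) => !currentPhase.finished.includes(s)
  );
  // If there are no remaining jobs in the current phase
  if (jobsRemaining.length === 0) {
    // Move to the next phase. Matching on currentPhase ensures that only one
    // invocation performs the transition.
    const { modifiedCount } = await stateMachines.updateOne(
      { _id: document.stateId, currentPhase: phaseName },
      { $set: { currentPhase: currentPhase.next } }
    );
    // If this invocation moved the phase and the next phase is not end, trigger it
    if (modifiedCount === 1 && currentPhase.next !== 'end') {
      // Drop the original _id so the insert gets a new one instead of a duplicate key error
      const { _id, ...rest } = document;
      await db.collection('statuses').insertOne({
        ...rest,
        job: `${currentPhase.next}Start`,
        status: 'success',
        date: new Date(),
      });
    }
  }
};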
Whenever a job succeeds, this handler records it in the current phase's finished list, checks whether every job in waitFor has finished and, if so, moves on to the next phase, repeating until it reaches the end phase.
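Stripped of the database calls, the transition this handler performs is a pure function of the state machine and the incoming status document. The sketch below (a hypothetical applyStatus, not the author's code) returns the updated state plus the "<phase>Start" status to insert, if any, which makes the phase logic easy to unit test.

```javascript
// Hypothetical pure version of the state-machine transition.
// Returns the updated state and the status document to insert next (or null).
function applyStatus(state, status) {
  if (status.status !== 'success' || state.currentPhase === 'end') {
    return { state, nextStatus: null };
  }
  const phaseName = state.currentPhase;
  const phase = state.phases[phaseName];
  // Record each job only once
  const finished = phase.finished.includes(status.job)
    ? phase.finished
    : [...phase.finished, status.job];
  const updatedPhase = { ...phase, finished };
  const done = updatedPhase.waitFor.every((job) => finished.includes(job));
  const newState = {
    ...state,
    currentPhase: done ? phase.next : phaseName,
    phases: { ...state.phases, [phaseName]: updatedPhase },
  };
  // Kick off the next phase with its "<phase>Start" event
  const nextStatus =
    done && phase.next !== 'end'
      ? { stateId: status.stateId, job: `${phase.next}Start`, status: 'success' }
      : null;
  return { state: newState, nextStatus };
}

// A tiny two-phase machine: phase "a" runs job x, then phase "b" runs job y
let state = {
  currentPhase: 'a',
  phases: {
    a: { steps: { a: { jobs: ['x'] } }, waitFor: ['x'], finished: [], next: 'b' },
    b: { steps: { bStart: { jobs: ['y'] } }, waitFor: ['y'], finished: [], next: 'end' },
  },
};
let result = applyStatus(state, { stateId: '1', job: 'x', status: 'success' });
console.log(result.state.currentPhase); // b
console.log(result.nextStatus.job); // bStart
result = applyStatus(result.state, { stateId: '1', job: 'y', status: 'success' });
console.log(result.state.currentPhase); // end
console.log(result.nextStatus); // null
```

Keeping the decision logic pure like this leaves the Lambda responsible only for reading the document, persisting the result atomically, and inserting nextStatus.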
4. Orchestrator Job:
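const { MongoClient } = require('mongodb');
// Assumption: AWS SDK for JavaScript v3, which the Node.js 18+ Lambda runtimes bundle
const { Lambda } = require('@aws-sdk/client-lambda');

// Assumption: the connection string is in MONGODB_URI; clients are reused across invocations
const client = new MongoClient(process.env.MONGODB_URI);
const db = client.db();
const lambda = new Lambda({});

exports.handler = async (event, ctx) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  const document = event.detail.fullDocument;
  if (!document.stateId) {
    console.log('Cannot execute the pipeline without the state id:', document);
    return { success: false };
  }
  // Only a successful job triggers the jobs that follow it
  if (document.status !== 'success') {
    return { success: false };
  }
  const state = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  if (!state) {
    console.log('No state machine found for state id:', document.stateId);
    return { success: false };
  }
  if (state.currentPhase === 'end') {
    console.log('The state machine has already ended:', state._id);
    return { success: false };
  }
  const currentPhase = state.phases[state.currentPhase];
  if (!currentPhase) {
    console.log('Unknown phase in the state machine:', state.currentPhase);
    return { success: false };
  }
  // Look up the jobs that follow the job that just succeeded
  const step = currentPhase.steps[document.job];
  if (step && step.jobs && step.jobs.length > 0) {
    // Invoke all next jobs in parallel; InvocationType 'Event' makes each call asynchronous
    await Promise.all(
      step.jobs.map((job) => {
        // Pass the state id so the job can report its own status back; add whatever else it needs
        const payload = { stateId: document.stateId };
        return lambda.invoke({
          FunctionName: job,
          InvocationType: 'Event',
          Payload: Buffer.from(JSON.stringify(payload)),
        });
      })
    );
  }
  return { success: true };
};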
This job receives each success event, looks up the jobs that follow the finished job in the current phase, and invokes them asynchronously.
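To see how the state machine job and the orchestrator cooperate, here is a hedged in-memory simulation: an array stands in for the statuses collection and its trigger, each event is handled by both pieces of logic, and "invoking" a job simply records it and reports success. Events are processed one at a time in order, so this shows which jobs are started together, not real Lambda timing; the name simulate and the trimmed machine are hypothetical.

```javascript
// In-memory simulation: each status event is handled by both the orchestrator
// logic and the state-machine logic, as with the two Lambdas.
function simulate(stateMachine, firstJob) {
  // Deep copy so the input config is not mutated
  const state = JSON.parse(JSON.stringify(stateMachine));
  const queue = [{ job: firstJob, status: 'success' }]; // stands in for the statuses collection
  const waves = []; // jobs the orchestrator starts together for one event
  while (queue.length > 0) {
    const doc = queue.shift();
    if (state.currentPhase === 'end') break;
    const phase = state.phases[state.currentPhase];
    // Orchestrator: start the jobs that follow this one (each reports success at once)
    const step = phase.steps[doc.job];
    if (step && step.jobs && step.jobs.length > 0) {
      waves.push(step.jobs);
      for (const job of step.jobs) queue.push({ job, status: 'success' });
    }
    // State-machine job: record the job and advance once waitFor is satisfied
    if (!phase.finished.includes(doc.job)) phase.finished.push(doc.job);
    if (phase.waitFor.every((job) => phase.finished.includes(job))) {
      state.currentPhase = phase.next;
      if (phase.next !== 'end') queue.push({ job: `${phase.next}Start`, status: 'success' });
    }
  }
  return { waves, currentPhase: state.currentPhase };
}

const machine = {
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: {
        thinking: { jobs: ['buy_fishing_gear'] },
        buy_fishing_gear: { jobs: ['go_to_fishing_ground'] },
        go_to_fishing_ground: { jobs: ['drink_beer', 'catch_fish'] },
      },
      finished: [],
      next: 'cooking',
      waitFor: ['buy_fishing_gear', 'go_to_fishing_ground', 'drink_beer', 'catch_fish'],
    },
    cooking: {
      steps: { cookingStart: { jobs: ['clean'] }, clean: { jobs: ['cook'] } },
      finished: [],
      next: 'end',
      waitFor: ['cook', 'clean'],
    },
  },
};

for (const wave of simulate(machine, 'thinking').waves) console.log(wave.join(' + '));
// buy_fishing_gear
// go_to_fishing_ground
// drink_beer + catch_fish
// clean
// cook
```

The third line is the parallel fan-out, and the switch from thinking to cooking happens only after both drink_beer and catch_fish have been recorded.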
5. Notifications Job:
Like the jobs above, this job receives every status event, so we can use the document's data to structure the notification message we want to send.
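The post leaves the notification format open. As one possibility, a small formatter could turn each status document into a message. The function name, the message wording and the choice to skip internal "<phase>Start" events are assumptions, and the actual delivery (email, Slack, SNS and so on) is left out.

```javascript
// Hypothetical formatter for the notifications job: turns a status document
// (stateId, job, status, date) into a short human-readable message.
function buildNotification(statusDocument) {
  const { stateId, job, status, date } = statusDocument;
  // Design choice: phase kick-off events (e.g. "cookingStart") are internal, so skip them
  if (job.endsWith('Start')) return null;
  // The date may arrive as a string once the document has been serialized to JSON
  const when = date instanceof Date ? date.toISOString() : String(date);
  const outcome = status === 'success' ? 'succeeded' : `ended with status "${status}"`;
  return `[${stateId}] Job ${job} ${outcome} at ${when}`;
}

const date = new Date('2023-11-15T10:00:00Z');
console.log(buildNotification({ stateId: '1', job: 'catch_fish', status: 'success', date }));
// [1] Job catch_fish succeeded at 2023-11-15T10:00:00.000Z
console.log(buildNotification({ stateId: '1', job: 'cookingStart', status: 'success', date }));
// null
```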
What if there are multiple phases:
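Then we just add another phase to the existing state machine configuration. Set the previous phase's next to the new phase's name (here, cooking) and give the new phase a step named after it with a Start suffix (here, cookingStart), because the state machine job kicks off the next phase by inserting a status whose job is the next phase's name followed by Start.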
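const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      // Steps, finished and waitFor as before, but with next: 'cooking' instead of 'end'
    },
    cooking: {
      steps: {
        // Started by the "cookingStart" status that the state machine job inserts
        cookingStart: {
          jobs: ['clean'],
        },
        clean: {
          jobs: ['cook'],
        },
      },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};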
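Because phases are chained through next and the "<phase>Start" event, a multi-phase configuration has a few easy-to-miss requirements. A hypothetical validator like the one below (not part of the original design) can catch them before a process is started: every next must name an existing phase or 'end', every phase except the first must be reachable through some next, a phase entered through next needs a Start step, and every job in waitFor must be started by some step.

```javascript
// Hypothetical validator for a multi-phase state machine configuration.
// Returns a list of problems; an empty list means every check passed.
function validateStateMachine(machine) {
  const problems = [];
  const phaseNames = Object.keys(machine.phases);
  if (!machine.phases[machine.currentPhase]) {
    problems.push(`currentPhase "${machine.currentPhase}" is not a phase`);
  }
  for (const name of phaseNames) {
    const phase = machine.phases[name];
    // next must point at another phase or at 'end'
    if (phase.next !== 'end' && !machine.phases[phase.next]) {
      problems.push(`phase "${name}" has unknown next "${phase.next}"`);
    }
    const enteredViaNext = phaseNames.some((other) => machine.phases[other].next === name);
    if (name !== machine.currentPhase && !enteredViaNext) {
      problems.push(`phase "${name}" is never reached (no phase has next: "${name}")`);
    }
    // A phase entered via next is started by a "<name>Start" status event
    if (enteredViaNext && !phase.steps[`${name}Start`]) {
      problems.push(`phase "${name}" is missing a "${name}Start" step`);
    }
    // Every job the phase waits for must be started by some step
    const started = new Set(Object.values(phase.steps).flatMap((step) => step.jobs || []));
    for (const job of phase.waitFor) {
      if (!started.has(job)) problems.push(`phase "${name}" waits for "${job}", which no step starts`);
    }
  }
  return problems;
}

const machine = {
  currentPhase: 'thinking',
  phases: {
    thinking: { steps: { thinking: { jobs: ['catch_fish'] } }, waitFor: ['catch_fish'], finished: [], next: 'cooking' },
    cooking: {
      steps: { cookingStart: { jobs: ['clean'] }, clean: { jobs: ['cook'] } },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};
console.log(validateStateMachine(machine).length); // 0

// Forgetting to chain the phases leaves cooking unreachable
const broken = JSON.parse(JSON.stringify(machine));
broken.phases.thinking.next = 'end';
for (const problem of validateStateMachine(broken)) console.log(problem);
// phase "cooking" is never reached (no phase has next: "cooking")
```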
I hope the code and explanation above give you a starting point for implementing your own solution. If you'd like more details, please leave a comment; I'd be happy to help. Thanks.