Building a distributed scheduler
Clem
Posted on January 19, 2021
For a long time I’ve postponed the decision to introduce a distributed scheduler at MyCoach. I wasn’t sure which one to choose, or how to build one. Postponing this decision meant not building some features.
In July 2020 the feature folks caught up with me: some features now required scheduling.
For our MyCoach Pro product, we needed to send push notifications before and after each training so that players reply to the questionnaires. The goal was clear: we needed to make sure that players answer the pre- and post-training questionnaires. If they don’t, we’ll have unhappy physical trainers and coaches, because they need the questionnaire data to adapt the training to the player’s current physical state and to assess the impact of the training on the player.
So here we are, needing a scheduler.
The functional requirements are the following:
- 90 min, 75 min, 60 min, 45 min, 30 min and 15 min before the training, send a notification to all the players who haven’t answered the pre-training questionnaire yet
  - This means that a player who answers the questionnaire after receiving the 75 min notification will not get the subsequent ones
- 15 min, 30 min, 45 min, 60 min, 75 min, 90 min, 105 min, 120 min, 135 min, 150 min, 165 min and 180 min after the training, send a notification to all the players who haven’t answered the post-training questionnaire yet
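To make the two lists concrete, here is a minimal sketch of how those reminder times could be computed in Go. The function names are made up for illustration, and it assumes that "after the training" is measured from the end of the training (the 90 minute duration in `main` is an arbitrary example).

```go
package main

import (
	"fmt"
	"time"
)

// reminderStep is the spacing between two reminders, taken from the requirements.
const reminderStep = 15 * time.Minute

// preTrainingReminders returns the reminder times 90, 75, 60, 45, 30 and 15
// minutes before the training starts, in chronological order.
func preTrainingReminders(start time.Time) []time.Time {
	var out []time.Time
	for offset := 90 * time.Minute; offset >= reminderStep; offset -= reminderStep {
		out = append(out, start.Add(-offset))
	}
	return out
}

// postTrainingReminders returns the reminder times every 15 minutes,
// from 15 to 180 minutes after the training ends (assumption: end, not start).
func postTrainingReminders(end time.Time) []time.Time {
	var out []time.Time
	for offset := reminderStep; offset <= 180*time.Minute; offset += reminderStep {
		out = append(out, end.Add(offset))
	}
	return out
}

// mustParse parses an ISO 8601 (RFC 3339) timestamp and panics on bad input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	start := mustParse("2020-12-24T14:00:00Z")
	end := start.Add(90 * time.Minute) // example duration, not from the requirements
	for _, t := range preTrainingReminders(start) {
		fmt.Println("pre ", t.Format(time.RFC3339))
	}
	for _, t := range postTrainingReminders(end) {
		fmt.Println("post", t.Format(time.RFC3339))
	}
}
```

The job itself still has to filter out players who already answered, so these timestamps only say when to run it, not whom to notify.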
How do we address this requirement technically?
We can break this down into two distinct things.
First the job which needs to be done:
- For a given training x get the list of players that haven’t answered the PRE/POST questionnaire
- Send a push notification to this list of players to remind them that they need to answer
Second the scheduler which will trigger the job at some predefined timestamps.
Implementing the job is easy: we already have all the information we need in the database.
For the scheduling part we need to choose a scheduler.
This scheduler must:
- Accept jobs scheduled at ISO 8601 dates
- Have a library for either Scala or Go (these are the two languages that we use for backend development at MyCoach)
- Be fault tolerant
First implementation: a not so distributed scheduler
We were in July, and teams were gathering for the pre-season trainings and games. Some customers wanted to start the season properly and were pushing for this feature to be available ASAP.
I started looking for solutions, and the simplest I could find was the Akka scheduler. We are happy users of Akka Streams and Akka HTTP. Since the Akka scheduler supports timestamp-based scheduling, is available in Scala, and we already have an actor system at hand because of Akka HTTP, it was a quick choice.
Scheduling
Each time a customer on the MyCoach Pro product:
- Creates or updates a training
  - We’ll add the training id to a “schedule” RabbitMQ queue
- Deletes a training
  - We’ll add the training id to an “unschedule” RabbitMQ queue
These two queues are consumed by our notification service. When the notification service consumes a message from the schedule queue, it gets the training data (the date), computes all the schedules, and registers the job with the Akka scheduler under a key derived from the training id. (Note: we use UUIDs as PRIMARY KEY for all our tables, so they are globally unique.)
When the notification service consumes a message from the unschedule queue, it generates the unique job key from the training id and unregisters the job from the Akka scheduler.
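The register/unregister-by-key idea can be sketched as follows. This is not the Akka API and not our Scala code: it is a rough Go analog built on the standard library's `time.AfterFunc`, with hypothetical names (`registry`, `schedule`, `unschedule`), only meant to show why a stable key per training makes cancelling and rescheduling straightforward.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// registry keeps the in-memory timers of every job, indexed by job key.
type registry struct {
	mu     sync.Mutex
	timers map[string][]*time.Timer
}

func newRegistry() *registry {
	return &registry{timers: make(map[string][]*time.Timer)}
}

// schedule registers fn to run at each timestamp under key. Timers already
// registered under the same key are stopped first, so an updated training
// replaces its old schedules. Timestamps in the past are skipped.
func (r *registry) schedule(key string, at []time.Time, fn func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stopLocked(key)
	for _, t := range at {
		if d := time.Until(t); d > 0 {
			r.timers[key] = append(r.timers[key], time.AfterFunc(d, fn))
		}
	}
}

// unschedule stops every timer registered under key and reports whether the
// key was known.
func (r *registry) unschedule(key string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.stopLocked(key)
}

// stopLocked must be called with r.mu held.
func (r *registry) stopLocked(key string) bool {
	timers, ok := r.timers[key]
	for _, t := range timers {
		t.Stop()
	}
	delete(r.timers, key)
	return ok
}

// pending returns how many timers are registered under key.
func (r *registry) pending(key string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.timers[key])
}

// hoursFromNow is a demo helper that builds timestamps relative to now.
func hoursFromNow(hours ...int) []time.Time {
	out := make([]time.Time, 0, len(hours))
	for _, h := range hours {
		out = append(out, time.Now().Add(time.Duration(h)*time.Hour))
	}
	return out
}

func main() {
	r := newRegistry()
	r.schedule("training-42", hoursFromNow(1, 2, 3), func() { fmt.Println("send reminders") })
	fmt.Println("pending:", r.pending("training-42"))
	fmt.Println("unscheduled:", r.unschedule("training-42"))
}
```

Like the Akka scheduler, everything here lives in memory, which is exactly the weakness discussed next.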
So far so good, we are able to schedule jobs at predefined times.
What if the service fails or is updated?
Since the Akka scheduler lives in memory, we’ll lose all the schedules.
What can we do about it?
When we consume the schedule and unschedule messages, in addition to submitting/cancelling the job with the Akka scheduler, we store the fact that this training must have scheduled notifications in a MongoDB collection.
On startup the service reads the whole collection and resubmits each training to the schedule queue. The notification jobs then get re-submitted to the Akka scheduler.
So now we have something that can restore its state when it fails, or when we deploy a new version.
Good enough, it will support the load for the hundreds of MyCoach Pro customers.
Second implementation: a distributed scheduler
Here we are in November 2020, and the feature folks caught up with me once again. For our products in partnership with the federations (MyCoach is a partner of 8 Olympic Federations), we need to be able to send notifications to players to ask them whether they will come to the training. These notifications must be scheduled 3 days, 2 days, 1 day and 6 hours before the training.
The idea is the same as for the questionnaire notifications: we must send the notifications only to players whose status is PENDING. We will not notify players whose status is ACCEPTED or DENIED.
Can we use our already working scheduler?
Technically we can, but the Akka scheduler was already loaded with thousands of jobs for the MyCoach Pro product, which has hundreds of customers. The products with federations have thousands of coaches, which means hundreds of thousands of jobs to schedule.
The Akka scheduler would work, but since the trend is to add more scheduled things in the future, we might want to switch to something more robust.
What are our options?
Here are our requirements:
- Accept jobs scheduled at ISO 8601 dates
- Have a library for either Scala or Go
- Be fault tolerant
- Scale horizontally
- If possible, rely on technologies that we already host
OK, so what can we find?
OVH Metronome seems to check all the requirements. The catch is that it requires Kafka, and we already host similar services. But I can live with that. The main problem IMO is that it hasn’t been updated since November 2018 and it lacks documentation on how to set it up and how to schedule jobs. On the other hand, I really like the idea of decoupling scheduling and execution: it would allow us to rely on a single scheduler whether we schedule jobs from Go, Scala or whatever language we’ll add in the future.
PagerDuty Scheduler checks a lot of requirements. The main problem is that it uses Cassandra and Kafka. We don’t have experience with either, nor do we have any use case other than the scheduler that would need Cassandra or Kafka. I’m always reluctant to host new database systems: they are complex by nature and not easy to scale. It’s a no-go then.
Netflix Fenzo checks a lot of requirements too. The main problem is that it relies on Apache Mesos, and we use Kubernetes, not Mesos, as our orchestrator. Changing our orchestrator just for the scheduler is a huge and risky change.
There are plenty of other schedulers out there, but none seems to offer timestamp-based scheduling AND integrate with the technologies that we already host.
What if we build our own distributed scheduler using MongoDB and RabbitMQ?
Input
A job is defined by:
- A unique key
- A set of timestamps (ISO 8601 dates)
- A callback
  - A RabbitMQ callback is made of:
    - The queue to put the payload in
    - The payload for the message
Or in JSON:
{
  "key": "training-invitations-$trainingId",
  "schedules": ["2020-12-24T14:00:00Z", "2020-12-25T14:00:00Z", "2020-12-26T14:00:00Z", "2020-12-27T08:00:00Z"],
  "callback": {
    "type": "rabbitmq",
    "data": {
      "queue": "training.invitations.notifications",
      "payload": {
        "id": "$trainingId"
      }
    }
  }
}
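On the Go side, this JSON could map to types like the ones below. This is a sketch, not the scheduler's actual code: the type and function names are assumptions, and the callback data is kept as raw JSON so that other callback types could be added later without changing the job type.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Job mirrors the JSON document above. time.Time decodes RFC 3339 strings,
// which covers the ISO 8601 timestamps used in the example.
type Job struct {
	Key       string      `json:"key"`
	Schedules []time.Time `json:"schedules"`
	Callback  Callback    `json:"callback"`
}

// Callback keeps its data undecoded until the type is known.
type Callback struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// RabbitMQData is the data of a "rabbitmq" callback.
type RabbitMQData struct {
	Queue   string          `json:"queue"`
	Payload json.RawMessage `json:"payload"`
}

// parseJob decodes and validates a job, then decodes its RabbitMQ callback.
func parseJob(raw []byte) (Job, RabbitMQData, error) {
	var job Job
	var data RabbitMQData
	if err := json.Unmarshal(raw, &job); err != nil {
		return job, data, err
	}
	if job.Key == "" || len(job.Schedules) == 0 {
		return job, data, errors.New("job needs a key and at least one schedule")
	}
	if job.Callback.Type != "rabbitmq" {
		return job, data, fmt.Errorf("unsupported callback type %q", job.Callback.Type)
	}
	if err := json.Unmarshal(job.Callback.Data, &data); err != nil {
		return job, data, err
	}
	return job, data, nil
}

const sample = `{
  "key": "training-invitations-$trainingId",
  "schedules": ["2020-12-24T14:00:00Z", "2020-12-25T14:00:00Z", "2020-12-26T14:00:00Z", "2020-12-27T08:00:00Z"],
  "callback": {
    "type": "rabbitmq",
    "data": {
      "queue": "training.invitations.notifications",
      "payload": {"id": "$trainingId"}
    }
  }
}`

func main() {
	job, data, err := parseJob([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(job.Key, len(job.Schedules), data.Queue, string(data.Payload))
}
```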
The scheduler receives schedules through an HTTP API.
You might wonder, why HTTP?
- HTTP is supported in almost any language
- HTTP is synchronous: we’ll know right away whether our job has been scheduled properly
- Since we use Kubernetes, we can horizontally scale the scheduler with a deployment, expose it with a service and we’ll get load balancing for free
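Here is a sketch of what such an endpoint could look like with Go's standard `net/http` package. The `/jobs` path, the `jobRequest` type and the status codes are assumptions made for the example. The point it illustrates is the synchronous answer: the caller gets a 201 or an error before moving on.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// jobRequest is a trimmed-down job: just what the handler validates here.
type jobRequest struct {
	Key       string      `json:"key"`
	Schedules []time.Time `json:"schedules"`
}

// scheduleHandler decodes a job, validates it and hands it to store.
// store stands in for the persistence layer, which is out of scope here.
func scheduleHandler(store func(jobRequest) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var job jobRequest
		if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
			// Malformed JSON or a timestamp that is not RFC 3339.
			http.Error(w, "invalid job: "+err.Error(), http.StatusBadRequest)
			return
		}
		if job.Key == "" || len(job.Schedules) == 0 {
			http.Error(w, "key and schedules are required", http.StatusBadRequest)
			return
		}
		if err := store(job); err != nil {
			http.Error(w, "could not store job", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusCreated)
	}
}

// postJob sends body to the handler in-process and returns the status code.
func postJob(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := scheduleHandler(func(jobRequest) error { return nil })
	fmt.Println(postJob(h, `{"key":"k","schedules":["2020-12-24T14:00:00Z"]}`))
	fmt.Println(postJob(h, `{"key":"k","schedules":["not a date"]}`))
}
```

Because the handler holds no state of its own, any replica behind the Kubernetes service can answer any request.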
Storage
Now that the service receives the input, it needs to store it.
We generate one document for each timestamp in the schedule. If we take our previous example:
{...
"schedules":["2020-12-24T14:00:00Z","2020-12-25T14:00:00Z","2020-12-26T14:00:00Z","2020-12-27T08:00:00Z"],
...}
We generate 4 documents, one for each instance of our job, and store them in our MongoDB collection.
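The fan-out from one job to one document per timestamp could look like this. Only `timestamp` and `scheduledAt` are field names that appear in our query. The other fields (`JobKey`, `Queue`, `Payload`) and the function names are hypothetical, and the actual insert into MongoDB is left out.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleDoc is one instance of a job: one document per timestamp.
type scheduleDoc struct {
	JobKey      string     // hypothetical: key of the job this instance belongs to
	Timestamp   time.Time  // when this instance must fire
	ScheduledAt *time.Time // nil until an instance of the service claims it
	Queue       string     // hypothetical: RabbitMQ queue of the callback
	Payload     string     // hypothetical: message payload of the callback
}

// expand turns one job into one document per timestamp.
func expand(key, queue, payload string, schedules []time.Time) []scheduleDoc {
	docs := make([]scheduleDoc, 0, len(schedules))
	for _, ts := range schedules {
		docs = append(docs, scheduleDoc{
			JobKey:    key,
			Timestamp: ts.UTC(),
			Queue:     queue,
			Payload:   payload,
		})
	}
	return docs
}

// parseAll parses RFC 3339 timestamps and panics on bad input (demo helper).
func parseAll(values ...string) []time.Time {
	out := make([]time.Time, 0, len(values))
	for _, v := range values {
		t, err := time.Parse(time.RFC3339, v)
		if err != nil {
			panic(err)
		}
		out = append(out, t)
	}
	return out
}

func main() {
	docs := expand(
		"training-invitations-$trainingId",
		"training.invitations.notifications",
		`{"id":"$trainingId"}`,
		parseAll("2020-12-24T14:00:00Z", "2020-12-25T14:00:00Z", "2020-12-26T14:00:00Z", "2020-12-27T08:00:00Z"),
	)
	for _, d := range docs {
		fmt.Println(d.JobKey, d.Timestamp.Format(time.RFC3339), d.ScheduledAt == nil)
	}
}
```

Storing one document per instance is what lets each firing be claimed and deleted independently of the others.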
Execution
To execute the jobs, we write a task which uses MongoDB findOneAndUpdate to find either the jobs coming up in the next minute or the jobs that came due in the last minute. It’s your choice here: either you’ll be up to 1 minute early or up to 1 minute late. In our case we chose to be up to 1 minute early. Better early than late, right?
This task is scheduled with GoCron to run every minute. You can tweak this to run more often if you want. In our case we don’t need second precision, so one minute is enough.
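The early/late choice boils down to the cut-off passed to the query. The sketch below shows that cut-off and a once-per-interval loop. To stay dependency-free it substitutes the standard library's `time.Ticker` for GoCron, and all names (`cutOff`, `runEvery`, `runN`) are made up for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cutOff returns the upper bound on job timestamps for a run starting at now.
// early=true picks jobs due within the next minute (up to 1 minute early);
// early=false picks only jobs already due (up to 1 minute late).
func cutOff(now time.Time, early bool) time.Time {
	if early {
		return now.Add(time.Minute)
	}
	return now
}

// runEvery calls task once per interval until ctx is cancelled, passing the
// "early" cut-off computed from the tick time.
func runEvery(ctx context.Context, interval time.Duration, task func(before time.Time)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			// Both cases may be ready at once; never start a run after cancellation.
			if ctx.Err() != nil {
				return
			}
			task(cutOff(now, true))
		}
	}
}

// runN is a demo helper: it runs the loop until the task has been called n
// times and returns the number of calls.
func runN(n int, interval time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	runEvery(ctx, interval, func(before time.Time) {
		calls++
		if calls == n {
			cancel()
		}
	})
	return calls
}

func main() {
	// The real task would use time.Minute; a short interval keeps the demo fast.
	fmt.Println("runs:", runN(3, 10*time.Millisecond))
}
```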
You might wonder why we use MongoDB findOneAndUpdate. Because this way we can get the job to execute and mark it as scheduled at the same time. Atomicity is guaranteed for this operation, so if we run 10 instances of the service in parallel, they will not take the same job to schedule.
Our query will look like this using the Go mongodb driver:
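// Atomically claim one due job: match it and stamp scheduledAt in a single operation.
scheduleStore.Collection.FindOneAndUpdate(ctx,
	bson.M{
		"$and": []bson.M{
			// The job is due: its timestamp is at or before the cut-off.
			{"timestamp": bson.M{"$lte": before}},
			{"$or": []bson.M{
				// Never claimed (scheduledAt is null or missing).
				{"scheduledAt": nil},
				// Claimed more than one minute ago but still in the collection.
				{"scheduledAt": bson.M{"$lte": time.Now().Add(-(1 * time.Minute))}},
			}},
		},
	},
	// Stamp the claim so that other instances skip this job for the next minute.
	bson.M{
		"$set": bson.M{"scheduledAt": time.Now()},
	},
	nil,
)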
The query reads as follows:
Give me one job whose timestamp is before the cut-off AND (which hasn’t been scheduled yet OR which was scheduled more than one minute ago).
The update operation just sets the scheduledAt field, so that the same job isn’t picked up again right away.
Why do we also take jobs that match the criterion “scheduled more than one minute ago”?
Because each time a job is scheduled properly (i.e. the callback is executed), we remove the job from the collection. A properly scheduled job should no longer be in the collection 1 minute after its scheduledAt date. If it is still there, we can consider that the schedule didn’t work.
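Put differently, scheduledAt acts as a one-minute lease. The predicate below restates the filter in plain Go so the cases are easy to check by hand. It is an in-memory illustration of the same logic, not code that talks to MongoDB, and the names (`claimable`, `lease`, `newJob`) are ours.

```go
package main

import (
	"fmt"
	"time"
)

// lease is how long a claim protects a job from being picked up again.
const lease = time.Minute

// job holds the two fields the filter looks at.
type job struct {
	Timestamp   time.Time
	ScheduledAt *time.Time // nil means never claimed
}

// claimable mirrors the filter:
// timestamp <= before AND (scheduledAt is unset OR scheduledAt <= now - lease).
func claimable(j job, before, now time.Time) bool {
	if j.Timestamp.After(before) {
		return false // not due yet
	}
	if j.ScheduledAt == nil {
		return true // never claimed
	}
	return !j.ScheduledAt.After(now.Add(-lease)) // claim expired
}

// at parses an RFC 3339 timestamp and panics on bad input (demo helper).
func at(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// newJob builds a job; an empty scheduledAt means the job was never claimed.
func newJob(timestamp, scheduledAt string) job {
	j := job{Timestamp: at(timestamp)}
	if scheduledAt != "" {
		t := at(scheduledAt)
		j.ScheduledAt = &t
	}
	return j
}

func main() {
	now := at("2020-12-24T13:59:00Z")
	before := now.Add(time.Minute) // "up to 1 minute early"

	fmt.Println("never claimed:     ", claimable(newJob("2020-12-24T14:00:00Z", ""), before, now))
	fmt.Println("claimed 30 s ago:  ", claimable(newJob("2020-12-24T14:00:00Z", "2020-12-24T13:58:30Z"), before, now))
	fmt.Println("claimed 2 min ago: ", claimable(newJob("2020-12-24T14:00:00Z", "2020-12-24T13:57:00Z"), before, now))
	fmt.Println("due tomorrow:      ", claimable(newJob("2020-12-25T14:00:00Z", ""), before, now))
}
```

Only the first and third jobs are claimable: the second is still protected by its lease and the fourth is not due.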
The next thing to do is to execute the callback, in our case putting the proper data into the given RabbitMQ queue. On success we remove the job from the collection.
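The claim, publish, delete sequence can be sketched with two small interfaces. `Store` and `Publisher` are hypothetical abstractions over MongoDB and RabbitMQ, and the in-memory implementations exist only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
)

// Job is a claimed job instance.
type Job struct {
	ID      string
	Queue   string
	Payload string
}

// Store abstracts the collection: Claim stands in for findOneAndUpdate.
type Store interface {
	Claim() (Job, bool)
	Delete(id string) error
}

// Publisher abstracts the message broker.
type Publisher interface {
	Publish(queue, payload string) error
}

// dispatch claims jobs until none is left. A job is deleted only after its
// callback succeeded; on failure it stays in the store and becomes claimable
// again once its lease expires.
func dispatch(s Store, p Publisher) (sent int) {
	for {
		job, ok := s.Claim()
		if !ok {
			return sent
		}
		if err := p.Publish(job.Queue, job.Payload); err != nil {
			continue
		}
		if err := s.Delete(job.ID); err != nil {
			continue
		}
		sent++
	}
}

// memStore is an in-memory Store for the demo.
type memStore struct {
	jobs    []Job
	claimed map[string]bool
}

func newMemStore(jobs ...Job) *memStore {
	return &memStore{jobs: jobs, claimed: map[string]bool{}}
}

func (m *memStore) Claim() (Job, bool) {
	for _, j := range m.jobs {
		if !m.claimed[j.ID] {
			m.claimed[j.ID] = true
			return j, true
		}
	}
	return Job{}, false
}

func (m *memStore) Delete(id string) error {
	for i, j := range m.jobs {
		if j.ID == id {
			m.jobs = append(m.jobs[:i], m.jobs[i+1:]...)
			return nil
		}
	}
	return errors.New("job not found: " + id)
}

// memPublisher fails for one queue name to simulate a broker error.
type memPublisher struct{ failQueue string }

func (p memPublisher) Publish(queue, payload string) error {
	if queue == p.failQueue {
		return errors.New("publish failed")
	}
	fmt.Printf("published %s to %s\n", payload, queue)
	return nil
}

func main() {
	store := newMemStore(
		Job{ID: "1", Queue: "training.invitations.notifications", Payload: `{"id":"a"}`},
		Job{ID: "2", Queue: "broken", Payload: `{"id":"b"}`},
	)
	sent := dispatch(store, memPublisher{failQueue: "broken"})
	fmt.Println("sent:", sent, "left in store:", len(store.jobs))
}
```

One consequence of this ordering: if the publish succeeds but the delete fails, the message is sent again after the lease expires, so consumers should tolerate duplicates.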
Now that the message is in the queue, we can write a consumer that triggers the notification as soon as a message arrives.
The good thing with RabbitMQ is that error handling and retries are simple: we can just Nack the message on failure and it’ll be re-consumed.
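A consumer following that rule could be sketched like this. The `acknowledger` interface copies the shape of the `Ack` and `Nack` methods found on a delivery in the amqp091-go client, but the sketch does not import any RabbitMQ library: the fake delivery and the `handle` and `outcome` names are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger has the same Ack/Nack shape as a RabbitMQ delivery in the
// amqp091-go client (assumption: the real consumer would pass one of those).
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// handle runs notify on the message body, acks on success and nacks with
// requeue=true on failure, so that the broker delivers the message again.
func handle(d acknowledger, body []byte, notify func(body []byte) error) error {
	if err := notify(body); err != nil {
		return d.Nack(false, true)
	}
	return d.Ack(false)
}

// fakeDelivery records what the handler did with the message.
type fakeDelivery struct{ acked, requeued bool }

func (f *fakeDelivery) Ack(multiple bool) error { f.acked = true; return nil }

func (f *fakeDelivery) Nack(multiple, requeue bool) error { f.requeued = requeue; return nil }

// outcome is a demo helper: it handles one message and reports the result.
func outcome(fail bool) string {
	d := &fakeDelivery{}
	_ = handle(d, []byte(`{"id":"a"}`), func([]byte) error {
		if fail {
			return errors.New("push provider unavailable")
		}
		return nil
	})
	switch {
	case d.acked:
		return "acked"
	case d.requeued:
		return "requeued"
	}
	return "dropped"
}

func main() {
	fmt.Println(outcome(false))
	fmt.Println(outcome(true))
}
```

A message that can never succeed would be requeued forever with this approach, so a retry limit or a dead-letter queue is probably worth considering.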
Conclusion
We were able to write a scheduler that matches all our requirements: it works cross-platform, it scales horizontally, and it relies on the technologies we already have at hand.
We replaced the Akka scheduler with this new scheduler for the pre/post training questionnaire notifications and have written the tasks for the training invitations.
We plan to open source this scheduler in the coming months, stay tuned!