Looi David
Posted on April 30, 2020
One of the common problems that tech companies have to deal with is periodic tasks: tasks that run at specific intervals, at a specific time of day, or at a specific time of the month. At CoinJar, we have periodic tasks for fetching rates from various sources, monitoring crypto transfer confirmations, publishing our rates, sending marketing emails, monthly reporting, etc.
We use Sidekiq Enterprise to handle our periodic tasks. Sidekiq Enterprise Periodic Jobs is great in many ways: it's fast and easy to set up, with a Cron-like interface, and it works well for us. But as our platform grows, the number of periodic tasks also increases, and we realised these jobs don't scale the same way. Take fetching rates from various sources: the more coins and fiat currencies we support, the faster the number of rates we need to maintain grows, roughly with the product of the two. This became the focus of my Dev Day.
Enter The Lambda
AWS Lambda now supports scheduling. Just like Cron, the highest frequency it supports is once per minute.
So, what if I want a job to run every 10 seconds? AWS provided a guide for that too.
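The arithmetic behind the approach is simple: a schedule that fires once per minute can be subdivided by looping inside each run. Here is a minimal Ruby sketch of that arithmetic; `sub_minute_offsets` is a hypothetical helper name used only for illustration, not part of any AWS API.

```ruby
# Hypothetical helper: given a desired interval in seconds, list the offsets
# (seconds after the start of each minute) at which the job should run.
def sub_minute_offsets(interval_seconds)
  unless interval_seconds.is_a?(Integer) && interval_seconds.positive? && (60 % interval_seconds).zero?
    raise ArgumentError, 'interval must be a positive integer that divides 60 evenly'
  end

  count = 60 / interval_seconds
  Array.new(count) { |i| i * interval_seconds }
end

offsets = sub_minute_offsets(10)
puts offsets.inspect # prints [0, 10, 20, 30, 40, 50]
```

For a 10-second interval this gives 6 runs per minute, which is where the count of 6 used later in this post comes from.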
Serverless
Let's get to it:
I used the Serverless Framework to simplify my development and deployment. It's easy to set up and works out of the box.
All we need to get Lambda up and running with Serverless is a configuration file (serverless.yml) and the function definitions.
Configuration
Serverless provides very extensive configurations. But luckily, we don't need all of them. This is what we need to get started:
# serverless.yml
service: lambda-scheduler
provider:
  name: aws
  runtime: ruby2.7
functions:
  ping:
    handler: scheduler.ping
  iterate:
    handler: scheduler.iterate
This defines the runtime (Ruby 2.7) and 2 functions (ping and iterate). Notice that the functions have no triggers. That's because we don't need them: they will be invoked by AWS Step Functions.
So, how does Step Functions invoke the Lambda functions?
Serverless has a plugin, serverless-step-functions, that lets us easily define step functions. Here is what it would look like:
# serverless.yml
plugins:
  - serverless-step-functions
stepFunctions:
  stateMachines:
    ten_seconds:
      events:
        - schedule: cron(* * * * ? *)
      definition:
        Comment: Invoke Lambda every 10 seconds
        StartAt: ConfigureCount
        States:
          ConfigureCount:
            Type: Pass
            Result:
              index: 0
              count: 6
            ResultPath: "$.iterator"
            Next: "Iterator"
          Iterator:
            Type: Task
            Resource: arn:aws:lambda:<region>:<account>:function:lambda-scheduler-${self:provider.stage}-iterate
            ResultPath: "$.iterator"
            Next: IsCountReached
          IsCountReached:
            Type: Choice
            Choices:
              - Variable: "$.iterator.continue"
                BooleanEquals: true
                Next: Wait
            Default: Done
          Wait:
            Type: Wait
            Seconds: 10
            Next: Iterator
          Done:
            Type: Pass
            End: true
Let's walk through this:
schedule: cron(* * * * ? *)
An AWS cron expression (six fields, including the year) that invokes the step function every minute.
StartAt: ConfigureCount
Names the first state to run when the step function is invoked.
ConfigureCount:
Sets up the counter for our iterator (index 0, count 6), then transitions to Iterator.
Iterator:
Invokes the iterate function defined earlier, stores its result at $.iterator, then transitions to IsCountReached.
IsCountReached:
This is basically an if condition in Step Functions. If continue is true, it transitions to Wait; otherwise, it transitions to Done.
Wait:
As the name suggests, waits for 10 seconds before transitioning back to Iterator.
As you can see, what I just described is a simple function, just built as an AWS Step Function.
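To make the control flow concrete, here is a rough local simulation of the state machine in plain Ruby. It makes no AWS calls: `run_state_machine` and `fake_iterate` are hypothetical names for this sketch, and the Wait state is replaced by an injectable callable so the example runs instantly.

```ruby
# Local simulation of the ten_seconds state machine (no AWS involved).
# `iterate` is any callable standing in for the iterate Lambda; `wait` stands
# in for the Wait state so the sketch can run without actually sleeping.
def run_state_machine(iterate:, wait: ->(seconds) { sleep(seconds) })
  # ConfigureCount: seed the counter under the "iterator" key.
  state = { 'iterator' => { 'index' => 0, 'count' => 6 } }
  loop do
    # Iterator: call the function and store its result at "iterator".
    state['iterator'] = iterate.call(state)
    # IsCountReached: fall through to Done once "continue" is false.
    break unless state['iterator']['continue']
    # Wait: pause, then loop back to Iterator.
    wait.call(10)
  end
  state
end

# Stand-in for the iterate handler: bump the index and decide whether to continue.
fake_iterate = lambda do |event|
  iterator = event['iterator']
  idx = iterator['index'] + 1
  { 'index' => idx, 'continue' => idx < iterator['count'], 'count' => iterator['count'] }
end

waits = []
final = run_state_machine(iterate: fake_iterate, wait: ->(seconds) { waits << seconds })
puts "invocations=#{final['iterator']['index']} waits=#{waits.size}" # prints invocations=6 waits=5
```

Six invocations with five 10-second waits between them cover offsets 0 through 50 seconds, so each run finishes before the next minute's run starts.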
Permissions
As with any AWS service, everything needs proper permissions before it can be used. In this case, we need 2 specific permissions:
- Create Step Functions
- Allow Lambda Function to be invoked
Serverless makes managing AWS IAM permissions a bit less painful. All we need is to allow those 2 actions. Here's the config:
# serverless.yml
provider:
  ...
  iamRoleStatements:
    - Effect: Allow
      Action:
        - lambda:InvokeFunction
      Resource: "arn:aws:lambda:<region>:<account>:function:*"
    - Effect: Allow
      Action:
        - states:CreateStateMachine
      Resource: "arn:aws:states:<region>:<account>:stateMachine:*"
Handlers
Handlers are just the methods in a Lambda function that process events.
Iterate
The iterate method that gets invoked by the step function looks like this:
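# scheduler.rb
require 'json'
require 'aws-sdk-lambda'

def iterate(event:, context:)
  # Advance the counter that the state machine keeps under "$.iterator".
  count = event.dig('iterator', 'count')
  idx = event.dig('iterator', 'index') + 1
  puts "idx=#{idx}"

  # 'Event' invokes ping asynchronously, so this call does not wait for it.
  # REGION and STAGE must be set as environment variables on the function.
  client = Aws::Lambda::Client.new(region: ENV.fetch('REGION'))
  client.invoke(
    function_name: "lambda-scheduler-#{ENV.fetch('STAGE')}-ping",
    invocation_type: 'Event',
    payload: { index: idx }.to_json
  )

  # Returned to the step function and stored back at "$.iterator".
  { index: idx, continue: idx < count, count: count }
end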
What the method does:
- Increment index by 1
- Invoke the ping function
- Return index, continue, and count back to the step function
This method gets triggered 6 times every minute by the step function.
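One detail worth noting is that the handler reads string keys from the event but returns a hash with symbol keys. This works because the return value is serialized to JSON on its way back to the step function, so it arrives at the next invocation with string keys. The sketch below illustrates this locally; `next_iterator` and `round_trip` are hypothetical helpers, and the Lambda invocation is left out so it runs without AWS.

```ruby
require 'json'

# Counter logic from iterate, without the Lambda invocation, for local experiments.
def next_iterator(event)
  count = event.dig('iterator', 'count')
  idx = event.dig('iterator', 'index') + 1
  { index: idx, continue: idx < count, count: count }
end

# Simulate the JSON hop between the handler's return value and the next event.
def round_trip(result)
  { 'iterator' => JSON.parse(JSON.generate(result)) }
end

event = { 'iterator' => { 'index' => 0, 'count' => 6 } }
flags = []
6.times do
  result = next_iterator(event)
  flags << result[:continue]
  event = round_trip(result)
end
puts flags.inspect # prints [true, true, true, true, true, false]
```

The single false at the end is what sends the state machine to Done after the sixth invocation.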
Ping
Finally, the method that does what we want it to do every 10 seconds. In this case, my method fetches the latest rates from CoinJar Exchange.
# scheduler.rb
require 'json'
require 'net/http'

def ping(event:, context:)
  uri = URI('https://data.exchange.coinjar.com/products/BTCAUD/ticker')
  response = Net::HTTP.get_response(uri)

  if response.code == '200'
    body = JSON.parse(response.body)
    # Keep only the fields we care about; fetch raises if one is missing.
    tick = {
      currency_pair: 'BTCAUD',
      current_time: body.fetch('current_time'),
      last: body.fetch('last'),
      bid: body.fetch('bid'),
      ask: body.fetch('ask')
    }.to_json
    puts "tick=#{tick}"
  else
    # Log the HTTP status so failed fetches show up in the function's logs.
    puts "code=#{response.code}"
  end
end
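If you want to exercise the parsing step without hitting the network, one option is to pull it into its own method. The sketch below assumes a hypothetical helper named `build_tick`; the sample payload is made up for illustration and is not real exchange data, and the real response may carry additional fields or different value types.

```ruby
require 'json'

# Hypothetical helper: build the tick hash from a raw ticker response body.
# Hash#fetch raises KeyError if a field is missing, instead of passing nil along.
def build_tick(raw_body, currency_pair: 'BTCAUD')
  body = JSON.parse(raw_body)
  {
    currency_pair: currency_pair,
    current_time: body.fetch('current_time'),
    last: body.fetch('last'),
    bid: body.fetch('bid'),
    ask: body.fetch('ask')
  }
end

# Made-up sample payload, for illustration only.
sample = '{"current_time":"2020-04-30T00:00:00Z","last":"100.0","bid":"99.0","ask":"101.0"}'
tick = build_tick(sample)
puts tick[:last] # prints 100.0

begin
  build_tick('{"last":"100.0"}')
rescue KeyError => e
  puts "missing field: #{e.key}" # prints missing field: current_time
end
```

Failing loudly on a missing field means a changed response format surfaces as an error in the logs rather than as a tick full of nil values.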
Final Result
Both the iterate and ping functions are triggered once every 10 seconds, as the output of each function in the logs shows.
The code is available here: github
CoinJar allows users to buy, sell, store and spend digital assets.