Sub-minute AWS lambda scheduler

davelooi

Looi David

Posted on April 30, 2020

One of the common problems that tech companies have to deal with is periodic tasks: tasks that run at specific intervals, at a specific time of the day, or at a specific time of the month. At CoinJar, we have periodic tasks for fetching rates from various sources, monitoring crypto transfer confirmations, publishing our rates, sending marketing emails, generating monthly reports, etc.

We use Sidekiq Enterprise to handle our periodic tasks. Sidekiq Periodic Jobs is great in many ways: it's fast and easy to set up, with a Cron-like interface, and it has worked well for us. But as our platform grows, the number of periodic tasks also increases, and we realised these jobs don't scale the same way. Take fetching rates from various sources: the more coins and fiat currencies we support, the faster the number of rates we need to maintain grows, since each coin can be paired with each fiat currency. This became the focus of my Dev Day.

Enter The Lambda

AWS Lambda now supports scheduled invocation. Just like Cron, the highest frequency it supports is once per minute.

So, what if I want a job to run every 10 seconds? AWS provided a guide for that too.

Serverless

Let's get to it:

I used the Serverless Framework to simplify my development and deployment. It's easy to set up and works out of the box.

All we need to get lambda up and running with serverless is just a configuration file (serverless.yml) and the function definitions.

Configuration

Serverless provides a very extensive set of configuration options. But luckily, we don't need all of them. This is what we need to get started:

# serverless.yml
service: lambda-scheduler
provider:
  name: aws
  runtime: ruby2.7
functions:
  ping:
    handler: scheduler.ping
  iterate:
    handler: scheduler.iterate

This defines the runtime (Ruby 2.7) and two functions (ping and iterate). Notice that the functions have no triggers. That's because they don't need any: iterate will be invoked by AWS Step Functions, and ping will be invoked by iterate.

So, how does Step Functions invoke the Lambda function?

The serverless-step-functions plugin for Serverless allows us to easily define step functions. Here is what it would look like:

# serverless.yml
plugins:
  - serverless-step-functions
stepFunctions:
  stateMachines:
    ten_seconds:
      events:
        - schedule: cron(* * * * ? *)
      definition:
        Comment: Invoke Lambda every 10 seconds
        StartAt: ConfigureCount
        States:
          ConfigureCount:
            Type: Pass
            Result:
              index: 0
              count: 6
            ResultPath: "$.iterator"
            Next: "Iterator"
          Iterator:
            Type: Task
            Resource: arn:aws:lambda:<region>:<account>:function:lambda-scheduler-${self:provider.stage}-iterate
            ResultPath: "$.iterator"
            Next: IsCountReached
          IsCountReached:
            Type: Choice
            Choices:
            - Variable: "$.iterator.continue"
              BooleanEquals: true
              Next: Wait
            Default: Done
          Wait:
            Type: Wait
            Seconds: 10
            Next: Iterator
          Done:
            Type: Pass
            End: true

Let's walk through this:

schedule: cron(* * * * ? *)
A cron expression in AWS's six-field format that starts the step function every minute.

StartAt: ConfigureCount
Names the first state to run when the step function is invoked.

ConfigureCount:
This sets up the counter for our iterator (index 0, count 6) and transitions to Iterator.

Iterator:
This invokes the iterate function defined earlier, stores its result at $.iterator, and transitions to IsCountReached.

IsCountReached:
This is basically an if condition in Step Functions. If continue is true, transition to Wait; otherwise, transition to Done.

Wait:
As the name suggests, this waits 10 seconds before transitioning back to Iterator.

Done:
This ends the execution.

As you can see, what I just described is just a simple function, but built as an AWS Step Function.
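As a rough mental model, the same control flow can be sketched in plain Ruby. This is only an illustration: `run_state_machine` and the block passed to it are hypothetical stand-ins (not part of Step Functions or the original code), a virtual clock replaces the real Wait state, and Lambda execution time and state transition latency are ignored.

```ruby
# Plain-Ruby model of the state machine's control flow (illustrative only).
# A virtual clock stands in for the Wait state, so this runs instantly.
def run_state_machine(count:, wait_seconds:)
  # ConfigureCount: seed the iterator.
  state = { 'iterator' => { 'index' => 0, 'count' => count } }
  clock = 0
  invoked_at = []

  loop do
    # Iterator: the block plays the role of the iterate Lambda.
    invoked_at << clock
    state['iterator'] = yield(state['iterator'])

    # IsCountReached: continue => Wait, otherwise => Done.
    break unless state['iterator']['continue']

    # Wait: advance the virtual clock instead of sleeping.
    clock += wait_seconds
  end

  # Done: report the second offsets at which the Lambda was invoked.
  invoked_at
end

offsets = run_state_machine(count: 6, wait_seconds: 10) do |iterator|
  index = iterator['index'] + 1
  { 'index' => index, 'continue' => index < iterator['count'], 'count' => iterator['count'] }
end

puts offsets.inspect # [0, 10, 20, 30, 40, 50]
```

Under the same assumptions, a different interval is just a different pair of values, for example `count: 4` with `wait_seconds: 15` for a run every 15 seconds.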

Permissions

As with any AWS service, everything needs proper permissions before it can be used. In this case, we need 2 specific permissions:

  1. Create Step Functions
  2. Allow Lambda Function to be invoked

Serverless makes managing AWS IAM permissions a bit less painful. All we need is to allow those 2 actions. Here's the config:

# serverless.yml
provider:
  ...
  iamRoleStatements:
    - Effect: Allow
      Action:
        - lambda:InvokeFunction
      Resource: "arn:aws:lambda:<region>:<account>:function:*"
    - Effect: Allow
      Action:
        - states:CreateStateMachine
      Resource: "arn:aws:states:<region>:<account>:stateMachine:*"

Handlers

Handlers are just the methods in a Lambda function that process events.

Iterate

The iterate method that gets invoked by the step function looks like this:

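# scheduler.rb
require 'json'
require 'aws-sdk-lambda'

# Invoked by the step function on every pass through the Iterator state.
def iterate(event:, context:)
  count = event.dig('iterator', 'count')
  idx = event.dig('iterator', 'index') + 1
  puts "idx=#{idx}"

  # The 'Event' invocation type is asynchronous, so iterate does not
  # wait for ping to finish.
  client = Aws::Lambda::Client.new(region: ENV.fetch('REGION'))
  client.invoke(
    function_name: "lambda-scheduler-#{ENV.fetch('STAGE')}-ping",
    invocation_type: 'Event',
    payload: { index: idx }.to_json
  )

  # Returned to the step function, which stores it at $.iterator.
  {
    index: idx,
    continue: idx < count,
    count: count
  }
end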

What the method does:

  1. Increment index by 1
  2. Invoke ping function
  3. Return index, continue, and count back to the step function

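This method gets triggered 6 times every minute by the step function. Note that it reads REGION and STAGE from environment variables, so both need to be defined for the function (for example under provider.environment in serverless.yml).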
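To check the counter logic without touching AWS, one option is to pull the arithmetic into a pure method and feed its result back in, the way the step function does. This is a sketch, not the original code: `next_iterator` is a hypothetical helper name, and the JSON round trip mimics how the handler's return value (symbol keys in Ruby) comes back as string-keyed input on the next invocation.

```ruby
require 'json'

# Hypothetical pure helper: same arithmetic as iterate, minus the Lambda invoke call.
def next_iterator(event)
  count = event.dig('iterator', 'count')
  index = event.dig('iterator', 'index') + 1
  { index: index, continue: index < count, count: count }
end

# Start from the ConfigureCount result and feed each result back in.
event = { 'iterator' => { 'index' => 0, 'count' => 6 } }
history = []

loop do
  result = next_iterator(event)
  history << result
  # Round-trip through JSON so the next event has string keys, as in Lambda.
  event = { 'iterator' => JSON.parse(result.to_json) }
  break unless result[:continue]
end

puts history.map { |r| r[:index] }.inspect # [1, 2, 3, 4, 5, 6]
puts history.last[:continue]               # false
```

Keeping the counter separate from the AWS client call means the loop bounds can be verified locally before deploying.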

Ping

Finally, here is the method that does the actual work every 10 seconds. In this case, my method fetches the latest rates from CoinJar Exchange.

# scheduler.rb
require 'json'
require 'net/http'

# Fetches the latest BTCAUD ticker from CoinJar Exchange and logs it.
def ping(event:, context:)
  uri = URI('https://data.exchange.coinjar.com/products/BTCAUD/ticker')
  response = Net::HTTP.get_response(uri)

  if response.code == '200'
    body = JSON.parse(response.body)
    tick = {
      currency_pair: 'BTCAUD',
      current_time: body.fetch('current_time'),
      last: body.fetch('last'),
      bid: body.fetch('bid'),
      ask: body.fetch('ask')
    }.to_json
    puts "tick=#{tick}"
  else
    # Log the unexpected status code so failures show up in the logs.
    puts "code=#{response.code}"
  end
end
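The parsing step in ping can also be exercised offline by separating it from the HTTP call. The sketch below assumes a hypothetical helper, `build_tick`, and uses a made-up payload whose values are placeholders, not real market data or a recorded API response; only the field names come from the handler above.

```ruby
require 'json'

# Hypothetical helper: builds the same tick hash as ping from a raw response body.
def build_tick(raw_body, currency_pair: 'BTCAUD')
  body = JSON.parse(raw_body)
  {
    currency_pair: currency_pair,
    current_time: body.fetch('current_time'),
    last: body.fetch('last'),
    bid: body.fetch('bid'),
    ask: body.fetch('ask')
  }
end

# Made-up payload for illustration only.
sample = '{"current_time":"2020-04-30T00:00:00Z","last":"1.0","bid":"0.9","ask":"1.1"}'
tick = build_tick(sample)
puts "tick=#{tick.to_json}"

# Hash#fetch raises KeyError when a field is missing, so an incomplete
# payload fails loudly instead of producing a tick with nil values.
begin
  build_tick('{"last":"1.0"}')
rescue KeyError => e
  puts "missing field: #{e.message}"
end
```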

Final Result

Both the iterate and ping functions would be triggered once every 10 seconds. Here's the output of each of these functions from the logs:

[Images: log output of the iterate and ping functions]

The code is available here: github

CoinJar allows users to buy, sell, store and spend digital assets.
