Effective task management in Habitica using a text file and AWS Serverless

ppabis

Piotr Pabis

Posted on July 13, 2024

Creating tasks (to-dos) through the Habitica UI is neither efficient nor pleasant (although tools like Trello or Jira are hardly better). Editing a single plain text file, whether formatted as Markdown or not, is much easier and gives a better overview, at least when each item is just a title without a long description. In the previous post I demonstrated a way to measure overall productivity or performance in Habitica. However, I use Habitica only for daily and repeating tasks, which I don't have to define in the UI every time. For one-time to-dos the situation is very different: I want simplicity and efficiency. And what is more efficient than simple text? I will use Habitica's API again to build a solution that transforms a text file into a list of tasks in Habitica, including handling edits. I will also show you how to work around the low API rate limits.

The completed project is tagged v3 and built on top of previous posts.

Processing a list of tasks

The task lists will be uploaded to an S3 bucket. An event notification on this bucket will trigger a Lambda function that processes the file and creates or updates tasks. The tasks use the following format:

ID    due date   difficulty+attribute - task description
0001. 15/07/2024 TP - Wash the dishes
0002.            HI - Create a new blog post

The ID will be used to track the tasks in DynamoDB at a later stage. You could rely on the line number instead, but I want to ensure consistency. Explicit IDs also allow splitting tasks between multiple files. The due date is either a date in DD/MM/YYYY format or empty if there is no due date. The difficulties are Trivial, Easy, Medium and Hard; they will be mapped to the appropriate priority values in Habitica. Attributes are not visible in the Habitica UI - an attribute determines which skill of the player multiplies the task's value on completion: Strength, Intelligence, Perception or Constitution. Any line that does not follow this format is simply discarded - we will use a regular expression to verify it. In the above example, the first line (the header) will be ignored. This lets us keep comments or extra details next to each task without them being processed.

Let's define the first function, which parses the tasks into Python objects. It splits the input data into lines and then parses each line to produce a dict with all the task parameters. We also wrap each line-processing call in a try/except block so that a badly formatted date only prints an error and processing continues.

import re
from datetime import datetime

DIFFICULTIES = { 'T': '0.1', 'E': '1', 'M': '1.5', 'H': '2' }
ATTRIBUTES = { 'S': 'str', 'I': 'int', 'P': 'per', 'C': 'con' }

def line_to_task(line: str) -> dict | None:

    # (Task ID). (Date?) (Difficulty)(Attribute) - (Title), feel free to adapt to your needs.
    # The dot after the ID is escaped so that it matches only a literal '.'.
    r = re.match(r'^(\d+)\.\s+([0-9/]*)\s*([TEMHtemh])([SIPCsipc])\s+-\s+(.*)$', line)

    if r:
        date = None if r.group(2) == '' else datetime.strptime(r.group(2), '%d/%m/%Y')
        difficulty = DIFFICULTIES[r.group(3).upper()]
        attribute = ATTRIBUTES[r.group(4).upper()]

        return {
            'id': r.group(1),
            'date': date,
            'difficulty': difficulty,
            'attribute': attribute,
            'title': r.group(5),
        }

    return None


def parse_task_list(task_list: str) -> list[dict]:
    tasks = []
    for line in task_list.split('\n'):
        try:
            task = line_to_task(line.strip())
            if task:
                tasks.append(task)
        except Exception as e:
            print(f"Error processing line '{line}': {e}")
    return tasks

I tested the function with the following task list and all values were parsed correctly. I formatted the output to be reasonably readable so that it is easy to see that every value landed in the right place.

Input

This is an unrelated lien taht should be ignored
0001. 01/08/2024 MI - Create a list of tasks
0002. 02/08/2024 ES - Create another list of tasks
0003. TS - A task with no due date!

This is not a task
0004. This is also not a task
0005. 03/10/2024 - Also this is also not a task

0006. TC - test 12345 this should be a task - with a hyphen -- extra - hyphens
0007. 05/08/2024 HP - 😂 emojis 🏆
0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong
0234. HC - test 0234 this should be a task

Output

Error processing line '0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong': time data '66/12/3033' does not match format '%d/%m/%Y'
0001: difficulty=1.5 attribute=int date=2024-08-01 00:00:00
        Create a list of tasks
0002: difficulty=1 attribute=str date=2024-08-02 00:00:00
        Create another list of tasks
0003: difficulty=0.1 attribute=str date=None
        A task with no due date!
0006: difficulty=0.1 attribute=con date=None
        test 12345 this should be a task - with a hyphen -- extra - hyphens
0007: difficulty=2 attribute=per date=2024-08-05 00:00:00
        😂 emojis 🏆
0234: difficulty=2 attribute=con date=None
        test 0234 this should be a task
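
The post does not show how the output above was printed. A minimal sketch of such a formatter could look like the one below; describe_task is a hypothetical helper name, and the layout is inferred from the printed output rather than taken from the project.

```python
from datetime import datetime


def describe_task(task: dict) -> str:
    """Render one parsed task in the two-line layout used in the output above."""
    header = (
        f"{task['id']}: difficulty={task['difficulty']} "
        f"attribute={task['attribute']} date={task['date']}"
    )
    # The title goes on its own indented line so long titles stay readable.
    return f"{header}\n        {task['title']}"


# A dict shaped like the ones returned by line_to_task()
example = {
    'id': '0001',
    'date': datetime(2024, 8, 1),
    'difficulty': '1.5',
    'attribute': 'int',
    'title': 'Create a list of tasks',
}
print(describe_task(example))
```

Printing a datetime inside an f-string uses its default string form, which is why the dates above include the 00:00:00 time part.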

Storing and updating tasks

Now we have to decide which tasks need to be created and which ones need to be updated. We need to map our task IDs to Habitica's UUIDs, and we will use DynamoDB for that. Comparing the contents of a task with an entry in DynamoDB is also more efficient than querying the Habitica API for each task. However, to keep things simple, I will deliberately skip handling deletion of tasks - for this we would need to either rely on file differences (lines removed) or scan the entire table and compare it to what we have loaded from the file. (Alternatively, my idea was that a - in front of the ID would remove the task on one file upload, after which the line can be safely deleted from the file for subsequent uploads, but this post was already very long 😅). Our primary key will be the task ID. As long as IDs only ever increase, this shouldn't pose any problems. The IDs don't have to increase by one - you can use 10000, 20000 and so on to split projects or just keep some order/grouping.
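
Although deletion is out of scope here, the "scan and compare" option boils down to a set difference. The sketch below is only an illustration of that idea, not part of the project; find_removed_ids is a hypothetical name, and it assumes the IDs from the table and from every task file have already been collected.

```python
def find_removed_ids(stored_ids: set[str], file_ids: set[str]) -> list[str]:
    """Return IDs that exist in the table but no longer appear in any task file."""
    # Sorted only to make the result deterministic for logging.
    return sorted(stored_ids - file_ids)


stored = {'0001', '0002', '0003'}   # IDs found in DynamoDB
in_files = {'0001', '0003'}         # IDs parsed from the uploaded files
print(find_removed_ids(stored, in_files))
```

With multiple files, file_ids would have to be the union of IDs from all of them, otherwise tasks from the other files would look deleted.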

I will first define the simplest function, which creates a new task. It returns a formatted dict that can be submitted directly to DynamoDB. The date field will always be present: either an empty string or an ISO 8601 timestamp such as 2024-07-08T18:33:56. This is simpler than checking whether the date exists in the record.

def create_task(task: dict) -> dict:
    return {
        'id': task['id'],
        'title': task['title'],
        'date': task['date'].isoformat() if task['date'] else "",
        'difficulty': task['difficulty'],
        'attribute': task['attribute']
    }

The next function updates a task already found in the DynamoDB table. It compares each field separately and returns the changed item if anything differs, or None when the row should not be updated.

def compare_and_update(task: dict, item: dict) -> dict | None:
    dirty = False
    task['date'] = task['date'].isoformat() if task['date'] else ""

    if task['title'] != item['title']:
        item['title'] = task['title']
        dirty = True

    if task['date'] != item['date']:
        item['date'] = task['date']
        dirty = True
    # ... continues for other fields...

    return item if dirty else None
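
The elided comparisons follow the same pattern for every field, so they can also be written as a loop. This is a sketch of one way to do it, not the code from the repository; TRACKED_FIELDS and apply_changes are hypothetical names, and the task's date is assumed to be already converted to an ISO string (or an empty string).

```python
from typing import Optional

# Assumption: these are the only fields mirrored between the file and the table.
TRACKED_FIELDS = ('title', 'date', 'difficulty', 'attribute')


def apply_changes(task: dict, item: dict, fields=TRACKED_FIELDS) -> Optional[dict]:
    """Copy differing fields from task into item; return item only if it changed."""
    changed = [name for name in fields if task.get(name) != item.get(name)]
    for name in changed:
        item[name] = task[name]
    return item if changed else None


row = {'id': '0003', 'title': 'Old title', 'date': '', 'difficulty': '0.1',
       'attribute': 'str', 'habitica_uuid': 'abc'}
parsed = {'id': '0003', 'title': 'New title', 'date': '', 'difficulty': '0.1',
          'attribute': 'str'}
print(apply_changes(parsed, row))
```

Fields outside the tracked list, such as the stored UUID, are left untouched on the item.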

As the last point, we will combine both functions to either create, update or skip a task. However, here comes a twist - Habitica allows batch creation of new tasks but can only update existing ones one by one. To save time and executions, we will collect the tasks to be created and submit them immediately in this function. Tasks that need to be updated will be updated in DynamoDB and their IDs will be passed on to a Step Function. Why? The problem is that the Habitica API is very strict about the number of calls we can make (30 per minute). We could simply sleep in the Lambda, but this would incur unnecessary costs. A Step Function has a Wait state that can save us a bit of money. The process will look something like the diagram below.

[Diagram: Processing tasks]
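
To get a feeling for the numbers: a limit of 30 calls per minute means at least 2 seconds between calls, which matches the Wait used later in the state machine. The helpers below are just a sketch of that arithmetic; both function names are made up for illustration.

```python
def min_delay_seconds(calls_per_minute: int) -> float:
    """Smallest gap between calls that stays within the per-minute limit."""
    return 60 / calls_per_minute


def estimated_wait_time(num_updates: int, wait_seconds: float) -> float:
    """Total time spent waiting when each update is followed by one wait."""
    return num_updates * wait_seconds


delay = min_delay_seconds(30)
print(delay)                           # gap needed for 30 calls per minute
print(estimated_wait_time(45, delay))  # seconds spent waiting for 45 updates
```

The real spacing is slightly larger, since each Lambda invocation also takes some time on top of the wait.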

How do we determine whether a task is new or needs updating? We don't have to reach out to the API. Known tasks have a UUID mapping them to Habitica, so we just need to check this one attribute. If it is there and a change was detected by the function above, we update the row in DynamoDB and add the ID to the list. A later function will simply load the row again from DynamoDB and send a request to Habitica.

def store_update_tasks(tasks: list[dict]) -> list[str]:
    updated_ids, new_tasks = [], []
    for task in tasks:
        # Update a task in DynamoDB if the input is different than what is in
        # the database. If the record in the database does not have UUID, it
        # means that it needs to be created in Habitica.
        response = ddb.get_item(Key={'id': task['id']})

        if 'Item' in response:
            updated = compare_and_update(task, response['Item'])
            if updated:
                ddb.put_item(Item=updated)
                if 'habitica_uuid' in updated and updated['habitica_uuid']:
                    # This item exists in Habitica
                    updated_ids.append(updated['id'])
                else:
                    # This item was existing but wasn't submitted to Habitica yet
                    new_tasks.append(updated)
            else:
                print(f"Task {task['id']} is up to date.")

        # Task not found so create a new one
        else:
            ddb.put_item(Item=create_task(task))
            new_tasks.append(task)

    ids_uuids = batch_create_tasks(new_tasks) # Dummy function for now

    # Update rows in DynamoDB to hold UUIDs of the recently created tasks
    for local_id, uuid in ids_uuids:
        ddb.update_item(
                Key={'id': local_id},
                UpdateExpression='SET habitica_uuid = :uuid',
                ExpressionAttributeValues={':uuid': uuid}
            )

    return updated_ids

def batch_create_tasks(tasks) -> list[tuple[str, str]]:
    # Dummy function for now
    return [(task['id'], f"123456-{task['id']}") for task in tasks]
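
The branching inside store_update_tasks can be summarised as a small pure function, which makes the three outcomes easier to see and to test without DynamoDB. This is only a sketch of the same decision; decide_action is a hypothetical name that does not exist in the project.

```python
from typing import Optional


def decide_action(item: Optional[dict], changed: bool) -> str:
    """Return 'create', 'update' or 'skip' for one parsed task.

    item    -- the row found in DynamoDB, or None if there is none
    changed -- whether the parsed task differs from that row
    """
    if item is None:
        return 'create'   # unknown ID: store it and batch-create it in Habitica
    if not changed:
        return 'skip'     # the row is up to date
    if item.get('habitica_uuid'):
        return 'update'   # known to Habitica: pass the ID to the Step Function
    return 'create'       # stored earlier but never submitted to Habitica


print(decide_action({'id': '0003', 'habitica_uuid': 'abc'}, True))
```

One consequence worth knowing: a row that has no UUID and has not changed is skipped, so a task whose creation failed is only retried once its line is edited.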

As the last step, we have to create the Lambda handler that will be the target of the S3 event on object upload or update - whenever we create a new task list or update an existing one, the Lambda will be triggered. If you plan on using multiple files in S3, this function should handle them fine. However, you need to keep the IDs unique across all lists. We will iterate through all records in the event in case the function was triggered for multiple uploads, and process each object in a try/except block so that one bad file won't crash the entire run.

# imports, clients...
def process_file(record) -> list[str]:
    obj = s3.get_object( Bucket=record['s3']['bucket']['name'], Key=record['s3']['object']['key'] )
    tasks = parse_task_list(obj['Body'].read().decode('utf-8'))
    ids = store_update_tasks(tasks)
    print(f"Tasks to update: {', '.join(ids)}")
    return ids

def lambda_handler(event, context):
    ids = []
    for record in event['Records']:
        if record['eventName'].startswith('ObjectCreated'):
            try:
                _ids = process_file(record)
                ids.extend(_ids)
            except Exception as e:
                print(f"Error processing record: {e}")

    # Here we will start Step Function if ids is not empty so if there are any
    # tasks to be updated in Habitica
    if ids:
        print(f"Dummy - Starting step function for update {', '.join(ids)}")

    return {
            'statusCode': 200,
            'body': ids
        }
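
For reference, this is roughly the part of an S3 event record that the handler reads. Note that S3 URL-encodes object keys in event notifications (a space arrives as +), so file names with spaces or special characters need decoding before calling get_object. The sketch below shows one way to do it; bucket_and_key is a hypothetical helper and the sample record is trimmed to the fields used here.

```python
from urllib.parse import unquote_plus


def bucket_and_key(record: dict) -> tuple[str, str]:
    """Extract the bucket name and the decoded object key from one S3 record."""
    bucket = record['s3']['bucket']['name']
    # Keys in S3 notifications are URL-encoded, e.g. "my tasks.txt" -> "my+tasks.txt"
    key = unquote_plus(record['s3']['object']['key'])
    return bucket, key


sample_record = {
    'eventName': 'ObjectCreated:Put',
    's3': {
        'bucket': {'name': 'habitica-task-list-abcdef123456'},
        'object': {'key': 'my+tasks.txt'},
    },
}
print(bucket_and_key(sample_record))
```

For a plain name like tasks.txt the decoding changes nothing, so the handler above works as it is for the files used in this post.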

Connecting events

This section contains a lot of trial and error on how to connect an S3 bucket, S3 event notifications and Lambda. If you just want a working solution, go to the GitHub repository.

Now comes the tricky part. We used the Serverless Application Model, which is based on CloudFormation, to create the previous Habitica-related projects. So the natural thing to do would be to create a bucket, create a DynamoDB table and connect the bucket to the Lambda through its Events property. Let's try to do it.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: A function that processes task list uploaded to S3 and submits it to Habitica

Globals:
  Function:
    Timeout: 15
    MemorySize: 128

Resources:
  # ... resources from previous posts
  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: habitica-task-list-abcdef123456

  HabiticaTaskListTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      TableName: HabiticaTaskList

  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      Handler: main.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref HabiticaTaskListBucket
        - DynamoDBCrudPolicy:
            TableName: !Ref HabiticaTaskListTable
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Ref HabiticaSecret
      Environment:
        Variables:
          TABLE_NAME: !Ref HabiticaTaskListTable
          HABITICA_SECRET: !Ref HabiticaSecret
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !Ref HabiticaTaskListBucket
            Events: s3:ObjectCreated:*

I will not filter the event; the function will simply process anything that lands in the bucket. After we run sam build and sam deploy, we get an error.

Status: FAILED. Reason: Circular dependency between resources:
[HabiticaTaskListBucket, HabiticaProcessTaskListRole, HabiticaProcessTaskListS3EventPermission, HabiticaProcessTaskList]

OK, so maybe we should create the bucket in a separate stack and then reference its name in this stack. Let's try that. I will cut the HabiticaTaskListBucket resource from this stack, deploy it from a different YAML file, export the bucket name as an output and import it into this SAM template. Create a new template.yml in a new directory.

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: A bucket for Habitica task lists - bucket.yaml

Resources:
  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: habitica-task-list-abcdef123456

Outputs:
  BucketName:
    Value: !Ref HabiticaTaskListBucket
    Export:
      Name: HabiticaTaskListBucket

Now instead of Bucket: !Ref HabiticaTaskListBucket we will use Bucket: !ImportValue HabiticaTaskListBucket. This takes the bucket name from the CloudFormation exports (shared within the account and region) and inserts it into this stack.

# ...
Resources:
  # This is now in a separate stack
  #HabiticaTaskListBucket:
  #  Type: AWS::S3::Bucket
  #  Properties:
  #    BucketName: habitica-task-list-abcdef123456

  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        - S3ReadPolicy:
            BucketName: !ImportValue HabiticaTaskListBucket
      # ... cut for clarity
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !ImportValue HabiticaTaskListBucket
            Events: s3:ObjectCreated:*

I deployed the bucket stack first and then the main stack:

$ cd tasks_bucket
$ sam build
$ sam deploy --guided
$ cd ..
$ sam build
$ sam deploy
...
Invalid Serverless Application Specification document. Number of errors found: 1.
Resource with id [HabiticaProcessTaskList] is invalid. Event with id [S3Event]
is invalid. S3 events must reference an S3 bucket in the same template.

This seems absurd. One possibility is to define the bucket first, deploy, and then connect it. But this contradicts the whole infrastructure-as-code concept, as it introduces manual steps. Our only hope is to use the native CloudFormation AWS::Lambda::Permission resource to give S3 permission to invoke the function, together with NotificationConfiguration in the S3 bucket resource. Let's try that. Remember to destroy the previously created S3 bucket if you tried the import method. In the stack below, the Lambda is created first, then the permission, and the bucket last. It has to happen in this order because the S3 notification requires the permission to already be in place. However, there is still one loop here! The read policy for the Lambda refers to the bucket. Unlike in the Events section, though, we can use a simple string parameter and !Ref it in both BucketName properties. Essentially, we have to replace all references to the bucket with a string or a manually constructed ARN - which is not ideal but better than a double deployment.

Parameters:
  HabiticaTaskListBucketName:
    Type: String
    Description: Name of the S3 bucket that will hold the task list
    Default: habitica-task-list-abcdef123456
# ...
Resources:
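  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    # Explicit ordering: the invoke permission must exist before S3 validates the notification
    DependsOn: HabiticaProcessTaskListS3Permission
    Properties: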
      BucketName: !Ref HabiticaTaskListBucketName
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: s3:ObjectCreated:*
            Function: !GetAtt HabiticaProcessTaskList.Arn

  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        - S3ReadPolicy: # kept, but now uses the parameter instead of the bucket resource
            BucketName: !Ref HabiticaTaskListBucketName
      # ... cut for clarity
      # Events: - removed

  HabiticaProcessTaskListS3Permission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt HabiticaProcessTaskList.Arn
      Principal: s3.amazonaws.com
      SourceArn: !Sub "arn:aws:s3:::${HabiticaTaskListBucketName}"

Delete the separate bucket stack if you created it, then build and deploy the main stack:

$ cd tasks_bucket
$ sam delete
$ cd ..
$ sam build && sam deploy

Testing the task list processor

I will upload a file to the S3 bucket and expect the DynamoDB table to contain all the valid tasks. Next, I will replace the file with one containing some updated tasks and expect the table to be updated accordingly. For now, we will not POST or PUT anything to Habitica. We will only track what is happening in the logs and in the DynamoDB table.

$ aws s3 cp tasks.txt s3://habitica-task-list-abcdef123456/tasks.txt
$ # ... checking logs, table
$ aws s3 cp tasks2.txt s3://habitica-task-list-abcdef123456/tasks.txt
$ # ... checking logs and table again

The file I used is the same as in the example above. As the first step, I checked the CloudWatch logs for output from the function. I can see that processing the file went smoothly.

Error processing line '0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong': time data '66/12/3033' does not match format '%d/%m/%Y' 
Tasks found in file = 6
Tasks to update: 

Now I will look at the items in the DynamoDB table, using the AWS Console. The screenshot below shows the state after both the first version of the file and some updates made by re-uploading it with changes. After the second run the logs also show the expected results - two tasks existed previously and have to be updated in the next routine. One task was new and was silently inserted and mock-created in Habitica.

Tasks found in file = 7
Task 0001 is up to date.
Task 0002 is up to date.
Task 0006 is up to date.
Task 0234 is up to date. 
Tasks to update: 0003, 0007
Starting Step Function with 2 tasks.

[Screenshot: DynamoDB tasks]
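
If you prefer checking the table from code instead of the console, a scan with pagination does the job. The sketch below assumes a boto3 Table resource is passed in; scan_all is a hypothetical helper, and FakeTable exists only so that the example runs without AWS access.

```python
def scan_all(table) -> list[dict]:
    """Collect every item from a table, following LastEvaluatedKey pagination."""
    items, kwargs = [], {}
    while True:
        page = table.scan(**kwargs)
        items.extend(page.get('Items', []))
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            return items
        kwargs['ExclusiveStartKey'] = last_key


class FakeTable:
    """Stand-in for a boto3 Table that returns its items in pages of two."""

    def __init__(self, items):
        self._items = items

    def scan(self, ExclusiveStartKey=None):
        start = ExclusiveStartKey['offset'] if ExclusiveStartKey else 0
        page = {'Items': self._items[start:start + 2]}
        if start + 2 < len(self._items):
            page['LastEvaluatedKey'] = {'offset': start + 2}
        return page


# With AWS access this would be: boto3.resource('dynamodb').Table('HabiticaTaskList')
table = FakeTable([{'id': '0001'}, {'id': '0002'}, {'id': '0003'}])
for item in scan_all(table):
    print(item['id'])
```

For a handful of tasks a single scan call returns everything, but the loop keeps the helper correct once the table grows beyond one page.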

Preparing a tag in Habitica

Before we create a function that stores every task in Habitica, I suggest creating a tag to mark the tasks that were created automatically. Go to your Habitica dashboard and select Tags at the top of the lists (on mobile it looks different). Select Edit tags and add a new one for this purpose.

[Screenshot: Habitica tags]

Now we need to retrieve the tag's ID. We could automate this in the Lambda on the first run, but the ID is unlikely to change, so we can retrieve it once and hardcode it. To do this, we can use environment variables and curl. At each read prompt, paste the appropriate value. It won't be echoed because read is called with the -s flag. The tag you created will likely be the last one in the array.

$ read -s HABITICA_USER
$ read -s HABITICA_KEY
$ export HABITICA_USER HABITICA_KEY
$ curl -s -H "x-api-user: $HABITICA_USER" \
 -H "x-api-key: $HABITICA_KEY" \
 -H "x-client: $HABITICA_USER-taskscheduler10" \
 https://habitica.com/api/v3/tags

...
    {
      "id": "a2e84af7-4b0d-46a3-8dcc-3b94b4205e59",
      "name": "Learning"
    },
    {
      "id": "4aedf1fc-8dd7-4ff9-95f1-1f3112a0b815",
      "name": "Automated"
    }
  ],
  "notifications": [],
  "userV": 21793,
  "appVersion": "5.26.1"
}

Edit your HabiticaProcessTaskList function's environment variables and add TASK_TAG with the value you received from the commands above.
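
If you would rather not pick the ID out of the JSON by eye, the lookup is a few lines of Python. This is a sketch that assumes the response keeps the tag list under a data key, as the truncated output above suggests; find_tag_id is a hypothetical helper.

```python
from typing import Optional


def find_tag_id(response: dict, name: str) -> Optional[str]:
    """Return the ID of the first tag with the given name, or None if absent."""
    for tag in response.get('data', []):
        if tag.get('name') == name:
            return tag['id']
    return None


# Sample shaped like the response shown above (assumed top-level 'data' key)
sample_response = {
    'data': [
        {'id': 'a2e84af7-4b0d-46a3-8dcc-3b94b4205e59', 'name': 'Learning'},
        {'id': '4aedf1fc-8dd7-4ff9-95f1-1f3112a0b815', 'name': 'Automated'},
    ],
    'notifications': [],
}
print(find_tag_id(sample_response, 'Automated'))
```

The same function could be called once at Lambda start-up if you decide to automate the lookup instead of hardcoding the value.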

Function for creating tasks in Habitica

I will get the tag ID from an environment variable. The new function will take the list of new tasks, format it appropriately and batch send it to Habitica API. Afterwards it will return the list of tuples - our task ID and UUID in Habitica that we will need to save back to DynamoDB. This function will be called only once per file update in S3.

I will copy auth.py from the previous project and use it to create appropriate headers.

import requests, os
from auth import get_headers

HABITICA_URL="https://habitica.com/api/v3"
TASK_TAG = os.getenv("TASK_TAG", "")
HEADERS = get_headers()

def batch_create_tasks(tasks) -> list[tuple[str, str]]:
    habitica_tasks = [ create_task(task, TASK_TAG) for task in tasks ]
    original_ids = [task['id'] for task in tasks]

    url = f"{HABITICA_URL}/tasks/user"
    response = requests.post(url, json=habitica_tasks, headers=HEADERS)
    code = response.status_code

    if code == 200 or code == 201:
        data = response.json()['data']
        uuids = [ t['id'] for t in data ] if isinstance(data, list) else [data['id']]
        return list(zip(original_ids, uuids))

    raise Exception(response.json()['message'])


def create_task(task: dict, tag: str = "") -> dict:
    # A helper function to format the task as needed
    data = {
        "text": task['title'],
        "type": "todo",
        "priority": task['difficulty'],
        "attribute": task['attribute']
    }

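    if 'date' in task and task['date']:
        # The date is a datetime for freshly parsed tasks, but already an ISO
        # string for rows that were loaded back from DynamoDB.
        date = task['date']
        data['date'] = date if isinstance(date, str) else date.isoformat()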

    if tag:
        data['tags'] = [tag]

    return data

I built the SAM template and deployed the updated function again. I checked on Habitica's side whether the new tasks had been created and was pleasantly surprised to see them there. I also checked the DynamoDB table, and the new UUIDs were in place. Now it's time to implement the last part - updating the tasks in Habitica after they were updated in DynamoDB.

[Screenshot: Tasks in Habitica]
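
One detail of batch_create_tasks is worth a second look: pairing our IDs with the returned UUIDs relies on Habitica returning the created tasks in the order they were submitted, and zip silently drops items if the lengths differ. The sketch below adds a length check before pairing; pair_ids is a hypothetical helper, and the ordering itself remains an assumption that this check cannot verify.

```python
def pair_ids(original_ids: list[str], uuids: list[str]) -> list[tuple[str, str]]:
    """Pair local task IDs with Habitica UUIDs, refusing mismatched lengths."""
    if len(original_ids) != len(uuids):
        raise ValueError(
            f"Submitted {len(original_ids)} tasks but received {len(uuids)} UUIDs"
        )
    return list(zip(original_ids, uuids))


print(pair_ids(['0001', '0002'], ['uuid-a', 'uuid-b']))
```

Failing loudly here is cheaper than storing a wrong UUID in DynamoDB, because a wrong mapping would later update the wrong task in Habitica.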

Lambda for updating

Before we create the Step Function that will be triggered by the upstream Lambda that parses the new task list, we will create the downstream Lambda that will be called by this Step Function. It reads the event containing the list of task IDs to be updated and returns the same list with the processed item removed. For simplicity, the event also contains Finished - a boolean that determines whether the loop in the Step Function should stop.

First, we retrieve the task from DynamoDB and format it so that it fits Habitica's API. We raise exceptions in case of problems, but they won't crash the process - the handler catches them and only logs them.

import boto3, os

TABLE_NAME = os.getenv('TABLE_NAME')
# Table is part of the resource API, not the low-level client
ddb = boto3.resource('dynamodb').Table(TABLE_NAME)

def get_formatted_task(task_id: str) -> tuple[str, dict]:
    row = ddb.get_item(Key={'id': task_id})

    if 'Item' in row:

        # The first Lambda stores the Habitica UUID under 'habitica_uuid'
        if 'habitica_uuid' not in row['Item'] or not row['Item']['habitica_uuid']:
            raise Exception(f"Task {task_id} does not have a UUID!")

        task = row['Item']
        return task['habitica_uuid'], format_task(task)

    raise Exception(f"Task {task_id} not found!")


def format_task(task: dict) -> dict:
    formatted = {
        "text": task['title'],
        "priority": task['difficulty'],
        "attribute": task['attribute'],
        "date": None # Can be used for clearing the due date
    }

    if 'date' in task and task['date']:
        formatted['date'] = task['date']

    return formatted

Now we can create a function that sends a request to Habitica to update the task. It is just a simple PUT request. auth.py also needs to be copied into this Lambda's directory.

import requests

HABITICA_URL="https://habitica.com/api/v3"

def update_task(headers: dict, uuid: str, data: dict):
    url = f"{HABITICA_URL}/tasks/{uuid}"

    response = requests.put(url, json=data, headers=headers)
    code = response.status_code
    if code == 200:
        return response.json()
    raise Exception(response.json()['message'])

As the last step, we glue together the functions and manage the list of tasks received from Step Function.

from auth import get_headers

HEADERS = get_headers()  # same helper as in the first Lambda

def lambda_handler(event, context):
    tasks = event.get('List', [])

    if not tasks:
        event['Finished'] = True
        return event

    try:
        task = event['List'][0]
        uuid, task = get_formatted_task(task)
        update_task(HEADERS, uuid, task)
    except Exception as e:
        print(e) # We will continue processing and just log problems

    event['List'] = event['List'][1:]
    # We will also control Step Function's loop from here
    event['Finished'] = len(event['List']) == 0

    return event
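
Because the handler takes an event and returns the next event, the loop that the Step Function will run can be simulated locally. The sketch below does that with a stub in place of the real handler, so no Habitica or DynamoDB calls are made; drain and stub_handler are hypothetical names, and the waiting between calls is left out.

```python
def drain(event: dict, handler, max_iterations: int = 100) -> int:
    """Call the handler until it reports Finished; return the number of calls."""
    calls = 0
    while not event.get('Finished', False) and calls < max_iterations:
        event = handler(event, None)
        calls += 1
    return calls


def stub_handler(event, context):
    # Same contract as the handler above, minus the Habitica request:
    # drop the first ID and report whether anything is left.
    remaining = event.get('List', [])[1:]
    return {'List': remaining, 'Finished': len(remaining) == 0}


print(drain({'List': ['0003', '0007'], 'Finished': False}, stub_handler))
```

The max_iterations guard is there only to keep a buggy handler from looping forever during local experiments.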

Step Function for updating and delaying each call

Now it's time to define a Step Function that will be triggered by the first Lambda and will execute the second Lambda in a loop with a delay. We can define it in AWS SAM as an AWS::Serverless::StateMachine resource. The definition looks like the code below (we can use YAML for defining the states). Our state machine also needs permission to invoke the Lambda. For the loop we use a Choice state that continues to Update Task while the input variable Finished is false and otherwise goes to the Succeed state. The Lambda outputs the transformed variables, which are passed back to Loop Tasks. The Wait state simply passes all the values through unchanged.

  HabiticaUpdateTasksStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref HabiticaUpdateTasksLambda
      Definition:
        StartAt: Loop Tasks
        States:
          # Loop for all tasks
          Loop Tasks:
            Type: Choice
            Default: Succeed
            Choices:
              - Variable: "$.Finished"
                BooleanEquals: false
                Next: Update Task

          # Loop start
          Update Task:
            Type: Task
            Resource: !GetAtt HabiticaUpdateTasksLambda.Arn
            Parameters:
              List.$: "$.List"
              Finished: false
            Next: Wait

          Wait:
            Type: Wait
            Seconds: 2
            Next: Loop Tasks
          # Loop end

          Succeed:
            Type: Succeed

[Diagram: Step Function]

The diagram above shows what the Step Function looks like after deploying it with SAM. As you can see, we still have to define the Lambda function for updating tasks. The setup is as simple as for the previous function: it just references the secret and the DynamoDB table.

  HabiticaUpdateTasksLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: update_tasks/
      Handler: main.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref HabiticaTaskListTable
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Ref HabiticaSecret
      Environment:
        Variables:
          HABITICA_SECRET: !Ref HabiticaSecret
          TABLE_NAME: !Ref HabiticaTaskListTable

Now we will edit the first Lambda function to allow it to trigger the Step Function and pass the list of tasks to update. Insert the following policy and environment variable into its definition. Also add the code below at the end of lambda_handler in that function's main.py.

  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        # ... cut for clarity
        - StepFunctionsExecutionPolicy:
            StateMachineName: !GetAtt HabiticaUpdateTasksStateMachine.Name
      Environment:
        Variables:
          TABLE_NAME: !Ref HabiticaTaskListTable
          HABITICA_SECRET: !Ref HabiticaSecret
          TASK_TAG: 01234567-89ab-cdef-0123-456789abcdef
          STEP_FUNCTION_NAME: !Ref HabiticaUpdateTasksStateMachine

The corresponding change in main.py of the first function:

import json, os
from boto3 import client
# ... other imports

step = client('stepfunctions')
STEP_FUNCTION_NAME = os.getenv('STEP_FUNCTION_NAME')

def lambda_handler(event, context):
    ids = []
    # ... code follows
    # If the list is empty, we don't have to even execute the Step Function
    if len(ids) > 0 and STEP_FUNCTION_NAME:
        print(f"Starting Step Function {STEP_FUNCTION_NAME} with {len(ids)} tasks.")
        step.start_execution(stateMachineArn=STEP_FUNCTION_NAME, input=json.dumps({"List": ids, "Finished": False}))

    return {
      'statusCode': 200,
      'body': {
        'List': ids,
        'Finished': len(ids) == 0
        }
      }
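
If several files are uploaded at once and share an ID by mistake, the same task would be sent to Habitica twice. A small helper can build the execution input and drop duplicates while keeping the order. This is only a sketch; build_execution_input is a hypothetical name and is not part of the project.

```python
import json


def build_execution_input(ids: list[str]) -> str:
    """Serialise the state machine input, removing duplicate IDs in order."""
    unique_ids = list(dict.fromkeys(ids))  # dicts preserve insertion order
    return json.dumps({'List': unique_ids, 'Finished': len(unique_ids) == 0})


print(build_execution_input(['0003', '0007', '0003']))
```

The returned string could be passed as the input argument of start_execution in place of the inline json.dumps call.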

Final test

I deleted all the tasks in Habitica and DynamoDB. I uploaded the first list of tasks and an update to it. The tasks were created as expected and updated in Habitica as well!

[Screenshot: Tasks updated]

I also checked how the Step Function behaves. It correctly looped twice for the two updated tasks. Step Functions lets you look into the execution of each step and check the inputs and outputs during the process. It's very useful for debugging.

[Screenshot: Whole execution]

[Screenshot: Input and output]

As mentioned before, this solution does not support deletion of tasks. Another idea for improvement is to connect a Git repository to CodePipeline so that it updates the file in S3. For a developer, such a solution would be even more convenient than uploading files to S3, even with the AWS CLI. That way you would also be able to track the history of changes.
