From Messy Data to Super Mario Pipeline: My First Adventure in Data Engineering


Jampa Matos

Posted on June 20, 2024


Welcome to the thrilling tale of my very first automated data pipeline!

Imagine you’ve just been handed a database that looks like it’s been through Bowser’s castle and back. Yes, it’s that messy. The mission? To transform this chaos into a clean, analytics-ready dataset with as little human intervention as possible. Sounds like a job for Mario himself, right? Well, buckle up, because this is the story of how I tackled the subscriber data cleanup project, dodged fireballs, and came out victorious.

When I first opened the cademycode.db file, I felt like Mario entering a warp pipe into an unknown world. It was a mess. Missing values, inconsistent data types, and duplicates everywhere! But hey, every great adventure starts with a challenge, and this was mine.

In this post, I’ll take you through my journey of building my very first automated data pipeline. We’ll dive deep into the nitty-gritty details and share the ups and downs along the way. Whether you’re a fellow data plumber or just curious about the magical world of data cleaning, you’re in for a treat.

So grab a 1-Up mushroom, get comfy, and let’s embark on this data adventure together! Spoiler alert: There might be some gold coins and hidden blocks of wisdom along the way.

Ready to jump into the pipe? Let's start with how I set up the project and the initial hurdles I had to overcome (and there were quite a few!).

Entering the Warp Pipe

Setting up this project was like entering a warp pipe into an unknown world. I knew the journey ahead would be filled with challenges, but I was ready to tackle them head-on.

Getting Started

First things first, I needed to clone the repository and set up my working environment. I created a directory called subscriber-pipeline and jumped right into it.

mkdir -p /home/jampamatos/workspace/codecademy/Data/subscriber-pipeline
cd /home/jampamatos/workspace/codecademy/Data/subscriber-pipeline

Next, I set up a virtual environment to keep my project dependencies isolated. If there's one thing Mario taught me, it's to always be prepared!

python3 -m venv venv
source venv/bin/activate

Tools and Technologies

Since there were no red flowers or mushrooms to collect, here’s a list of the tools and technologies I used for this project:

  • Python: The hero of our story. I used Python for data manipulation and scripting.
  • SQLite: Our trusty sidekick. This lightweight database was perfect for managing the data.
  • Pandas: The power-up we needed to handle data manipulation with ease.
  • Jupyter Notebook: My go-to tool for exploring and experimenting with data.
  • Bash: The magical spell that automated our pipeline.

Initial Hurdles

Setting up the environment was smooth sailing until I encountered my first Goomba: installing the required Python packages. After a few head bumps, I finally managed to get everything installed. (Note that sqlite3 ships with Python's standard library, so only the external packages needed installing.)

pip install pandas jupyter

But wait, there’s more! I also planned to rely on unittest and logging for testing and tracking the pipeline. Luckily, both come bundled with Python's standard library, so there was nothing extra to install.

Facing the First Boss: Database Connection

With everything set up, it was time to connect to the database. This felt like facing the first boss. I opened the cademycode.db file, unsure of what awaited me inside. Using SQLite, I established a connection and was ready to explore the data.

import sqlite3

con = sqlite3.connect('dev/cademycode.db')
print('Database connection established successfully.')

Suffice to say, the database was indeed as messy as Bowser’s castle. But that’s a story for the next section.

In the next part of our adventure, we'll dive into inspecting and cleaning the data. Get ready to battle missing values, inconsistent data types, and duplicates galore!

Battling the Data Monsters

With the setup complete and the database connection established, it was time to dive into the data. This part of the journey felt like battling hordes of Koopa Troopas. Every step revealed new challenges, but with determination (and some Italian pasta), I tackled them head-on.

Data Inspection

The first step was to inspect the data and understand the lay of the land. Using Pandas, I loaded the tables from cademycode.db into DataFrames and took a peek at what I was dealing with.

import pandas as pd

tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)
table_names = tables['name'].tolist()

df = {table: pd.read_sql_query(f"SELECT * FROM {table}", con) for table in table_names}

for table, data in df.items():
    print(f"Table: {table}")
    print(data.head())

The output revealed the initial state of the data – missing values, inconsistent data types, and duplicates galore. It was like entering a haunted house in Luigi's Mansion!

(If you're curious about the output messages, you can check them at the project's Jupyter Notebook.)
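For the curious, here's a minimal sketch of the kind of quick checks that surface these problems (the exact inspection steps in the notebook may differ):

# Quick health check on each table: dtypes, missing values, and duplicate rows
for table, data in df.items():
    print(f"--- {table} ---")
    print(data.dtypes)
    print(data.isnull().sum())                     # missing values per column
    print(f"Duplicate rows: {data.duplicated().sum()}")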

Handling Missing Values

Next, I identified and handled the missing values. This was akin to collecting power-ups to boost my chances of success. For some columns, I filled the missing values with zeros, while for others, I used the median value.

students_df = df['cademycode_students'].copy()

# Fill missing values
students_df['job_id'] = students_df['job_id'].fillna(0)
students_df['current_career_path_id'] = students_df['current_career_path_id'].fillna(0)
students_df['num_course_taken'] = students_df['num_course_taken'].fillna(students_df['num_course_taken'].median())
students_df['time_spent_hrs'] = students_df['time_spent_hrs'].fillna(students_df['time_spent_hrs'].median())

Again, the choice between filling with zeroes or with median values is explained in the project's Jupyter Notebook, but in a nutshell: job_id and current_career_path_id were given 0 to indicate 'unemployed' and 'not enrolled' status, respectively, while the numeric activity columns got their medians.

Correcting Data Types

The next challenge was correcting inconsistent data types. This felt like trying to fit puzzle pieces together. With a bit of Pandas magic, I converted columns to their appropriate data types.

# Convert data types
students_df['dob'] = pd.to_datetime(students_df['dob'], errors='coerce')
students_df['job_id'] = pd.to_numeric(students_df['job_id'], errors='coerce')
students_df['num_course_taken'] = pd.to_numeric(students_df['num_course_taken'], errors='coerce')
students_df['current_career_path_id'] = pd.to_numeric(students_df['current_career_path_id'], errors='coerce')
students_df['time_spent_hrs'] = pd.to_numeric(students_df['time_spent_hrs'], errors='coerce')

Dealing with Duplicates

No Mario adventure is complete without encountering duplicates – like those pesky Bullet Bills that keep reappearing! I identified and removed duplicate records to ensure the data was clean.

# Remove duplicates
students_df.drop_duplicates(inplace=True)
jobs_df_cleaned = df['cademycode_student_jobs'].drop_duplicates()

Extracting Nested Data

One of the trickiest parts was dealing with nested data in the contact_info column. It felt like the dreaded water stage: to get through it, I had to write a function to extract the nested information and split it into separate columns.

Before we continue, note that the data in contact_info came as a JSON-like string containing the mailing address and an email, such as:

{"mailing_address": "470 Essex Curve, Copan, Mississippi, 86309", "email": "cleopatra_singleton7791@inlook.com"} 

(This is a good place to mention that the data used in this project is fictional, so don't worry!)

To extract it, we can treat each entry as JSON after all, normalizing the quotes first. So that's what we did:

import json

def extract_contact_info(contact_info):
    try:
        # Normalize single quotes so the string parses as valid JSON
        info = json.loads(contact_info.replace("'", '"'))
        return pd.Series([info.get('mailing_address'), info.get('email')])
    except json.JSONDecodeError:
        # Fall back to empty values when an entry can't be parsed
        return pd.Series([None, None])

students_df[['mailing_address', 'email']] = students_df['contact_info'].apply(extract_contact_info)
students_df.drop(columns=['contact_info'], inplace=True)

With the data cleaned and ready, I felt like I had just collected a Super Star power-up. But the adventure was far from over. Next up, I had to create the output CSV and ensure it was analytics-ready.

The Final Battle for Clean Data

With the data cleaned and ready, it was time for the final showdown: combining the data into a single, analytics-ready CSV. You know, like grabbing giant Bowser by the tail and throwing him around in Super Mario 64. There were obstacles to overcome, but I was determined to save the day (or in this case, the data).

Combining the Data

First, I needed to combine the cleaned data from multiple tables into a single DataFrame. Using Pandas, I performed the necessary joins to bring everything together:

# Merge dataframes
merged_df_cleaned = pd.merge(students_df, jobs_df_cleaned, how='left', left_on='job_id', right_on='job_id')
final_df_cleaned = pd.merge(merged_df_cleaned, df['cademycode_courses'], how='left', left_on='current_career_path_id', right_on='career_path_id')

Validating the Final Dataset

Once the data was combined, I needed to reach the flagpole at the end of the level: validating the final dataset. I checked for any inconsistencies or missing values that might have slipped through.

# Fill remaining missing values
final_df_cleaned = final_df_cleaned.assign(
    career_path_id=final_df_cleaned['career_path_id'].fillna(0),
    career_path_name=final_df_cleaned['career_path_name'].fillna('Unknown'),
    hours_to_complete=final_df_cleaned['hours_to_complete'].fillna(0)
)
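As a quick sanity check (a minimal sketch here; the formal unit tests come later), I made sure nothing had slipped through:

# Inspect remaining nulls per column, then assert the table is fully populated
print(final_df_cleaned.isnull().sum())
assert not final_df_cleaned.isnull().values.any(), "Null values remain in the final table"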

Generating the Output CSV

With the data validated, it was time to generate the final CSV. I mean, even Super Mario Bros had a save game feature, right?

# Save final DataFrame to CSV
final_df_cleaned.to_csv('dev/final_output.csv', index=False)
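The Bash script shown later also promotes a clean_cademycode.db file to production, so the cleaned table gets written back to SQLite as well. Here's a minimal sketch of how that step could look (the table name below is my own placeholder, not necessarily the one used in the project):

# Write the cleaned table to a fresh SQLite database alongside the CSV
clean_con = sqlite3.connect('dev/clean_cademycode.db')
final_df_cleaned.to_sql('cademycode_cleaned', clean_con, if_exists='replace', index=False)
clean_con.close()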

Overcoming Challenges

Of course, no epic battle is without its challenges. One of the biggest hurdles was ensuring that the final DataFrame retained all the original rows and that no data was lost during the merges. After some debugging (and a few extra lives), I successfully retained the integrity of the data.
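For reference, here's a minimal sketch of the kind of check that caught those issues, assuming the cleaned students table is the source of truth for row counts:

# Left joins should neither drop nor multiply student rows
assert len(final_df_cleaned) == len(students_df), (
    f"Row count changed: {len(students_df)} students vs {len(final_df_cleaned)} merged rows"
)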

Celebrating the Victory

Finally, with the output CSV generated and validated, it was a triumphant moment. I could rest knowing that the data was now clean and ready for analysis.

Or could I?

With the final CSV in hand, the next step was to ensure that the pipeline could run automatically with minimal intervention. This meant developing unit tests and logs to keep everything in check.

Our Princess Is in Another Castle

After all the hard work of cleaning and combining the data, it might feel like the job is done. But as any Mario fan knows, "our princess is in another castle!" The journey isn't complete until the pipeline is foolproof and can run automatically without constant supervision. This meant developing unit tests and logging to ensure everything runs smoothly.

The Importance of Unit Tests

Unit tests are like Mario's power-ups—they help you tackle challenges and keep you safe from unexpected pitfalls. I implemented unit tests to ensure the data integrity and functionality of the pipeline. These tests checked for things like schema consistency, the presence of null values, and the correct number of rows.

import unittest

class TestDataCleaning(unittest.TestCase):
    def test_no_null_values(self):
        self.assertFalse(final_df_cleaned.isnull().values.any(), "There are null values in the final table")

    def test_correct_number_of_rows(self):
        original_length = len(df['cademycode_students'])
        final_length = len(final_df_cleaned)
        self.assertEqual(original_length, final_length, "The number of rows differs after the merges")

    def test_schema_consistency(self):
        original_schema = set(df['cademycode_students'].columns)
        final_schema = set(final_df_cleaned.columns)
        original_schema.discard('contact_info')
        original_schema.update(['mailing_address', 'email'])
        self.assertTrue(original_schema.issubset(final_schema), "The final table schema does not include all original columns")

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

Implementing Logging

Logging is essential for tracking the pipeline's execution and troubleshooting issues. Think of it as Mario's map—it helps you see where you've been and identify any trouble spots. I implemented logging to record each step of the pipeline, including updates and errors.

import logging

logging.basicConfig(filename='logs/data_pipeline.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

def log_update(message):
    logging.info(message)

def log_error(message):
    logging.error(message)

try:
    # Pipeline code...
    log_update("Pipeline executed successfully.")
except Exception as e:
    log_error(f"Error running the pipeline: {e}")
    raise

Creating the Changelog

To keep track of updates, I created a changelog that records version numbers, new rows added, and missing data counts.

def write_changelog(version, new_rows_count, missing_data_count):
    with open('logs/changelog.txt', 'a') as f:
        f.write(f"Version: {version}\n")
        f.write(f"New rows added: {new_rows_count}\n")
        f.write(f"Missing data count: {missing_data_count}\n")
        f.write("\n")
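The post doesn't show where the version number comes from, so here's a small, hypothetical sketch of reading the last recorded version from the changelog and bumping it before calling write_changelog (the simple integer scheme is my assumption):

import os

def get_next_version(changelog_path='logs/changelog.txt'):
    """Hypothetical helper: read the last 'Version:' entry and increment it."""
    if not os.path.exists(changelog_path):
        return 1  # first run
    last_version = 0
    with open(changelog_path) as f:
        for line in f:
            if line.startswith('Version:'):
                last_version = int(line.split(':')[1].strip())
    return last_version + 1

# Usage: write_changelog(get_next_version(), new_rows_count, missing_data_count)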

Challenges and Learnings

One of the biggest challenges was ensuring the unit tests covered all edge cases. Oh, and that feeling when the test suite finally prints 'OK' is unbeatable! With thorough testing and logging in place, I made sure the pipeline was robust and reliable.

Conclusion

With unit tests and logging in place, I felt confident that my pipeline could handle anything thrown its way. It was a moment of triumph, like finally rescuing Princess Peach after a long adventure.

Now, all that was left was to create a Bash script to automate the pipeline and move the updated files to the production directory. Talk about more data plumbing fun!

Automating the Pipeline—Mario Kart Style

With the data cleaned, combined, and validated, and with unit tests and logging in place, it was time to put the pipeline on autopilot. Just like Mario's kart has to be race-ready, everything needed to run smoothly and efficiently, with no banana peels or red shells in sight.

Purpose of the Bash Script

The Bash script propelled the pipeline forward with speed and precision, you know, like shooting turtle shells at other racers (but a little less fun. Just a little!). It was designed to:

  1. Execute the Python script that runs the data pipeline.
  2. Check the changelog to determine if an update occurred.
  3. Move the updated files from the working directory to the production directory.

The Script

Here's the Bash script that made it all possible:

#!/bin/bash

# Path to the Python script
PYTHON_SCRIPT="/home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/main.py"

# Path to the production directory
PROD_DIR="/home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/prod"

# Path to the changelog
CHANGELOG="/home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/logs/changelog.txt"

# Current version from the changelog
CURRENT_VERSION=$(grep -oP 'Version: \K.*' $CHANGELOG | tail -1)

# Execute the Python script
python3 $PYTHON_SCRIPT

# Check if the script executed successfully
if [ $? -eq 0 ]; then
    echo "Pipeline executed successfully."

    # New version from the changelog
    NEW_VERSION=$(grep -oP 'Version: \K.*' $CHANGELOG | tail -1)

    # Check if there was an update
    if [ "$CURRENT_VERSION" != "$NEW_VERSION" ]; then
        echo "Update detected. Moving files to production."

        # Move updated files to the production directory
        mv /home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/dev/clean_cademycode.db $PROD_DIR/
        mv /home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/dev/final_output.csv $PROD_DIR/

        echo "Files moved to production."
    else
        echo "No updates detected. No files moved to production."
    fi
else
    echo "Pipeline execution failed. Check logs for details."
fi

Challenges on the Track

Like any Mario Kart race, there were more than a few obstacles along the way. One of the trickiest parts was ensuring the script correctly identified updates and moved the files only when necessary. After a few laps of testing and tweaking, I had the script running smoothly.

Creating an Alias

To make running the script as easy as throwing a green shell, I created an alias. This allowed me to execute the script with a simple command, no matter where I was in the terminal.

alias run_pipeline="/home/jampamatos/workspace/codecademy/Data/subscriber-pipeline/run_pipeline.sh"

By adding this line to my ~/.bashrc file and reloading the shell, I could start the pipeline with a single command:

run_pipeline

With the Bash script in place, my pipeline was ready to zoom along the track with minimal human intervention. It was a satisfying moment, like crossing the finish line in first place.

Wrap-up

After navigating through a maze of messy data, cleaning and validating it, automating the process, and ensuring everything runs smoothly with unit tests and logging, we've finally crossed the finish line. It’s been a wild ride, but let's take a moment to reflect on our adventure.

Summary of Tasks

Throughout this project, we aimed to build a data engineering pipeline to transform a messy database of long-term canceled subscribers into a clean, analytics-ready dataset. Here's a summary of the key tasks we accomplished:

  1. Setting Up the Project: We began by setting up our working directory and ensuring all necessary files and tools were in place.
  2. Inspecting and Cleaning the Data: We imported the tables from cademycode.db into dataframes, inspected them for missing or invalid data, and performed various data cleaning operations. This included handling null values, correcting data types, and dealing with duplicates.
  3. Creating the Output CSV: Using the cleaned data, we produced an analytics-ready SQLite database and a flat CSV file. We validated the final table to ensure no data was lost or duplicated during the joins.
  4. Developing Unit Tests and Logs: We converted our Jupyter Notebook into a Python script. The script includes unit tests to check for updates to the database and to protect the update process. It also includes logging to track updates and errors.
  5. Creating the Bash Script: We created a Bash script to handle running the Python script and moving updated files from the working directory to the production directory. The script checks the changelog to determine if an update occurred before moving the files.

Final Thoughts

Building my first automated data pipeline was an exciting and challenging journey. It felt like a typical Super Mario stage, filled with obstacles and power-ups. Along the way, I learned valuable lessons about data cleaning, automation, and the importance of thorough testing and logging.

Conclusion

In conclusion, this project successfully demonstrates how to build a robust data engineering pipeline that automates the transformation of raw data into a clean and usable format. By following a structured approach, we ensured that the pipeline is reliable, maintainable, and easy to understand. The inclusion of unit tests and logging provides additional safeguards and transparency, making it easier to monitor and debug the process.

This project not only serves as a valuable addition to my portfolio but also equips me with practical experience in handling real-world data engineering challenges. The skills and methodologies applied here are transferable to a wide range of data engineering tasks, ensuring I am well-prepared for future projects and roles in the field.

Thank you for joining me on this adventure! If you have any questions or comments, feel free to leave them below. I’d love to hear about your own data engineering experiences and any tips you might have. Until next time, keep racing towards your data goals and may your pipelines always be free of banana peels!
