How to use Databricks managed Delta tables in a Kedro project

astrojuanlu

Juan Luis Cano Rodríguez

Posted on August 17, 2023


In this blog post, we'll walk through building a Kedro project that uses managed Delta tables in Databricks with the newly released ManagedTableDataSet.

What is Kedro?

Kedro is a toolbox for production-ready data science. It's an open-source Python framework that enables the development of clean data science code, borrowing concepts from software engineering and applying them to machine-learning projects. A Kedro project provides scaffolding for complex data and machine-learning pipelines. It enables developers to spend less time on tedious "plumbing" and focus on solving new problems.

What is Databricks?

Databricks is a unified data analytics platform designed for simplifying big data processing and free-form data exploration at any scale. Based on Apache Spark, an open-source distributed computing system, Databricks provides a collaborative cloud-based environment where users can process large amounts of data.

The platform provides collaborative workspaces (notebooks) and computational resources (clusters) to run code with. Clusters are groups of nodes that run Apache Spark. Notebooks are collaborative web-based interfaces where users can write and execute code on an attached cluster.

Why use Kedro on Databricks?

As we've described, Kedro offers a framework for building modular and scalable data pipelines, while Databricks provides a platform for running Spark jobs and managing data. You can combine Kedro and Databricks to build and deploy data pipelines and get the best of both worlds. Kedro's open-source framework will help you to build well-organised and maintainable pipelines, while Databricks' platform will provide you with the scalability you need to run your pipeline in production. Check out the recently updated Kedro documentation for a set of workflow options for integrating Kedro projects and Databricks. (Additionally, the third-party kedro-mlflow plugin integrates MLflow capabilities inside Kedro projects to enhance reproducibility for machine learning experimentation.)

What are Kedro datasets?

Kedro datasets are abstractions for loading and saving data, designed to decouple these operations from your business logic. These datasets manage reading and writing data from a variety of sources, while also ensuring consistency, tracking, and versioning. They allow users to maintain focus on core data processing, leaving data I/O tasks to Kedro.
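To make the idea of decoupling concrete, here is a minimal sketch in plain Python. It only mirrors the load/save contract described above: JSONFileDataset and hottest_reading are hypothetical names invented for this illustration, and the class does not use Kedro's own base classes.

```python
import json
import tempfile
from pathlib import Path


class JSONFileDataset:
    """Toy dataset (hypothetical, not a Kedro class): hides where and how data is stored."""

    def __init__(self, filepath):
        self._filepath = Path(filepath)

    def load(self):
        # Read the stored data back into plain Python objects.
        with self._filepath.open() as f:
            return json.load(f)

    def save(self, data):
        # Write the data to the configured location.
        with self._filepath.open("w") as f:
            json.dump(data, f)


def hottest_reading(readings):
    """Business logic: works on plain Python data and knows nothing about files."""
    return max(readings, key=lambda r: r["temperature"])


with tempfile.TemporaryDirectory() as tmp:
    readings_ds = JSONFileDataset(Path(tmp) / "readings.json")
    readings_ds.save([
        {"location": "London", "temperature": 27},
        {"location": "Bucharest", "temperature": 32},
    ])
    hottest = hottest_reading(readings_ds.load())

print(hottest["location"])
```

Because hottest_reading only sees the loaded data, the storage backend could be swapped without touching it, which is the separation that Kedro datasets aim for.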

What is managed data in Databricks?

To understand the concept of managed data in Databricks, it is first necessary to outline how Databricks organises data. At the highest level, Databricks uses metastores to store the metadata associated with data objects. Databricks Unity Catalog is one such metastore. It provides data governance and management across multiple Databricks workspaces. The metastore organises tables (where your data is stored) in a hierarchical structure.

The highest level of organisation in this hierarchy is the catalog. A catalog is a collection of databases (also referred to as schemas in Databricks' terminology). A database, the second level of organisation in the Unity Catalog namespacing model, is a collection of tables. Tables are the third level of organisation in this hierarchy.
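The hierarchy above maps naturally onto a dotted table name. The following sketch shows one way to build such a name in plain Python. TableName is a hypothetical helper written for this post, the catalog name "main" is an assumption, and the backtick quoting is simply a common SQL convention rather than something taken from the Kedro or Databricks source.

```python
from typing import NamedTuple, Optional


class TableName(NamedTuple):
    """Illustrative helper (not part of Kedro or Databricks)."""

    catalog: Optional[str]  # first level; None when no catalog is used
    database: str           # second level, also called a schema
    table: str              # third level

    def qualified(self) -> str:
        # Skip the catalog when it is not set, then join the levels with dots.
        levels = [self.catalog, self.database, self.table]
        return ".".join(f"`{level}`" for level in levels if level)


three_level = TableName(catalog="main", database="weather", table="2023_06_22")
two_level = TableName(catalog=None, database="weather", table="2023_06_22")

print(three_level.qualified())
print(two_level.qualified())
```

The two-level form corresponds to addressing a table by database and table name only, without naming a catalog.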

A table is structured data, stored as a directory of files on cloud object storage. By default, Databricks creates tables as Delta tables, which store data using the Delta Lake format. Delta Lake is an open-source storage format that offers ACID transactions, time travel and audit history.

Databricks tables belong to one of two categories: managed and unmanaged (external) tables. Databricks manages both the data and associated metadata of managed tables. If you drop a managed table, you will delete the underlying data. The data of a managed table resides in the location of the database to which it is registered.

On the other hand, for unmanaged tables, Databricks only manages the metadata. If you drop an unmanaged table, you will not delete the underlying data. These tables require a specified location during creation.

How to work with managed Delta tables using Kedro

Let's demonstrate how to use the ManagedTableDataSet with a simple example on Databricks. You'll need to open a new Databricks notebook and attach it to a cluster to follow along with the rest of this example, which runs on a workspace using a Hive metastore. We'll create a dataset containing weather readings, save it to a managed Delta table on Databricks, append some data, and access a specific table version to showcase Delta Lake's time travel capabilities.

Run each code snippet in this section in its own notebook cell.

The first steps are to set up your workspace by creating a weather database in your metastore and installing Kedro. Run the following SQL code to create the database:

%sql
create database if not exists weather;

To install Kedro and the ManagedTableDataSet, use the %pip magic:

%pip install kedro kedro-datasets[databricks.ManagedTableDataSet]

The first part of our program will create some weather data. We'll create a Spark DataFrame with four columns (date, location, temperature, and humidity) to store our weather readings. Then, we'll use a new instance of ManagedTableDataSet to save our DataFrame to a Delta table called 2023_06_22 (the day of the readings) in the weather database.

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)
from kedro_datasets.databricks import ManagedTableDataSet

spark_session = SparkSession.builder.getOrCreate()

# Define schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("location", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True),
])

# Create DataFrame
data = [
    ('2023-06-22', 'London', 27, 39),
    ('2023-06-22', 'Warsaw', 28, 40),
    ('2023-06-22', 'Bucharest', 32, 38),
]
spark_df = spark_session.createDataFrame(data, schema)

# Create a ManagedTableDataSet instance using a new table named '2023_06_22'
weather = ManagedTableDataSet(database="weather", table="2023_06_22")

# Save the DataFrame to the table
weather.save(spark_df)

To load our data back into a DataFrame, we use the load method on ManagedTableDataSet:

# Load the table data into a DataFrame
reloaded = weather.load()

# Print the first 3 rows of the DataFrame
display(reloaded.take(3))

This code loads the data from the 2023_06_22 table in the weather database back into a Spark DataFrame and shows the first three rows of the data:

|   date   | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest |     32      |   38     |
|2023-06-22|  London  |     27      |   39     |
|2023-06-22|  Warsaw  |     28      |   40     |

Let's say we take some more weather readings later in the day and want to add them to our Delta table. To do this, we can write to it using a new instance of ManagedTableDataSet initialised with "append" passed in as an argument to write_mode:

# Append new rows to the data
new_rows = [
    ('2023-06-22', 'Cairo', 35, 25),
    ('2023-06-22', 'Lisbon', 28, 44),
]
spark_df = spark_session.createDataFrame(new_rows, schema)

weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    write_mode="append"
)
weather.save(spark_df)

The code above adds new rows for Cairo and Lisbon to our Delta table, which creates a new version of the table.

The ManagedTableDataSet class allows for saving data with three different write modes: overwrite, append, and upsert:

  • overwrite mode will completely replace the current data in the table with the new data.

  • append mode will add new data to the existing table.

  • upsert mode updates existing rows and inserts new rows, based on a specified primary key. Notably, if the table doesn't exist at save time, the upsert mode behaves similarly to append, inserting data into a new table.
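To make the difference between the three modes concrete, here is a toy model that applies each of them to a list of dictionaries. It is only a sketch of the semantics described above, not how ManagedTableDataSet or Delta Lake implement them, and save_rows is a hypothetical function invented for this illustration.

```python
def save_rows(existing, new_rows, write_mode, primary_key=None):
    """Toy model of the three write modes on in-memory rows (lists of dicts)."""
    if write_mode == "overwrite":
        # Discard whatever was there and keep only the new rows.
        return list(new_rows)
    if write_mode == "append":
        # Keep the existing rows and add the new ones at the end.
        return list(existing) + list(new_rows)
    if write_mode == "upsert":
        if primary_key is None:
            raise ValueError("upsert needs a primary key")
        # Index existing rows by key, then update matches and insert the rest.
        merged = {row[primary_key]: row for row in existing}
        for row in new_rows:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write mode: {write_mode}")


table = [
    {"location": "London", "temperature": 27},
    {"location": "Warsaw", "temperature": 28},
]
update = [
    {"location": "London", "temperature": 29},  # same key as an existing row
    {"location": "Cairo", "temperature": 35},   # new key
]

overwritten = save_rows(table, update, "overwrite")
appended = save_rows(table, update, "append")
upserted = save_rows(table, update, "upsert", primary_key="location")

print(len(overwritten), len(appended), len(upserted))
```

In this toy model, append leaves two rows for London, while upsert replaces the existing London row and adds Cairo as a new one.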

Suppose we later want to access our data as it appeared earlier in the day when we had only taken three readings. The ManagedTableDataSet class supports accessing different versions of the Delta table. We can access a specific version by defining a Kedro Version and passing it into a new instance of ManagedTableDataSet:

from kedro.io import Version

# Load version 0 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=0, save=None)
)
reloaded = weather.load()
display(reloaded)

# Load version 1 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=1, save=None)
)
reloaded = weather.load()
display(reloaded)

You will see two rendered tables as the output of running this code. The first corresponds to version 0 of the 2023_06_22 table, while the second corresponds to version 1:

|   date   | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest |     32      |   38     |
|2023-06-22|  London  |     27      |   39     |
|2023-06-22|  Warsaw  |     28      |   40     |

|   date   | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest |     32      |   38     |
|2023-06-22|  London  |     27      |   39     |
|2023-06-22|  Warsaw  |     28      |   40     |
|2023-06-22|  Lisbon  |     28      |   44     |
|2023-06-22|  Cairo   |     35      |   25     |
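If it helps to picture why version 0 has three rows and version 1 has five, the following toy model in plain Python numbers each write and lets you read any earlier state. VersionedTable is a hypothetical class written for this post. It keeps a full snapshot per version for simplicity, whereas Delta Lake records changes in a transaction log, so treat it as a mental model only.

```python
class VersionedTable:
    """Toy append-only table that remembers its content at every version."""

    def __init__(self):
        self._snapshots = []  # snapshot i holds the table content at version i

    def append(self, rows):
        # Each write produces a new version built on top of the previous one.
        previous = self._snapshots[-1] if self._snapshots else []
        self._snapshots.append(previous + list(rows))
        return len(self._snapshots) - 1  # the version number just written

    def load(self, version=None):
        # With no version given, return the latest state of the table.
        index = len(self._snapshots) - 1 if version is None else version
        if not 0 <= index < len(self._snapshots):
            raise ValueError(f"version {version} does not exist")
        return list(self._snapshots[index])


weather_log = VersionedTable()
first = weather_log.append([
    ("2023-06-22", "London", 27, 39),
    ("2023-06-22", "Warsaw", 28, 40),
    ("2023-06-22", "Bucharest", 32, 38),
])
second = weather_log.append([
    ("2023-06-22", "Cairo", 35, 25),
    ("2023-06-22", "Lisbon", 28, 44),
])

print(first, len(weather_log.load(first)))
print(second, len(weather_log.load(second)))
```

Loading without a version returns the latest state, which matches what we saw earlier when calling load on a dataset created without a Version.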

And that's it! We've put together a simple program to show some of the usual tasks that ManagedTableDataSet facilitates, making it easy to save, load, and manage versions of your data in Delta tables on Databricks.

Conclusion

Databricks is a fast-growing deployment vector for Kedro projects. This blog post has demonstrated how to combine the power of both Kedro and Databricks with an open-source ManagedTableDataSet that enables streamlined data I/O operations when deploying a Kedro project on Databricks. ManagedTableDataSet empowers you to spend more time implementing the business logic of your data pipeline or machine learning workflow and less time manually handling data.
