Hadoop Migration: How we pulled this off together

A short guide to understanding the process of migrating old analytical data pipelines to AWS by following the Data Mesh strategy.
by Aydan Rende, Senior Data Engineer at eBay Kleinanzeigen

For a long time, Hadoop served as a data warehouse in a few marketplaces of the former eBay Classifieds Group (now part of Adevinta), including eBay Kleinanzeigen. While it served analytical purposes well, the central teams wanted to say goodbye to this old friend. The reason was simple: it was old and costly.

Before diving into the solution, let’s take a look at eBay Kleinanzeigen’s Hadoop data pipeline:

Deprecated Hadoop data pipeline

  1. The monolith is the main backend service of eBay Kleinanzeigen. It produces analytical events in JSON format to several topics on the Kafka cluster.
  2. These topics are consumed and ingested into Hadoop by the Flume ingestor.
  3. The monolith runs scheduled jobs every midnight to fetch and aggregate the analytical data.
  4. Finally, these aggregates are stored in KTableau, which is a MySQL database.

I have marked the problematic components of this pipeline in dark magenta:

  • Hadoop is no longer maintained by Cloudera and runs on an old version, which means maintenance costs extra.
  • The Kafka cluster is on-prem and also runs an old version (v1). We had a strict deadline from the DevOps team to shut down the cluster because the hardware had reached its end of life.
  • KTableau is not a Tableau instance; it's an unmaintained on-prem MySQL database. I have marked it in pink because it's the next one to get rid of. (KTableau: the K comes from Kleinanzeigen.)
  • The on-prem monolithic service is the main serving point of the eBay Kleinanzeigen platform, but it's a bottleneck. The service also runs analytical jobs, which mostly fail silently.

The problems outlined above gave us good reasons to change the data setup, especially as the entire company is in the process of cleaning up and moving from on-prem to AWS, from monolith to microservices and so on. So why not clean up the old analytical data pipeline as well? Yet we had a teeny tiny issue to deal with: our Data Team was relatively small, so the question was "How do we pull this off together in a short time?"

Data Mesh to the Rescue

Data Mesh is a decentralised data management paradigm that allows teams to create their own data products, in line with company policies, on top of a central data infrastructure platform. The paradigm aligns with Domain-Driven Design, which eBay Kleinanzeigen already implements successfully: teams own a domain, and they can own that domain's data products as well.

Data Mesh is not new to Adevinta (our parent company). Adevinta's central teams already provide a self-serve data platform called DataHub, and the marketplaces use this platform autonomously. It offers several managed data solutions, from data ingestion to data governance. Our task was to learn these services and build a new data pipeline with them. However, we also wanted to use dbt for the transformation layer of the ETL process, in addition to the services provided, because we wanted to keep the transformation layer neat and versioned.

This migration is especially important because it marks the beginning of the Data Mesh strategy at eBay Kleinanzeigen. It's great that our teams already own domains, but owning data products is new to them. Therefore, we decided to create a proof of concept, migrate the existing datasets from Hadoop to the new design and explain data ownership to the teams.

The New Design

Data pipeline of a domain

The new design looks more complicated but, in fact, it's easier for teams to adopt, as they can reach out to the central services and integrate with the entire data ecosystem that exists in Adevinta. Hence, it provides data ownership out of the box.

In the new design:

  • The backend services already use Kafka to emit events; however, the new Kafka cluster is on AWS and is "managed", which means that maintenance of the cluster is taken care of by the central teams.
  • Scheduled jobs run in Airflow in a more resilient way. It's easier to trace logs and get notified of errors in Airflow, and we no longer need to dig through the logs of the big monolithic backend service, where they are polluted by other services' logs.
  • Data transformation is performed in dbt instead of the backend services. Data analysts can go to the dbt repository and check the SQL queries instead of reading through the backend service code to understand a reporting query.
  • We leverage the central services as much as possible to reduce DevOps effort and costs.

With these changes, we not only deprecate the old Hadoop instance, but also take the analytics load away from the backend services, which are supposed to be busy with business transactions, not analytical ones.

Managed Kafka

Managed Kafka is a data streaming solution: an AWS-hosted Kafka cluster owned by the Adevinta Storage Team. The central team offers maintained, secure Kafka clusters, provides metrics and runs an on-call service. All we needed to do was create new Kafka topics to replace the old ones running on-prem. We also changed the record type: it was JSON in the old setup, but we decided to use Avro so that the schemas are available in the repositories under version control (GitHub in our case).
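To give an idea of what such a schema looks like, here is a minimal, hypothetical example written as a Python dict and dumped to an .avsc file. The event name, namespace and fields are made up for illustration and are not our actual schema:

import json

# Hypothetical schema for one analytical event type; the real schemas are
# owned by the producing team and reviewed like any other code change.
email_sent_schema = {
    "type": "record",
    "name": "EmailSent",
    "namespace": "de.kleinanzeigen.analytics",  # made-up namespace
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "email_type", "type": "string"},
        {"name": "sent_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# Avro schemas are plain JSON, so they can live as .avsc files in the repository.
with open("email_sent.avsc", "w") as f:
    json.dump(email_sent_schema, f, indent=2)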

Metrics of the Managed Kafka Cluster

DataHub Sink

Sink is an in-house event router that consumes Kafka topics, transforms and filters events, and stores them in an S3 bucket or another Managed Kafka topic. In this phase, we collect the raw data, convert it to Delta format and store it in our AWS S3 bucket with a sink. The Delta format gives us ACID (Atomicity, Consistency, Isolation and Durability) properties that guarantee a consistent version of the tables at any read time, even in the case of concurrent writes. It thus avoids inconsistent or incomplete reads.
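To illustrate what those guarantees look like from the consumer side, here is a minimal PySpark sketch that reads a Delta table written by a sink. The bucket path is made up, and a Spark session with the Delta Lake extensions available (as on Databricks) is assumed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the latest consistent snapshot of the raw events (hypothetical path).
events = spark.read.format("delta").load("s3a://ka-analytics-raw/email_sent/")

# Delta's transaction log also allows reading an older snapshot ("time travel").
events_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("s3a://ka-analytics-raw/email_sent/")
)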

Databricks

Databricks is an analytical data service that provides data lake and data warehouse capabilities together in one platform. It was not an ideal choice for our setup, considering that we already have an AWS data lake and that Databricks is offered not by DataHub but by another central team. However, it was already being used by our data analysts, so we stuck with it and mounted our S3 bucket in Databricks instead. Once the Delta files are collected under the S3 path, we create a table in Databricks. You can read more about the mounting in this document.
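As a rough sketch of that step, assuming a Databricks notebook where dbutils and spark are predefined (the bucket name, mount point, schema and table name below are hypothetical, and credential handling is omitted):

# Mount the bucket once per workspace (hypothetical names).
dbutils.fs.mount("s3a://ka-analytics-raw", "/mnt/ka-analytics-raw")

# Expose the Delta files written by the sink as a queryable table.
spark.sql("CREATE SCHEMA IF NOT EXISTS ext_ka")
spark.sql("""
    CREATE TABLE IF NOT EXISTS ext_ka.email_sent
    USING DELTA
    LOCATION '/mnt/ka-analytics-raw/email_sent'
""")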

Data Build Tool (dbt)

dbt is a data transformation tool that enables data analysts and scientists to run transformation workflows while benefiting from software engineering practices such as collaboration on data models, versioning, testing and documentation. dbt also provides a lineage graph between fact and dimension tables, so dependencies can be visualised in the generated documentation.

We created a dbt repository that contains several SQL models and is integrated with Databricks. We implemented the CI/CD pipeline with GitHub Actions so that every time we release a new model in dbt, a Docker image is built containing the entire dbt repository, secrets and dbt profile, and is then pushed to Artifactory. The image is later fetched by the Airflow operator and run on a schedule. Another great feature of dbt is that we can easily switch the warehouse setup from Databricks to Redshift in the future by making only a few changes.
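To make the build step more concrete, here is a rough sketch of what such an image could look like. This is not our actual Dockerfile: the base image, adapter package and paths are illustrative, and secret handling is omitted:

FROM python:3.10-slim

# dbt-databricks pulls in dbt-core plus the Databricks adapter (pin versions in practice).
RUN pip install --no-cache-dir dbt-databricks

# Ship the whole dbt project, including the dbt profile, inside the image.
COPY . /dbt
WORKDIR /dbt
ENV DBT_PROFILES_DIR=/dbt

# The Airflow KubernetesPodOperator shown below passes the full command
# (e.g. ["dbt", "run", "--select", "mart_email_sent", "-t", <env>]),
# which overrides this default entrypoint.
ENTRYPOINT ["dbt"]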

Airflow

Airflow is a great job orchestration tool, and a managed version of Airflow is offered by Adevinta's central teams. Managed Airflow is a managed Kubernetes cluster that comes with the Airflow service and a few operators configured out of the box. In the managed cluster, it is difficult for us to install packages on our own; we need to request this from the owning team. We are also not the only tenants in the cluster, which means that, even if the central team agrees to install the required packages, a package conflict can affect the other tenants. That's why we decided to run dbt inside a Docker container with the KubernetesPodOperator. It's also a best practice to containerise as much as possible due to Airflow's instabilities, which are described in this blog post in more detail. KubernetesPodOperator instantiates a pod within the Kubernetes cluster to run your image. This gives us an isolated environment in which we can install whatever dependencies we need in order to execute the dbt command.

Here is an example of an Airflow DAG that we execute to produce the email send-out data mart:

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

# send_dag_failure_notification is an in-house failure callback; its import is omitted here.

# Cluster-specific settings are injected through Airflow Variables.
namespace = Variable.get("UNICRON_NAMESPACE", default_var="NO_NAMESPACE")
environment = Variable.get("UNICRON_USER_ENV", default_var="NO_ENV")
docker_image = Variable.get("DBT_DOCKER_IMAGE", default_var="NO_IMAGE")

default_args = {
    'owner': 'det',
    'depends_on_past': False,
    'email': ['some.email@adevinta.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id=os.path.basename(__file__).replace(".py", ""),
        default_args=default_args,
        description='Generates Kleinanzeigen email send-out analytics report for the last day',
        schedule_interval="0 3 * * *",
        is_paused_upon_creation=False,
        start_date=datetime(2023, 1, 3),
        on_failure_callback=send_dag_failure_notification,
        catchup=False,
        tags=['belen', 'analytics', 'email_sent', 'dbt_image'],
) as dag:

    # Test the raw source before building the mart on top of it.
    test_email_sent = KubernetesPodOperator(
        image=docker_image,
        cmds=["dbt", "test", "--select", "source:ext_ka.email_sent", "-t", environment],
        namespace=namespace,
        name="test_email_sent",
        task_id="test_email_sent",
        get_logs=True,
        image_pull_secrets=[k8s.V1LocalObjectReference("artifactory-secrets")],
        startup_timeout_seconds=1000,
    )

    # Build the data mart model.
    execution_mart = KubernetesPodOperator(
        image=docker_image,
        cmds=["dbt", "run", "--select", "mart_email_sent", "-t", environment],
        namespace=namespace,
        name="execution_mart",
        task_id="execution_mart",
        get_logs=True,
        image_pull_secrets=[k8s.V1LocalObjectReference("artifactory-secrets")],
    )

    # Test the freshly built mart.
    test_mart = KubernetesPodOperator(
        image=docker_image,
        cmds=["dbt", "test", "--select", "mart_email_sent", "-t", environment],
        namespace=namespace,
        name="test_mart",
        task_id="test_mart",
        get_logs=True,
        image_pull_secrets=[k8s.V1LocalObjectReference("artifactory-secrets")],
    )

    test_email_sent >> execution_mart >> test_mart

The only disadvantage of running dbt in a Kubernetes pod is that you cannot see dbt's fancy lineage graph while the steps are executed, as you would in dbt Cloud. However, the dbt models run as separate tasks in the Airflow DAG, so you can still see which steps fail and integrate a Slack webhook to receive notifications. Besides, a GitHub Action can be configured to generate dbt docs every time a change lands on the main branch.

Summary

In this blog post, we provide a short guide to help you understand the process of migrating eBay Kleinanzeigen's old analytical data pipeline to AWS by leveraging Adevinta's Data Platform.

Basically, in our new data pipeline, we:

  • collect events with Kafka topics,
  • convert them to Delta format and store them in S3 buckets with a Data Sink,
  • create Databricks tables on top of the stored S3 locations,
  • create data models with dbt and store them in Databricks (again in an S3 bucket),
  • run the dbt models on a schedule with Airflow.

The data marts produced are available in Databricks, where analysts can easily access them and build Tableau dashboards. While Databricks is an interface for the analysts, it lacks visibility for the other teams. In the future, we plan to integrate a Glue crawler with our S3 bucket so that we can register the datasets in the data catalogue and integrate with Adevinta's access management services.

So, what's next?

Please keep in mind that Data Mesh is an approach; as such, it may differ based on the company setup. Thanks to Adevinta's self-serve data infrastructure, however, we were able to migrate from Hadoop to AWS in quite a short period of time with a very small team.

The next steps for our team are to:

  • make the pipeline easy to integrate with all domain teams
  • give the teams the necessary technical support
  • explain the Data Mesh strategy and the importance of data products
  • and make sure the domain teams own their data products by testing, documenting and cataloguing them properly.

Changes are difficult to make, especially conceptual ones, but if we want to scale quickly and work with high-volume, diverse and frequently changing data, we will need to do this together.
