Jun 13, 2023

Scale your Marketing Data Warehouse Using Airflow - Part 1

Learn how to use Airflow to construct a large-scale marketing data warehouse

The case for owning a marketing data warehouse is now hard to dispute. There are typically two approaches to building one: purchasing a connector tool like Supermetrics or Fivetran that loads data into a warehouse such as BigQuery, or developing your own system.

Purchasing a connector tool offers a ready-made solution that can save you time, but it can be expensive and lacks customization. On the other hand, building your own system provides greater control and flexibility over your data, but it comes with its own set of challenges.

Our focus here is on companies that have chosen the second route: those that have developed their own systems for pulling data from marketing platforms for multiple accounts. Agencies are a perfect example.

They typically use Python scripts that loop over a list of accounts, extracting data from different sources like Google Ads, Facebook Ads, Google Analytics, etc. Each data extraction is treated as a task within the script, and these tasks run sequentially for each client. These scripts can be deployed on Cloud Functions, Lambda, or using classic cron jobs.

Consider a scenario where you have individual Python scripts for each step in your ETL process:

  • Google Ads data extractor (google_ads.py)
  • Facebook Ads data extractor (facebook_ads.py)
  • Google Analytics data extractor (google_analytics.py)
  • Data cleaning script (clean_data.py)
  • Data transformation script (transform_data.py)
  • Data loading script to load the data into BigQuery (load_to_bigquery.py)

This approach might work fine when you're dealing with a smaller scale or fewer platforms. But as your marketing operations grow, managing these individual scripts can become daunting and comes with several problems:

  • Manual Monitoring: Each task within the script must be individually monitored to ensure it has successfully run. As the number of clients and channels (and, therefore, the tasks) grows, the process requires more attention and can quickly become complex and cumbersome.
  • Error Handling: A failure in any task breaks the loop, halting data extraction for all subsequent tasks and clients. You then have to diagnose the problem, fix it, and restart the task manually. If a task depends on the output of a previous one, the whole pipeline might need to be restarted from the point of failure.
  • Lack of Centralized Logging: With each task potentially having its own logging system, it is difficult to have a unified view of the data extraction process. Consolidating logs from different tasks for effective debugging and understanding the overall health of the system is quite challenging.
  • Limited Retry Mechanisms: Transient failures like network issues or API rate limits can cause tasks to fail. Handling such issues requires the implementation of a retry mechanism within each task, adding an additional layer of complexity to the scripts.
  • Scaling Challenges: As the agency grows and manages more clients, the number of tasks grows proportionally. Managing individual tasks for each client and for each type of data quickly becomes unmanageable.
  • Deployment Complexity: When these scripts are deployed as cloud functions, each task must be deployed, managed, and monitored separately. This results in increased complexity and overhead, further hampering the process's efficiency and scalability.
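
The error-handling problem above can be made concrete with a minimal sketch of such a script. The account names and the two extractor functions are hypothetical stand-ins, not real API clients, and one of them is made to fail on purpose to show how a single exception stops every task and client that comes after it.

```python
# Hypothetical stand-ins for real extractors; no API is called here.
ACCOUNTS = ["client_a", "client_b", "client_c"]


def extract_google_ads(account: str) -> str:
    return f"google_ads:{account}"


def extract_facebook_ads(account: str) -> str:
    # Simulate an API failure for one client.
    if account == "client_b":
        raise RuntimeError(f"Facebook Ads API error for {account}")
    return f"facebook_ads:{account}"


def run_sequentially(accounts: list[str]) -> list[str]:
    """Run every extractor for every account, one after another."""
    completed = []
    try:
        for account in accounts:
            for extractor in (extract_google_ads, extract_facebook_ads):
                completed.append(extractor(account))
    except RuntimeError as error:
        # Without this handler the script would simply crash here.
        print(f"Pipeline halted: {error}")
    return completed


completed = run_sequentially(ACCOUNTS)
print(completed)
```

In this sketch nothing is extracted for client_c, even though its data sources are perfectly healthy. Making the loop resilient means adding per-task error handling, retries, and logging by hand, which is the work an orchestrator is meant to take over.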

Manually managing these scripts means executing each one in the correct sequence, ensuring each one finishes successfully before starting the next. If a script fails, you must diagnose the problem, fix it, and then restart the sequence from that point. This is where Apache Airflow comes into the picture.

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It builds pipelines of tasks that execute in a specified order and manages the dependencies between them. Airflow has two core concepts:

  1. DAG: Directed Acyclic Graph. An Airflow DAG is a workflow defined as a graph in which all dependencies between nodes are directed and no node can loop back to itself, directly or through other nodes.
  2. Task: A step in a DAG describing a single unit of work.

Figure: a DAG consisting of four tasks: a, b, c, and d.
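
To make the two concepts concrete, here is a small sketch that models a four-task DAG with Python's standard-library graphlib module (Python 3.9+) rather than with Airflow itself. The edges are an assumption made for illustration (a feeds b and c, which both feed d); the point is only that a valid DAG always has an execution order, while a cycle does not.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (assumed edges).
dependencies = {
    "a": set(),
    "b": {"a"},
    "c": {"a"},
    "d": {"b", "c"},
}

# static_order() yields tasks so that every dependency comes first.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # "a" is always first and "d" is always last

# A cycle makes the graph invalid: "a" would have to wait for "d".
cyclic = {**dependencies, "a": {"d"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Tasks b and c have no dependency on each other, so a scheduler is free to run them at the same time once a has finished.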

In the context of our marketing data warehouse, each Python script becomes a task in the Airflow DAG. You can set dependencies that ensure your data is extracted, cleaned, transformed, and loaded into BigQuery in the correct sequence.

Here are the key features of Apache Airflow that address the challenges mentioned above:

  • Centralized Monitoring: Airflow comes with a user-friendly interface for monitoring. Here, you can easily visualize your DAGs, track task progress, view past runs, and identify any failures.
  • Automated Error Handling: With Airflow, if a task fails, the system can be set up to alert you via email, Slack, or other mediums. This way, you're instantly notified when an issue arises.
  • Advanced Retry Mechanisms: Airflow has built-in support for task retries. You can define the number of retries and the time interval between them. In case of transient failures, this can ensure your pipeline recovers without manual intervention.
  • Scalability: Airflow is designed to scale, whether you have five tasks or five hundred. It can distribute tasks across a cluster of workers and scale based on the workload. As your data sources or volumes grow, Airflow grows with you.
  • Scheduled Execution: Apache Airflow has powerful scheduling capabilities. You can schedule your workflows to run at fixed intervals (say, every day at 3 AM), on specific days (like every Monday), or even define more complex schedules. You can also backfill data for a specific period in the past, which is useful for loading historical data and is hard to do when running scripts manually or via cron jobs.
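
As a rough sketch of how the retry, alerting, and scheduling features are usually configured: the dictionary below uses retries, retry_delay, email, and email_on_failure, which are standard Airflow task arguments, while the values and the e-mail address are placeholders. E-mail alerts additionally assume that SMTP has been configured for your Airflow instance.

```python
from datetime import timedelta

# Placeholder values; tune them to the APIs you call.
default_args = {
    "retries": 3,                         # re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between attempts
    "email": ["data-team@example.com"],   # hypothetical recipient
    "email_on_failure": True,             # send an e-mail when a task fails
}

# Cron expressions for two schedules like the ones mentioned above.
schedules = {
    "every day at 3 AM": "0 3 * * *",
    "every Monday at 3 AM": "0 3 * * 1",
}

# Both are passed when declaring a DAG, for example:
#   @dag(schedule_interval=schedules["every day at 3 AM"],
#        default_args=default_args, ...)
```

Because default_args applies to every task in the DAG, the retry policy is declared once instead of being re-implemented inside each extraction script.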

In this series, we will cover how to convert your classic Python ETL scripts into Airflow DAGs. The series is split into three parts:

  • Part 1: We will cover installing Airflow using Docker and creating your first DAG.
  • Part 2: We will cover how to structure your DAG, how to run backfill, and how to use dynamic task mapping to execute tasks for hundreds of accounts.
  • Part 3: We will cover deployment on Kubernetes and how it can save you a lot of money instead of using managed services like Composer.

Installing Apache Airflow Using Docker

The simplest way to install Airflow locally on your machine is to use Docker. If you're not familiar with Docker, I'd recommend going through an introduction to it first. Just make sure to download and install the Docker Desktop app, and I will guide you step by step through the installation process.

Step 1: Create a new project or folder and name it 'airflow'

Step 2: Within your project, execute the following command in the terminal to fetch the docker-compose.yaml file:


curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/docker-compose.yaml'

Step 3: Create three folders inside your project for the following purposes:

  • ./dags - This folder will contain your DAG files.
  • ./logs - This folder will store logs from task execution and the scheduler.
  • ./plugins - You can place your custom plugins in this folder.

You have two options to create these folders: either manually create them or use the following command in the terminal:


mkdir -p ./dags ./logs ./plugins

Your project structure should now resemble the following:

airflow/
├── dags/
├── logs/
├── plugins/
└── docker-compose.yaml

Step 4: Let's initialize the project by executing the following command:


docker compose up airflow-init

This step may take approximately 5 to 10 minutes while Docker downloads the images and initializes the database. Once the initialization is complete, you should see a message similar to the following:


start_airflow-init_1 exited with code 0

Step 5: Now that we're done, let's start Airflow by running the following command:


docker compose up

After starting Airflow, you can access the web interface by visiting http://0.0.0.0:8080/ (if your browser cannot open that address, use http://localhost:8080/ instead). You will be prompted to enter a username and password; both default to "airflow".
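
If the page does not load right away, the containers may still be starting. The Airflow webserver exposes a /health endpoint that reports the status of its components, and the sketch below reads it with the standard library only. The base URL assumes the default port mapping, and the sample payload is an assumed, simplified shape used so the check can be tried without a running instance.

```python
import json
from urllib.request import urlopen


def is_healthy(payload: dict) -> bool:
    """Return True when the metadata database and scheduler report healthy."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch the health report from a running webserver."""
    with urlopen(f"{base_url}/health", timeout=10) as response:
        return json.load(response)


# Sample payload (assumed shape) so the check can be tried offline.
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy"},
}
print(is_healthy(sample))
# With the containers running: print(is_healthy(fetch_health()))
```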

Once you are inside Airflow, you will notice a set of example DAGs displayed. 

Since we don’t need any of these examples, let’s clean them up by disabling DAG examples in the docker-compose.yaml file. Set the value of AIRFLOW__CORE__LOAD_EXAMPLES to "false" to prevent the examples from being loaded.

To apply the change, restart the Docker containers. First, execute the following command to stop them:


docker compose down

Once the containers are stopped, start them again using the following command:


docker compose up

After the containers are restarted, Airflow should no longer display any DAGs, since the examples have been disabled.

Creating your first DAG

To create your first DAG in Airflow, go to the ./dags folder and create a new file named, for example, ga4_dag.py.

There are two syntaxes to create a DAG in Airflow. The first one involves using standard constructors as shown below:


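from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Instantiate the DAG with the standard constructor.
# Tasks would then be attached to it with operators such as PythonOperator.
ga4_dag = DAG(
    dag_id="ga4_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)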

The second approach, which is the preferred way, is to use decorators. This method will be used throughout this series. You define the DAG with the @dag decorator and each task with the @task decorator.

Let's create a GA4 DAG with three dummy tasks: extract, transform, and load.


from datetime import datetime

from airflow.decorators import dag, task


# Declare the DAG: it runs daily and does not catch up on past intervals.
@dag(schedule_interval='@daily', start_date=datetime(2021, 1, 1), catchup=False)
def ga4_dag():

    @task()
    def extract():
        print('extracting data')

    @task()
    def transform():
        print('transforming data')

    @task()
    def load():
        print('loading data')

    extract() >> transform() >> load()


ga4_dag = ga4_dag()

Note the >> between the tasks, which signifies that the tasks should run in order. If you want them to run in parallel, you can enclose them in a list, like [extract(), transform(), load()].
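
To build some intuition for how >> can express ordering, here is a toy sketch based on Python operator overloading. The Task class below is purely illustrative and is not Airflow's implementation; it only mimics the idea that >> records a downstream dependency and that a list can appear on either side of the operator.

```python
class Task:
    """Toy task that records which tasks run after it (not Airflow's class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: list["Task"] = []

    def __rshift__(self, other):
        # Handles: task >> other   and   task >> [other1, other2]
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other  # returning the right side allows chaining

    def __rrshift__(self, others):
        # Handles: [task1, task2] >> task (lists have no >> of their own,
        # so Python falls back to the right-hand operand).
        for upstream in others:
            upstream.downstream.append(self)
        return self


# Sequential chain, as in the DAG above.
extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load

# Fan-in: two independent extractors feed one cleaning task.
google_ads, facebook_ads, clean = Task("google_ads"), Task("facebook_ads"), Task("clean")
[google_ads, facebook_ads] >> clean

print([t.name for t in extract.downstream])
print([t.name for t in google_ads.downstream])
```

In the fan-in case the two extractors have no dependency on each other, which is what allows a scheduler to run them in parallel before the cleaning step starts.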

Once you access the Airflow web interface, you should be able to see your DAG displayed.

Now, if you click into it and go to the ‘Graph’ tab, you should see a diagram of your tasks, with extract, transform, and load connected in that order.

To run the DAG, click on the play button. If everything goes well, you should observe the DAG completing successfully.

To inspect the logs of each task, you can click on the individual tasks within the DAG. This will allow you to access the full logs and examine any outputs or errors encountered during task execution.

That concludes the first part of this article series. Stay tuned for the next part, where we will dive deeper into how to structure your DAG, how to run backfill, and how to use dynamic task mapping to execute tasks for hundreds of accounts.