Automation
Jun 21, 2023

Automating GA4 Cost Data Import - Part 2

Discover how to automate the process of importing advertising cost data into GA4 by building an end-to-end data pipeline

This article continues our series on automating GA4 cost data import using SFTP. It follows up on Part 1, which you can catch up on here if you haven't read it yet: Automatically import cost data into GA4 Part 1.

Our two-part guide aims to show you how to connect to the Ads platform API, download ad data, and then upload your CSV file to the SFTP server for Google Analytics 4 to ingest. We've divided this guide into five main sections:

  1. Download and format the Ads data from the Facebook Marketing API
  2. Upload the structured CSV file to the SFTP server
  3. Set up a daily job using Cloud Scheduler
  4. Deploy the pipeline to Cloud Functions
  5. Test the functionality of the pipeline

In Part 1, we covered the initial setup of the SFTP server on GCP and how to authenticate GA4 to access it. Now it's time to tackle building the data pipeline.

Download and format the Ads data from the Facebook Marketing API

In this article, we're going to be working with the Facebook Marketing API, but you can use the same process with other platforms too. If you already have a data warehouse where you keep your ads data, it's a good idea to connect directly to it instead of accessing the Ads API.
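
For instance, if your warehouse is BigQuery, pulling yesterday's campaign costs could look roughly like the sketch below. Note that the project, dataset, and table names are hypothetical, and the columns assume your warehouse already stores campaign-level cost data:

from google.cloud import bigquery

def get_ads_data_from_warehouse():
    # Hypothetical table holding daily campaign-level ad costs
    client = bigquery.Client()
    query = """
        SELECT campaign_id, campaign_name, date, impressions, clicks, cost
        FROM `my-project.ads.daily_campaign_costs`
        WHERE date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    """
    return client.query(query).to_dataframe()
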

Now, let's move on to the real work and start interacting with the Facebook Ads API. The first thing we need to do is generate an access token. This will let us call the Facebook Marketing API. So, let's get started:

Step 1: Go to the Facebook for Developers website and click on the 'Create App' button. Next, choose 'Manage Business Integrations' and fill in the necessary details like the App name and contact information.

Step 2: After the app is created, head over to your Business Manager settings and click on 'Apps'. Click 'Add' and link your newly created app to the Business Manager. Then select 'Add Assets' and choose the ad accounts for which you would like to pull reports.

Step 3: After creating the app, navigate to the Marketing API > Tools section and select all three token permissions. Then, click ‘Get Token’ and copy the token that is generated. Be sure to save your token, as you will need it in the following steps.

Step 4: Now that we have the access token, it's time to write a Python function to interact with the Facebook Marketing API. But before we get to that, ensure that the facebook_business pip package is installed on your system. If not, you can install it using the following terminal command:


pip install facebook_business

With the package installed, we can now proceed to write the Python function that will call the Facebook Marketing API and retrieve the necessary data:


import pandas as pd
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookAdsApi

def get_fb_ads_data(access_token, account_id):
    # Initiate Facebook Ads API
    FacebookAdsApi.init(access_token=access_token, api_version='v17.0')
    # Connect to Facebook Ads Account
    my_account = AdAccount('act_'+account_id)
    # Define the fields of data we want to retrieve
    fields = [
        'campaign_name',
        'campaign_id',
        'spend',
        'impressions',
        'clicks'
    ]

    # Define the parameters of the data we want to retrieve
    params = {
        'level': 'campaign',
        'date_preset': 'yesterday',
    }

    # Retrieve data from Facebook Ads API
    data = my_account.get_insights(fields=fields,
                                   params=params)

    # Define an empty list to store the results
    results = []
    # Loop through the data retrieved from the Facebook Ads API
    for item in data:
        # Convert each item into a plain dict and append it to the results list
        results.append(dict(item))

    # Convert the results list into a Pandas DataFrame
    df = pd.DataFrame(results)

    # Return the results as a Pandas DataFrame
    return df

What we're doing here is importing the necessary modules, such as the facebook_business package we installed earlier. We'll also use Pandas to help format the results and export them to a CSV file. You'll notice that we've set the date range to 'yesterday', since our pipeline will run daily; feel free to modify this setting if you need to run a backfill, for instance.
If you execute the function with the access token we generated earlier and your ad account ID, you should get back yesterday's campaign-level data.
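
For example, with placeholder values (use the token from Step 3 and your own ad account ID):

access_token = 'my_access_token'   # the token generated earlier
account_id = '123456789'           # your ad account ID, without the 'act_' prefix

fb_df = get_fb_ads_data(access_token, account_id)
print(fb_df.head())

The resulting DataFrame contains one row per campaign with the fields we requested, plus the 'date_start' and 'date_stop' columns that the Insights API adds automatically; we'll deal with those in the next step.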

Step 5: Next, we need to format our data to align with the GA4 schema we outlined in Part 1 of this article series. The GA4 schema consists of the following fields:

  • source
  • medium
  • campaign_id
  • campaign_name
  • date
  • impressions
  • clicks
  • cost

As you can see, most of these columns are already present in our Facebook DataFrame. We just need to rename the 'spend' and 'date_start' columns to 'cost' and 'date', drop 'date_stop', and add two new columns, 'source' and 'medium', based on the UTMs you use. Let's build a straightforward Python function to format the data accordingly:


def format_ads_data(df):

    # rename columns
    df.rename(columns={'spend': 'cost',
                       'date_start': 'date'
                       }, inplace=True)

    # drop date_stop column
    df.drop(columns=['date_stop'], inplace=True)

    # add source column
    df['source'] = 'facebook'

    # add medium column
    df['medium'] = 'cpc'

    return df

The function we created accepts the dataframe we generated earlier and returns it in the desired format.
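
As a quick sanity check, you can confirm that the formatted DataFrame now exposes exactly the GA4 schema columns (illustrative only; the column order may differ):

fb_df = format_ads_data(fb_df)
print(sorted(fb_df.columns))
# ['campaign_id', 'campaign_name', 'clicks', 'cost', 'date', 'impressions', 'medium', 'source']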
With that, we have completed this section. Let's move on to the next part, where we'll explore how to connect to our SFTP server and deposit the formatted CSV file for GA4 to ingest.

Upload the structured CSV file to the SFTP server

Now we're going to write a Python function that exports the DataFrame as a CSV file and uploads it to our SFTP server. We'll use the paramiko library for this task. If you haven't installed it yet, you can do so with pip:
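
pip install paramiko

With paramiko installed, here's the Python function: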


import paramiko


def upload_dataframe_to_sftp(df, filename, host, port, username, password):
    # Convert dataframe to csv
    df.to_csv(filename, index=False)

    # Create an SSH client
    ssh = paramiko.SSHClient()
    
    # Automatically add server key (needed if you're connecting to a server for the first time)
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Authenticate with the server
    ssh.connect(host, port, username, password)

    # Create an SFTP client
    sftp = ssh.open_sftp()
    # Upload the file to the server
    sftp.put(filename, f'/home/{username}/{filename}')

    # Close the connections
    sftp.close()
    ssh.close()

    print(f'{filename} successfully uploaded to {host}.')

The function requires 6 parameters:

  • df: This is the dataframe we extracted from the Facebook Ads API.
  • filename: This is the name of the CSV file we're uploading to the SFTP server. In this case, we'll call it 'upload.csv', which is in the directory we provided to GA4 in Part 1.
  • host: This is the IP address of your SFTP server that we set up in Part 1.
  • port: This should be set to 22.
  • username: This is the username we created in Part 1, which in our case is 'ga4-sftp'.
  • password: This is the password you created when setting up the server in Part 1.

After preparing these parameters, try executing the function. If it runs successfully, you're ready to proceed to the next step: deployment.
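
For example, with placeholder values (substitute your own from Part 1):

upload_dataframe_to_sftp(
    df=fb_df,
    filename='upload.csv',
    host='203.0.113.10',   # the external IP of your SFTP server
    port=22,
    username='ga4-sftp',
    password='my_password'
)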

Set up a daily job using Cloud Scheduler

Deployment will be a two-step process. The first step is to set up a job on Cloud Scheduler to trigger the Cloud Function to run daily at 6 AM. The second step involves creating the actual Cloud Function. We'll start with setting up the Cloud Scheduler job, as we'll need this when creating the Cloud Function. The process is quite straightforward; just follow the steps below:

Step 1: Navigate to Cloud Scheduler, then click 'Create Job'.

Step 2: Give your job a name, then set the frequency according to your requirements. In our case, we want our pipeline to run at 6 AM every day, so the cron job format for this would be 0 6 * * *. You can use a cron job generator website to help format this. Also, ensure that you select the correct timezone relevant to your location.

Step 3: Set the target type of your job to be 'Cloud Pub/Sub'. Next, create a new topic by giving it a name, such as 'ga4-cost-import'. As for the message body, you can leave it empty by just providing an empty map like this: {}.

Step 4: Lastly, click 'Create' to finalize the process. Now, we're ready to move on to the next step, which is creating our Cloud Function.
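
Before we do, a quick aside: if you prefer the command line, the same Scheduler job can be created with a single gcloud command. This is just a sketch; the job name, timezone, and location below are assumptions, so adjust them to your setup:

gcloud scheduler jobs create pubsub ga4-cost-import-job \
    --schedule="0 6 * * *" \
    --topic=ga4-cost-import \
    --message-body="{}" \
    --time-zone="Europe/London" \
    --location=europe-west1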

Deploy the pipeline to Cloud Functions

Now that we've set up the Cloud Scheduler job, it's time to deploy our pipeline to the Cloud Function.

Step 1: Navigate to Cloud Functions, then click 'Create Function'.

Step 2: Set the environment to '1st gen' and then give your function a suitable name.

Step 3: Choose 'Cloud Pub/Sub' as the trigger type, and then select the Pub/Sub topic we created earlier.

Step 4: Allocate the memory for your Cloud Function. We recommend setting it to 1 GB, which should be ample for this task. If necessary, this value can be adjusted later. As for the timeout, set it to 180 seconds, which should be more than enough time for our job to complete. After setting these parameters, click 'Next'.

Step 5: In the 'Source code' section, select 'Inline editor' as the source, and choose 'Python 3.11' as the runtime.

Step 6: It's now time to add our code to the Cloud Function. I've compiled all the code we've been through so far so you can simply copy and paste it; just make sure to replace the placeholders with your specific details. While it's recommended to use environment variables rather than hardcoding sensitive details, for the sake of simplicity in this tutorial we're going to stick with hardcoded values.


import pandas as pd
import paramiko
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookAdsApi


def main(event, context):
    
    host = "myhost"
    username = "ga4-sftp"
    password = "my_password"
    filename="upload.csv"
    port = 22
    access_token = 'my_access_token'
    account_id = '123456789'

    # Get Facebook Ads data
    fb_df = get_fb_ads_data(access_token, account_id)

    # check if the dataframe isn't empty
    if not fb_df.empty:
        # Format Facebook Ads data
        fb_df = format_ads_data(fb_df)
        # Upload Facebook Ads data to SFTP
        upload_dataframe_to_sftp(fb_df, filename, host, port, username, password)
    else:
        print('No Facebook Ads data found.')


def get_fb_ads_data(access_token, account_id):
    # Initiate Facebook Ads API
    FacebookAdsApi.init(access_token=access_token, api_version='v17.0')
    # Connect to Facebook Ads Account
    my_account = AdAccount('act_'+account_id)
    # Define the fields of data we want to retrieve
    fields = [
        'campaign_name',
        'campaign_id',
        'spend',
        'impressions',
        'clicks'
    ]

    # Define the parameters of the data we want to retrieve
    params = {
        'level': 'campaign',
        'date_preset': 'yesterday',
    }

    # Retrieve data from Facebook Ads API
    data = my_account.get_insights(fields=fields,
                                   params=params)

    # Define an empty list to store the results
    results = []
    # Loop through the data retrieved from the Facebook Ads API
    for item in data:
        # Convert each item into a plain dict and append it to the results list
        results.append(dict(item))

    # Convert the results list into a Pandas DataFrame
    df = pd.DataFrame(results)

    # Return the results as a Pandas DataFrame
    return df

def format_ads_data(df):

    # rename columns
    df.rename(columns={'spend': 'cost',
                       'date_start': 'date'
                       }, inplace=True)

    # drop date_stop column
    df.drop(columns=['date_stop'], inplace=True)

    # add source column
    df['source'] = 'facebook'

    # add medium column
    df['medium'] = 'cpc'

    return df

def upload_dataframe_to_sftp(df, filename, host, port, username, password):
    
    # Convert dataframe to csv
    df.to_csv('/tmp/'+filename, index=False)

    # Create an SSH client
    ssh = paramiko.SSHClient()
    
    # Automatically add server key (needed if you're connecting to a server for the first time)
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Authenticate with the server
    ssh.connect(host, port, username, password)

    # Create an SFTP client
    sftp = ssh.open_sftp()
    # Upload the file to the server
    sftp.put('/tmp/'+filename, f'/ftproot/{username}/{filename}')

    # Close the connections
    sftp.close()
    ssh.close()

    print(f'{filename} successfully uploaded to {host}.')
    

Please note that the CSV file is now written to the '/tmp' directory. Cloud Functions doesn't allow writing to the rest of its file system; you need to write and read files from this temporary directory, which is cleared once the instance shuts down. It's an important consideration when working with Cloud Functions.

Step 7: Change the entry point to 'main', which is the main function of our script.

Step 8: Update the requirements.txt file with the pip libraries we've used in our script. In our case, you need to add pandas, facebook_business, and paramiko:
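
pandas
facebook_business
paramiko

(You can also pin specific versions here if you want reproducible builds.)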

Step 9: Lastly, click 'Create' to finalize the process.

Test the functionality of the pipeline

Now let's try to manually trigger our Cloud Function. To do this, go to Cloud Scheduler and then click on 'Force run'.
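
If you prefer the command line, you can trigger the same run with gcloud (the job name and location below are the ones assumed earlier, so adjust them to your setup):

gcloud scheduler jobs run ga4-cost-import-job --location=europe-west1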

Now, go back to your Cloud Function to check the logs. This will allow you to confirm whether the function was executed successfully. If everything has gone according to plan, you should see the successful print message that we incorporated into our script, and the function should display a 'success' status.

Now, let's navigate back to the Google Analytics interface and click on 'Import Now' to start a test run.

If everything goes smoothly, you will see a success message in the GA4 interface confirming the import.

And that brings us to the end of our article. If you have any questions or need further clarification on any point, feel free to reach out. Your feedback and queries are always welcome.