Empowering Internal Teams: Automating Data Requests with Google Sheets and GCS

Intro

In the data world, we deal with complex pipelines and huge datasets, yet one of the oddly persistent challenges is how product managers and other internal stakeholders request specific data subsets. Usually, they’ll just send over a list of unique identifiers and ask, “Can you run these for me?” It’s not efficient, but we can fix that. By combining Google Sheets with Google Cloud Storage (GCS), we can automate the process.

With this setup, product managers manage their own requests by editing a shared Google Sheet. On our side, we check whether the Sheet was modified after the corresponding file in GCS was last updated. If it was, we export the Sheet to create or replace the GCS file, so in this event-driven style we only process when new data actually comes in. The result is a seamless way for product managers to control when and which data gets processed, which matters most when new products launch and shortening the data feedback loop is critical.
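Before diving into the full implementation below, here is a minimal sketch of that decision logic; should_upload and its argument names are purely illustrative and don’t appear in the real code:

def should_upload(sheet_modified_at, gcs_blob_updated_at):
    # Upload when the file doesn't exist in GCS yet, or when the Sheet
    # was modified after the file was last written
    if gcs_blob_updated_at is None:
        return True
    return sheet_modified_at > gcs_blob_updated_at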


What you’ll need before coding

  1. The Google Sheets API enabled in your GCP project

  2. A GCP service account key with read and write access to your GCS bucket

  3. “Viewer” access granted to the service account on your Google Sheet, added via the “Share” button

  4. The Sheet ID of your Google Sheet, found in the URL between “/d/” and “/edit” (see the snippet below)
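The code below also expects the path to your service account key file in a GCP_SERVICE_ACCOUNT_KEY environment variable. As for the Sheet ID, here is a quick way to pull it out of a full URL; the URL shown is just a placeholder:

sheet_url = "https://docs.google.com/spreadsheets/d/1AbCdEfGhIjKlMnOpQrStUvWxYz1234567890/edit#gid=0"
sheet_id = sheet_url.split("/d/")[1].split("/")[0]
print(sheet_id)  # 1AbCdEfGhIjKlMnOpQrStUvWxYz1234567890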


The code

The full repository can be found here: https://github.com/CuttrellW/google-sheets-to-gcs

import logging
import flask
import os
import gspread
import pandas as pd

from google.cloud import storage
from google.oauth2.service_account import Credentials
from datetime import datetime as dt, timezone

app = flask.Flask(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def get_credentials():
    # Get credentials from service account key
    scopes = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive',
              'https://www.googleapis.com/auth/devstorage.read_write']
    return Credentials.from_service_account_file(os.environ["GCP_SERVICE_ACCOUNT_KEY"], scopes=scopes)


class GoogleSheetsImport:
    def __init__(self, sheet_id: str, bucket_name: str, file_name: str):
        # Get Google credentials for bucket and sheet client
        credentials = get_credentials()
        storage_client = storage.Client(credentials=credentials)

        self.bucket_client = storage_client.bucket(bucket_name)
        self.sheet_client = gspread.authorize(credentials)
        self.file_name = file_name
        self.sheet_id = sheet_id

    def sheet_updated(self, spreadsheet: gspread.Spreadsheet) -> bool:
        # Get Sheet last modified time (a UTC ISO timestamp) and make it timezone-aware
        # so it can be compared with the timezone-aware timestamp returned by GCS
        sheet_last_updated = dt.strptime(spreadsheet.lastUpdateTime, "%Y-%m-%dT%H:%M:%S.%fZ")
        sheet_last_updated = sheet_last_updated.replace(tzinfo=timezone.utc)
        # Get GCS file last modified time
        blob = self.bucket_client.blob(self.file_name)
        if not blob.exists():
            return True  # If file does not exist, return True to upload

        # Reload blob to get the updated timestamp
        blob.reload()
        blob_last_updated = blob.updated

        # Check Sheet was modified after file was last updated
        logging.info(f"Sheet modified: {sheet_last_updated} Blob modified: {blob_last_updated}")
        return sheet_last_updated > blob_last_updated if blob_last_updated else True

    def process_sheet(self):
        # Open Google Sheet
        logging.info(f"Processing {self.file_name}...")
        spreadsheet = self.sheet_client.open_by_key(self.sheet_id)
        if not self.sheet_updated(spreadsheet):
            logging.info(f"No updates found for {self.file_name}\n")
            return

        # Create DataFrame from Google Sheet
        df = pd.DataFrame(spreadsheet.sheet1.get_all_records())

        # Save df as file to temporary location
        temp_path = f"/tmp/{self.file_name}"
        if self.file_name.endswith('.csv'):
            df.to_csv(temp_path, index=False, header=True)
        elif self.file_name.endswith('.json'):
            df.to_json(temp_path, orient='records')  # write rows as a list of JSON records
        else:
            logging.error(f"Unsupported file format: {self.file_name}")
            return

        # Upload local file to GCS
        self.bucket_client.blob(self.file_name).upload_from_filename(temp_path)
        logging.info(f"Uploaded to GCS")


@app.route('/process_sheet', methods=['POST'])
def process_sheet():
    data = flask.request.json
    processor = GoogleSheetsImport(**data)
    processor.process_sheet()
    return "OK", 200

Deploying

In the example above, the code is ready to run as a Flask app, with an open API endpoint that processes a sheet given a few arguments (sheet_id, bucket_name, file_name) passed in as JSON. There are many other ways to implement this, including deploying it as a Cloud Function if you don’t want to host the app yourself. You could also keep a YAML or JSON file listing the sheets to check and trigger the job on a schedule. It’s totally up to you!
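For instance, with the app running locally (for example via flask run, which defaults to port 5000), a run can be triggered with a small script like the one below; the sheet ID, bucket, and file name are placeholders you would swap for your own:

import requests

payload = {
    "sheet_id": "1AbCdEfGhIjKlMnOpQrStUvWxYz1234567890",  # placeholder Sheet ID
    "bucket_name": "my-data-requests-bucket",             # placeholder bucket
    "file_name": "requested_ids.csv",                     # must end in .csv or .json
}

response = requests.post("http://localhost:5000/process_sheet", json=payload)
print(response.status_code, response.text)  # expect: 200 OK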
