Introduction
Prefect is an incredibly powerful Python-based data pipeline orchestration tool! Prefect works with many different integrations, and one of them is Google Cloud Platform (GCP). In this blog post, I will give a brief walkthrough of an ELT data pipeline that I built with Prefect and GCP, called Sky-Pipe. Additionally, I will walk through the DevOps portion of the pipeline.
*Note: If you are not familiar with Prefect, I recommend reviewing my previous article on the basics of Prefect located here.
Let’s dive in!
What is Sky-Pipe?
Sky-Pipe is a Prefect dataflow pipeline that integrates Google Cloud Platform and dbt. Specifically, Sky-Pipe fetches daily USD exchange price data from CoinMarketCap.com, loads this data into Google Cloud Platform, and transforms it with dbt. The pipeline also utilizes GitHub Actions for DevOps.
Technologies Used
Sky-Pipe utilizes Python 3.10 as the foundation for everything. Specifically, it uses Poetry as its package manager, along with the Requests, Pandas, Prefect, and Prefect-GCP external libraries.
For orchestration, Sky-Pipe uses Prefect. Learn more about the basics of Prefect here.
Sky-Pipe utilizes Docker images as the deployment foundation for everything; accordingly, it uses Artifact Registry to store the Docker images, Cloud Run Jobs to execute them, and Compute Engine to run the Prefect Agent.
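For the Cloud Run side of this, Prefect 2.x offers a CloudRunJob infrastructure block in the Prefect-GCP library. Here is a minimal, hedged sketch of how such a block might be created and saved to Prefect Cloud; the project ID, image path, and block names are placeholders, not necessarily what Sky-Pipe uses.

```python
# Sketch: register a Cloud Run Job infrastructure block in Prefect Cloud.
# All names below (credentials block, image path, block name) are illustrative.
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob

gcp_credentials = GcpCredentials.load("sky-pipe-gcp-creds")  # hypothetical block name

cloud_run_job = CloudRunJob(
    image="us-central1-docker.pkg.dev/my-project/sky-pipe/flows:latest",  # Artifact Registry image (placeholder)
    region="us-central1",
    credentials=gcp_credentials,
    cpu=1,
    memory=512,
    memory_unit="Mi",
)

# Persist the block so flow deployments can reference it later.
cloud_run_job.save("sky-pipe-cloud-run", overwrite=True)
```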
For data storage and transformations, Sky-Pipe uses a combination of dbt and BigQuery.
Fetching Data and Uploading to Google Cloud Storage
The “extract” step of the ELT pipeline consists of a Cloud Run Job fetching data from the CoinMarketCap.com API. Specifically, it fetches recent USD exchange prices for 8,000+ cryptocurrencies and tokens.
This step converts the fetched data into a Pandas DataFrame, which is then written to a local Parquet file. That Parquet file is then uploaded to a Google Cloud Storage bucket.
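Here is a minimal sketch of what this extract step can look like with Prefect, Requests, Pandas, and a Prefect-GCP GcsBucket block. The API endpoint, file paths, and block names are illustrative assumptions, not the exact Sky-Pipe code.

```python
# Hedged sketch of the "extract" step: fetch quotes, write Parquet, upload to GCS.
import pandas as pd
import requests
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket


@task
def fetch_latest_quotes(api_key: str) -> pd.DataFrame:
    """Fetch recent USD exchange prices from the CoinMarketCap API."""
    response = requests.get(
        "https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest",
        headers={"X-CMC_PRO_API_KEY": api_key},
        params={"convert": "USD", "limit": 5000},  # covering 8,000+ listings needs pagination via `start`
        timeout=30,
    )
    response.raise_for_status()
    return pd.json_normalize(response.json()["data"])


@flow
def extract_to_gcs(api_key: str) -> None:
    df = fetch_latest_quotes(api_key)

    # Write the DataFrame to a local Parquet file...
    local_path = "crypto_quotes.parquet"
    df.to_parquet(local_path)

    # ...then upload it to a Google Cloud Storage bucket block.
    gcs_bucket = GcsBucket.load("sky-pipe-bucket")  # hypothetical block name
    gcs_bucket.upload_from_path(local_path, to_path="raw/crypto_quotes.parquet")
```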
Landing Parquet File Data into BigQuery from Google Cloud Storage
Next is the “land” step of the ELT pipeline, which consists of a Google Cloud BigQuery Load Job. This Load Job is triggered from the Prefect-GCP Python library and lands/appends the raw data to a landing table in BigQuery.
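A hedged sketch of this land step is below. It uses the Prefect-GCP GcpCredentials block to obtain a BigQuery client and then submits a Load Job that appends the Parquet file to a landing table; the bucket path, dataset, and table names are placeholders.

```python
# Hedged sketch of the "land" step: load the Parquet file from GCS into BigQuery.
from google.cloud import bigquery
from prefect import flow
from prefect_gcp import GcpCredentials


@flow
def land_parquet_in_bigquery() -> None:
    gcp_credentials = GcpCredentials.load("sky-pipe-gcp-creds")  # hypothetical block name
    client = gcp_credentials.get_bigquery_client()

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # append to the landing table
    )

    load_job = client.load_table_from_uri(
        "gs://sky-pipe-bucket/raw/crypto_quotes.parquet",  # placeholder GCS URI
        "my-project.landing.crypto_quotes",  # placeholder destination table
        job_config=job_config,
    )
    load_job.result()  # wait for the Load Job to finish
```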
Transforming Data with BigQuery and dbt
The final step, “transform”, consists of a dbt job running BigQuery SQL queries and tests on the landed data. This results in a final, core table that holds the historical crypto USD exchange prices.
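One simple way to trigger dbt from a Prefect task is to shell out to the dbt CLI, as sketched below; the project and profiles paths are placeholders, and Sky-Pipe may invoke dbt differently.

```python
# Hedged sketch of the "transform" step: run dbt models and tests from a Prefect task.
import subprocess

from prefect import task


@task
def run_dbt_build() -> None:
    """Run the dbt models and tests against the landed BigQuery data."""
    subprocess.run(
        ["dbt", "build", "--project-dir", "dbt/sky_pipe", "--profiles-dir", "dbt"],  # placeholder paths
        check=True,  # fail the task if any model or test fails
    )
```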
Deployment of Prefect Agent and Prefect Flow(s)
GitHub Actions is responsible for the deployment of the Prefect Agent and the Prefect Flow(s).
The Prefect Agent runs on GCP Compute Engine. The workflow/actions used for this process perform the following:
- Build Docker Image for Prefect Agent
- Push Docker Image to Artifact Registry
- Delete Prefect Agent Compute Engine Instance, if it exists
- Create a new Prefect Agent Compute Engine instance from the Artifact Registry Docker image
The Prefect Flow(s) are executed through Google Cloud Run Jobs and the workflow/actions for this process perform the following:
- Build Docker Image for Prefect Flows
- Push Docker Image to Artifact Registry
- Create various Memory/CPU Blocks within Prefect Cloud
- Search for all flows within the src/sky-pipe/flows folder
- Deploy any and all flows to the Prefect Cloud workspace (a simplified sketch of this step follows the list)
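The sketch below shows roughly what that final deployment step can look like in Prefect 2.x: load a Cloud Run Job infrastructure block and register a deployment against a flow. The flow module, block name, and work queue name are illustrative assumptions, and the flow code is assumed to be baked into the Docker image referenced by the block.

```python
# Hedged sketch of deploying a flow to Prefect Cloud with Cloud Run infrastructure.
from prefect.deployments import Deployment
from prefect_gcp.cloud_run import CloudRunJob

from sky_pipe.flows.extract import extract_to_gcs  # hypothetical flow module


def deploy() -> None:
    # Load the infrastructure block created earlier (e.g. one of the memory/CPU variants).
    cloud_run_job = CloudRunJob.load("sky-pipe-cloud-run")

    deployment = Deployment.build_from_flow(
        flow=extract_to_gcs,
        name="extract-to-gcs",
        work_queue_name="sky-pipe",  # hypothetical work queue
        infrastructure=cloud_run_job,
    )
    deployment.apply()  # registers the deployment in the Prefect Cloud workspace


if __name__ == "__main__":
    deploy()
```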
Conclusion
This was a brief walkthrough of the Sky-Pipe ELT data pipeline that I built, which utilizes Google Cloud Platform, Prefect, and dbt. Specifically, this pipeline fetches crypto USD exchange prices, writes the data to a Parquet file, lands that file in Google Cloud Storage, loads it into BigQuery, and runs dbt transformations within BigQuery.
I hope you enjoyed this, learned something new, and are motivated to give Prefect-GCP a try!
Happy Coding!
*GitHub Project Repository is located here