Commit 81798c3
PR for capstone project - Komal Azram (#5)

* Komal's capstone project
* Refactored code
* Updated README
1 parent 5764f79 commit 81798c3

27 files changed (+1113, -0 lines)

komalazram/.flake8

Lines changed: 2 additions & 0 deletions
```ini
[flake8]
max-line-length = 150
```
Lines changed: 123 additions & 0 deletions
```yaml
name: ETL CI/CD

on:
  push:
    branches:
      - master
  pull_request:

jobs:
  ci:
    name: CI - Lint, Terraform Check, and Plan
    runs-on: ubuntu-latest
    environment: production

    steps:
      - uses: actions/checkout@v3

      - name: Set up Terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.4.6
          terraform_wrapper: false

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Set Terraform Environment Variables
        run: |
          echo "TF_VAR_project_id=${{ secrets.PROJECT_ID }}" >> $GITHUB_ENV
          echo "TF_VAR_bucket_names=${{ secrets.BUCKET_NAME }}" >> $GITHUB_ENV
          echo "TF_VAR_bucket_location=${{ secrets.BUCKET_LOCATION }}" >> $GITHUB_ENV
          echo "TF_VAR_citibike_composer_name=${{ secrets.citibike_composer_name }}" >> $GITHUB_ENV
          echo "TF_VAR_composer_region=${{ secrets.COMPOSER_REGION }}" >> $GITHUB_ENV
          echo "TF_VAR_composer_service_account=${{ secrets.COMPOSER_SERVICE_ACCOUNT }}" >> $GITHUB_ENV

      - name: Terraform Format Check
        run: |
          cd terraform
          terraform fmt -check -recursive

      - name: Terraform Init
        run: |
          cd terraform
          terraform init

      - name: Terraform Validate
        run: |
          cd terraform
          terraform validate

      - name: Terraform Plan
        run: |
          cd terraform
          terraform plan -input=false

      - name: Install Poetry
        uses: snok/install-poetry@v1

      - name: Install Dependencies
        run: poetry install

      - name: Python Lint
        run: poetry run flake8 .

  cd:
    name: CD - Deploy Infra and DAGs
    needs: ci
    if: github.ref == 'refs/heads/master'
    runs-on: ubuntu-latest
    environment: production

    steps:
      - uses: actions/checkout@v3

      - name: Set up Terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.4.6
          terraform_wrapper: false

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Set Terraform Environment Variables
        run: |
          echo "TF_VAR_project_id=${{ secrets.PROJECT_ID }}" >> $GITHUB_ENV
          echo "TF_VAR_bucket_names=${{ secrets.BUCKET_NAME }}" >> $GITHUB_ENV
          echo "TF_VAR_bucket_location=${{ secrets.BUCKET_LOCATION }}" >> $GITHUB_ENV
          echo "TF_VAR_citibike_composer_name=${{ secrets.citibike_composer_name }}" >> $GITHUB_ENV
          echo "TF_VAR_composer_region=${{ secrets.COMPOSER_REGION }}" >> $GITHUB_ENV
          echo "TF_VAR_composer_service_account=${{ secrets.COMPOSER_SERVICE_ACCOUNT }}" >> $GITHUB_ENV

      - name: Terraform Init
        run: |
          cd terraform
          terraform init

      - name: Terraform Apply
        run: |
          cd terraform
          terraform apply -auto-approve

      - name: Get Composer Bucket Name
        run: |
          cd terraform
          COMPOSER_BUCKET=$(terraform output -raw composer_bucket_name 2>/dev/null || echo "")
          echo "COMPOSER_BUCKET=$COMPOSER_BUCKET" >> $GITHUB_ENV

      - name: Deploy DAGs to Composer
        run: |
          gcloud storage cp --recursive dags gs://$COMPOSER_BUCKET/

      - name: Deploy Plugins to Composer
        run: |
          gcloud storage cp --recursive plugins gs://$COMPOSER_BUCKET/

      - name: Deploy Data to Composer
        run: |
          gcloud storage cp --recursive data gs://$COMPOSER_BUCKET/
```

komalazram/.gitignore

Lines changed: 38 additions & 0 deletions
```gitignore
# Local .terraform directories
.terraform/

# .tfstate files
*.tfstate
*.tfstate.*

# Crash log files
crash.log
crash.*.log

# Exclude all .tfvars files, which are likely to contain sensitive data, such as
# password, private keys, and other secrets. These should not be part of version
# control as they are data points which are potentially sensitive and subject
# to change depending on the environment.
#*.tfvars
#*.tfvars.json

# Ignore override files as they are usually used to override resources locally and so
# are not checked in
override.tf
override.tf.json
*_override.tf
*_override.tf.json

# Ignore transient lock info files created by terraform apply
.terraform.tfstate.lock.info

# Include override files you do wish to add to version control using negated pattern
# !example_override.tf

# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan
# example: *tfplan*

# Ignore CLI configuration files
.terraformrc
terraform.rc
terraform.tfvars
```

komalazram/README.md

Lines changed: 70 additions & 0 deletions
# Citi Bike ETL Workflow for Analytics-Ready Data

This project builds an ETL pipeline that consumes Citi Bike’s open API data and stores it in a well-structured gold-level table. The cleaned and unified data is designed to support future analytics and decision-making, such as:

- Identifying stations that are frequently full or empty.
- Improving resource allocation for bike availability.
- Analyzing station-level usage patterns.

---

## Architecture

![Architecture Diagram](architecture.png)

The pipeline takes a weekly snapshot of station status and keeps the warehouse up to date. It follows the **Bronze → Silver → Gold** data architecture pattern:

### Bronze Layer
- Extracts and loads raw Citi Bike data from the API (see the sketch below).
- Stores raw JSON files in Google Cloud Storage (GCS).
- Performs JSON schema validation.
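The ingestion callable that the DAG imports (`fetch_and_upload` from `citi_bike_scrapper_bronze`, under `plugins/`) is not included in this diff. Below is a minimal sketch of what such a Bronze step could look like, assuming the public CityBikes v2 endpoint for the Citi Bike network and the `requests` and `google-cloud-storage` packages; the endpoint, object naming, and function body are illustrative, not the author's implementation.

```python
# Hypothetical sketch of a Bronze-layer ingestion step; the real
# fetch_and_upload lives in plugins/ and may differ.
import json
from datetime import datetime, timezone

import requests
from google.cloud import storage

# Assumed CityBikes v2 endpoint for the Citi Bike NYC network.
CITYBIKES_URL = "http://api.citybik.es/v2/networks/citi-bike-nyc"


def fetch_and_upload(bucket_name: str) -> str:
    """Fetch the current station snapshot and store it as raw JSON in GCS."""
    response = requests.get(CITYBIKES_URL, timeout=30)
    response.raise_for_status()
    payload = response.json()

    # Timestamped object name so each weekly snapshot is kept separately.
    snapshot = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    blob_name = f"citi-bike/raw_{snapshot}.json"

    storage.Client().bucket(bucket_name).blob(blob_name).upload_from_string(
        json.dumps(payload), content_type="application/json"
    )
    return blob_name
```

In the DAG further down, the target bucket is injected through `op_kwargs={"bucket_name": "{{ params.bronze_bucket }}"}`.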
### Silver Layer
- Cleans and flattens the raw JSON using Python, handling timestamp formats, missing values, and data type conversions (see the sketch below).
- Writes the cleaned data to a GCS Silver bucket and loads it into a BigQuery staging table.
- Uses a MERGE operation to update the master BigQuery table with only new or changed records from the staging table.
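`raw_transformation` (from `data_transformation_silver`, under `plugins/`) is likewise not part of this diff. Here is a rough sketch under the assumption that it reads the latest Bronze JSON, flattens one row per station with pandas, and writes Parquet to the Silver bucket for `GCSToBigQueryOperator` to pick up; writing `gs://` paths directly from pandas assumes `gcsfs` is installed, and the column handling is illustrative rather than the author's code.

```python
# Hypothetical sketch of the Silver-layer transformation; the real
# raw_transformation in plugins/ may differ.
import json
from datetime import datetime, timezone

import pandas as pd
from google.cloud import storage

BRONZE_BUCKET = "bronze113"  # assumed to match the DAG's default params


def raw_transformation(bucket_name: str) -> str:
    """Flatten the newest raw snapshot and write it as Parquet to the Silver bucket."""
    client = storage.Client()

    # Pick the most recently created raw JSON object from the Bronze bucket.
    blobs = sorted(
        client.list_blobs(BRONZE_BUCKET, prefix="citi-bike/"),
        key=lambda blob: blob.time_created,
    )
    raw = json.loads(blobs[-1].download_as_text())

    # One row per station; normalise timestamps, missing values, and types.
    stations = pd.json_normalize(raw["network"]["stations"])
    stations["timestamp"] = pd.to_datetime(stations["timestamp"], utc=True, errors="coerce")
    stations["free_bikes"] = stations["free_bikes"].fillna(0).astype("int64")
    stations["empty_slots"] = stations["empty_slots"].fillna(0).astype("int64")
    stations["snapshot_time"] = datetime.now(timezone.utc)

    # The DAG's GCSToBigQueryOperator then loads citi-bike/*.parquet into staging.
    out_path = f"gs://{bucket_name}/citi-bike/stations_{datetime.now(timezone.utc):%Y%m%dT%H%M%SZ}.parquet"
    stations.to_parquet(out_path, index=False)
    return out_path
```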
### Gold Layer
- Aggregates and curates data for reporting and dashboarding.

---

## Technologies Used

| Tool                 | Purpose                                      |
|----------------------|----------------------------------------------|
| Python               | Data extraction, validation, transformation  |
| Google Cloud Storage | Stores raw and intermediate data             |
| BigQuery             | Data warehousing and analytics               |
| Airflow/Composer     | Orchestrates the data pipeline               |
| Terraform            | Infrastructure as Code (IaC)                 |
| GitHub Actions       | CI/CD for deployment workflows               |

---

## Folder Structure

```bash
.
├── dags/         # Airflow DAGs
├── plugins/      # Python scripts for ETL
├── sql/          # Schema definition files for BigQuery tables
├── terraform/    # Infrastructure configuration
├── data/         # SQL for Silver (staging) and Gold (aggregation) layers
├── README.md     # Project documentation
```

## Setup Instructions

1. Clone the repository.
2. Add your GCP service account JSON key as a secret in your GitHub repository (`GCP_SA_KEY`).
3. Install dependencies using Poetry:
   ```
   poetry install
   ```
4. Modify the variable values in the Terraform files to match your GCP project, region, and desired bucket names.
5. Update the environment secrets in GitHub.
6. Pass bucket names and other runtime parameters to the DAGs.
7. Push the code to GitHub; a push to the master branch triggers the CI/CD workflow (a local DAG-parse check is sketched below).
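Before pushing, it can help to confirm that the DAG file parses. This check is not part of the repository; it assumes Airflow is installed locally (e.g. via `poetry install`) and that `plugins/` must be put on the import path so the plugin modules resolve.

```python
# Local sanity check (assumed workflow, not part of the repo): verify the DAG
# parses and list its tasks before pushing to trigger CI/CD.
import sys

sys.path.insert(0, "plugins")  # so citi_bike_scrapper_bronze etc. resolve

from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
dag = dag_bag.get_dag("citi_bike_bronze_ingestion")
print([task.task_id for task in dag.tasks])
```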

komalazram/architecture.png

114 KB

komalazram/dags/datalake_dags.py

Lines changed: 89 additions & 0 deletions
```python
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


from citi_bike_scrapper_bronze import fetch_and_upload
from data_transformation_silver import raw_transformation


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "retries": 2,
}

with DAG(
    dag_id="citi_bike_bronze_ingestion",
    default_args=default_args,
    schedule_interval="0 10 * * 1",  # Every Monday 10:00 AM
    catchup=False,
    template_searchpath="/home/airflow/gcs/data/",
    tags=["citi_bike"],
    params={"bronze_bucket": "bronze113", "silver_bucket": "silver113"},
) as dag:

    ingest_task = PythonOperator(
        task_id="ingest_to_bronze",
        python_callable=fetch_and_upload,
        op_kwargs={"bucket_name": "{{ params.bronze_bucket }}"},
    )

    transform_task = PythonOperator(
        task_id="transform_to_silver",
        python_callable=raw_transformation,
        op_kwargs={"bucket_name": "{{ params.silver_bucket }}"},
    )

    load_parquet_to_staging = GCSToBigQueryOperator(
        task_id="load_parquet_to_staging",
        bucket="silver113",
        source_objects=["citi-bike/*.parquet"],
        destination_project_dataset_table="citi-bike-459310.lake_silver._staging_master_bike_station_status",
        source_format="PARQUET",
        write_disposition="WRITE_APPEND",
        create_disposition="CREATE_NEVER",
        autodetect=True,
        ignore_unknown_values=True,
        project_id="citi-bike-459310",
    )

    clear_silver_task = GCSDeleteObjectsOperator(
        task_id="clear_silver_folder",
        bucket_name="silver113",
        prefix="citi-bike/",
    )

    merge_staging_to_master = BigQueryInsertJobOperator(
        task_id="merge_staging_to_master",
        configuration={
            "query": {
                "query": "{% include 'merge_staging_to_master.sql' %}",
                "useLegacySql": False,
            }
        },
    )

    populate_gold_table = BigQueryInsertJobOperator(
        task_id="populate_gold_station_utilization_weekly",
        configuration={
            "query": {
                "query": "{% include 'populate_gold_station_utilization_weekly.sql' %}",
                "useLegacySql": False,
            }
        },
    )

    (
        ingest_task
        >> transform_task
        >> load_parquet_to_staging
        >> clear_silver_task
        >> merge_staging_to_master
        >> populate_gold_table
    )
```
Lines changed: 9 additions & 0 deletions
```sql
CREATE TABLE IF NOT EXISTS lake_gold.gold_station_utilization_weekly (
  station_id STRING,
  network_id STRING,
  week_start DATE,
  is_frequently_full BOOL,
  is_frequently_empty BOOL,
  avg_free_bikes FLOAT64,
  avg_empty_slots FLOAT64
);
```
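The query behind the DAG's `populate_gold_station_utilization_weekly` task is included from the `data/` folder and is not shown in this diff. Below is a hedged sketch of a weekly aggregation that would fill the table defined above, run here through the BigQuery Python client purely for illustration (the project runs it via `BigQueryInsertJobOperator`); the 20% full/empty thresholds and the seven-day window are assumptions, not the author's logic.

```python
# Hypothetical sketch of populate_gold_station_utilization_weekly.sql.
from google.cloud import bigquery

POPULATE_GOLD_SQL = """
INSERT INTO `lake_gold.gold_station_utilization_weekly`
  (station_id, network_id, week_start,
   is_frequently_full, is_frequently_empty, avg_free_bikes, avg_empty_slots)
SELECT
  station_id,
  network_id,
  DATE_TRUNC(DATE(timestamp), WEEK(MONDAY)) AS week_start,
  AVG(IF(empty_slots = 0, 1, 0)) >= 0.2 AS is_frequently_full,   -- assumed threshold
  AVG(IF(free_bikes = 0, 1, 0)) >= 0.2 AS is_frequently_empty,   -- assumed threshold
  AVG(free_bikes) AS avg_free_bikes,
  AVG(empty_slots) AS avg_empty_slots
FROM `citi-bike-459310.lake_silver.master_bike_station_status`
WHERE DATE(timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)  -- assumed window
GROUP BY station_id, network_id, week_start
"""

if __name__ == "__main__":
    bigquery.Client(project="citi-bike-459310").query(POPULATE_GOLD_SQL).result()
```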
Lines changed: 18 additions & 0 deletions
```sql
CREATE TABLE IF NOT EXISTS `citi-bike-459310.lake_silver._staging_master_bike_station_status` (
  network_id STRING,
  network_name STRING,
  station_id STRING,
  latitude FLOAT64,
  longitude FLOAT64,
  timestamp TIMESTAMP,
  free_bikes INT64,
  empty_slots INT64,
  extra_uid STRING,
  renting BOOL,
  returning BOOL,
  has_ebikes BOOL,
  ebikes INT64,
  snapshot_time TIMESTAMP
)
PARTITION BY DATE(timestamp)
CLUSTER BY station_id;
```
Lines changed: 17 additions & 0 deletions
```sql
CREATE TABLE IF NOT EXISTS `citi-bike-459310.lake_silver.master_bike_station_status` (
  network_id STRING,
  network_name STRING,
  station_id STRING,
  latitude FLOAT64,
  longitude FLOAT64,
  timestamp TIMESTAMP,
  free_bikes INT64,
  empty_slots INT64,
  extra_uid STRING,
  renting BOOL,
  returning BOOL,
  has_ebikes BOOL,
  ebikes INT64
)
PARTITION BY DATE(timestamp)
CLUSTER BY station_id;
```
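`merge_staging_to_master.sql`, which the DAG includes from the `data/` folder, is also absent from this diff. The following is a sketch of the kind of MERGE it might perform between the two tables defined above; the join keys (`station_id`, `timestamp`), the deduplication on `snapshot_time`, and the choice of updated columns are assumptions rather than the author's SQL.

```python
# Hypothetical sketch of merge_staging_to_master.sql; keys and update
# rules are assumptions, not the author's SQL.
from google.cloud import bigquery

MERGE_SQL = """
MERGE `citi-bike-459310.lake_silver.master_bike_station_status` AS master
USING (
  -- Keep one row per (station_id, timestamp), preferring the latest snapshot.
  SELECT * EXCEPT (row_num)
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (
             PARTITION BY station_id, timestamp
             ORDER BY snapshot_time DESC
           ) AS row_num
    FROM `citi-bike-459310.lake_silver._staging_master_bike_station_status`
  )
  WHERE row_num = 1
) AS staging
ON master.station_id = staging.station_id
   AND master.timestamp = staging.timestamp
WHEN MATCHED THEN
  UPDATE SET
    free_bikes = staging.free_bikes,
    empty_slots = staging.empty_slots,
    renting = staging.renting,
    returning = staging.returning,
    has_ebikes = staging.has_ebikes,
    ebikes = staging.ebikes
WHEN NOT MATCHED THEN
  INSERT (network_id, network_name, station_id, latitude, longitude, timestamp,
          free_bikes, empty_slots, extra_uid, renting, returning, has_ebikes, ebikes)
  VALUES (staging.network_id, staging.network_name, staging.station_id,
          staging.latitude, staging.longitude, staging.timestamp,
          staging.free_bikes, staging.empty_slots, staging.extra_uid,
          staging.renting, staging.returning, staging.has_ebikes, staging.ebikes)
"""

if __name__ == "__main__":
    # In the project this SQL runs inside BigQueryInsertJobOperator via {% include %}.
    bigquery.Client(project="citi-bike-459310").query(MERGE_SQL).result()
```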
