Thành viên nhóm:
- Phạm Thị Anh Thư - 23643081
- Trần Nhật Tiến - 23673681
One of the main obstacles of Data Engineering is the large and varied technical skills that can be required on a day-to-day basis.
*** Note - If you email a link to your GitHub repo with all the completed exercises, I will send you back a free copy of my ebook Introduction to Data Engineering. ***
This aim of this repository is to help you develop and learn those skills. Generally, here are the high level topics that these practice problems will cover.
- Python data processing.
- csv, flat-file, parquet, json, etc.
- SQL database table design.
- Python + Postgres, data ingestion and retrieval.
- PySpark
- Data cleansing / dirty data.
You will need two things to work effectively on most all of these problems.
Docker
docker-compose
All the tools and technologies you need will be packaged
into the dockerfile
for each exercise.
For each exercise you will need to cd
into that folder and
run the docker build
command, that command will be listed in
the README
for each exercise, follow those instructions.
The first exercise tests your ability to download a number of files
from an HTTP
source and unzip them, storing them locally with Python
.
cd Exercises/Exercise-1
and see README
in that location for instructions.
Tiến hành chạy lệnh cd data-engineering-practice/Exercises/Exercise-1 để thay đổi đường dẫn thư mục Exercise-1
Tiếp tục thực hiện lệnh: docker build --tag=exercise-1 . để build Docker image Quá trình sẽ mất vài phút
import requests
import os
import zipfile
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from pathlib import Path
import unittest
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
download_uris = [
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q1.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q2.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q3.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q4.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip",
"https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2220_Q1.zip",
]
DOWNLOAD_DIR = "downloads"
def create_download_directory():
"""Create downloads directory if it doesn't exist."""
Path(DOWNLOAD_DIR).mkdir(exist_ok=True)
def get_filename_from_uri(uri):
"""Extract filename from URI."""
return uri.split('/')[-1]
def extract_zip(zip_path):
"""Extract CSV from zip file and remove the zip."""
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(DOWNLOAD_DIR)
os.remove(zip_path)
logger.info(f"Extracted and removed {zip_path}")
except zipfile.BadZipFile:
logger.error(f"Invalid zip file: {zip_path}")
os.remove(zip_path)
def download_file(uri):
"""Download a single file synchronously."""
filename = get_filename_from_uri(uri)
file_path = os.path.join(DOWNLOAD_DIR, filename)
try:
response = requests.get(uri, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Downloaded {filename}")
extract_zip(file_path)
else:
logger.error(f"Failed to download {uri}: Status {response.status_code}")
except requests.RequestException as e:
logger.error(f"Error downloading {uri}: {str(e)}")
async def download_file_async(session, uri):
"""Download a single file asynchronously."""
filename = get_filename_from_uri(uri)
file_path = os.path.join(DOWNLOAD_DIR, filename)
try:
async with session.get(uri) as response:
if response.status == 200:
with open(file_path, 'wb') as f:
f.write(await response.read())
logger.info(f"Downloaded {filename} (async)")
extract_zip(file_path)
else:
logger.error(f"Failed to download {uri}: Status {response.status}")
except aiohttp.ClientError as e:
logger.error(f"Error downloading {uri}: {str(e)}")
async def download_files_async(uris):
"""Download files asynchronously using aiohttp."""
async with aiohttp.ClientSession() as session:
tasks = [download_file_async(session, uri) for uri in uris]
await asyncio.gather(*tasks)
def download_files_threaded(uris):
"""Download files using ThreadPoolExecutor."""
with ThreadPoolExecutor(max_workers=4) as executor:
executor.map(download_file, uris)
def main():
"""Main function to execute the download process."""
create_download_directory()
# Synchronous download
logger.info("Starting synchronous downloads")
for uri in download_uris:
download_file(uri)
# Threaded download
logger.info("Starting threaded downloads")
download_files_threaded(download_uris)
# Async download
logger.info("Starting async downloads")
asyncio.run(download_files_async(download_uris))
class TestDownloadFunctions(unittest.TestCase):
def setUp(self):
create_download_directory()
def test_get_filename_from_uri(self):
uri = "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip"
self.assertEqual(get_filename_from_uri(uri), "Divvy_Trips_2018_Q4.zip")
def test_create_download_directory(self):
self.assertTrue(os.path.exists(DOWNLOAD_DIR))
def test_invalid_uri(self):
uri = "https://invalid-url/does_not_exist.zip"
download_file(uri)
filename = get_filename_from_uri(uri)
self.assertFalse(os.path.exists(os.path.join(DOWNLOAD_DIR, filename)))
if __name__ == "__main__":
main()
Đoạn code trên thực hiện các tác vụ:
-
Tạo thư mục downloads nếu chưa tồn tại
-
Tải từng file từ danh sách download_uris
-
Giữ tên gốc của file từ URL
-
Giải nén .zip thành .csv
-
Xóa file .zip sau khi giải nén
-
Bỏ qua URL không hợp lệ (ví dụ: cái Divvy_Trips_2220_Q1.zip không tồn tại)
The second exercise
tests your ability perform web scraping, build uris, download files, and use Pandas to
do some simple cumulative actions.
cd Exercises/Exercise-2
and see README
in that location for instructions.
Sau khi build xong, truy cập file main.py
bằng VS code
The third exercise tests a few skills.
This time we will be using a popular aws
package called boto3
to try to perform a multi-step
actions to download some open source s3
data files.
cd Exercises/Exercise-3
and see README
in that location for instructions.
Thay đổi đường dẫn thư mục tại CMD thành Exercise-3
Chạy lệnh docker build --tag=exercise-3 .
để build image Docker (Quá trình diễn ra trong 2 – 3 phút)
Sau khi build xong, truy cập file main.py
bằng VS code
The fourth exercise
focuses more file types json
and csv
, and working with them in Python
.
You will have to traverse a ragged directory structure, finding any json
files
and converting them to csv
.
The fifth exercise
is going to be a little different than the rest. In this problem you will be given a number of
csv
files. You must create a data model / schema to hold these data sets, including indexes,
then create all the tables inside Postgres
by connecting to the database with Python
.
The sixth exercise
Is going to step it up a little and move onto more popular tools. In this exercise we are going
to load some files using PySpark
and then be asked to do some basic aggregation.
Best of luck!
The seventh exercise
Taking a page out of the previous exercise, this one is focus on using a few of the
more common build in PySpark functions pyspark.sql.functions
and applying their
usage to real-life problems.
Many times to solve simple problems we have to find and use multiple functions available
from libraries. This will test your ability to do that.
The eighth exercise Using new tools is imperative to growing as a Data Engineer. DuckDB is one of those new tools. In this exercise you will have to complete a number of analytical and transformation tasks using DuckDB. This will require an understanding of the functions and documenation of DuckDB.
The ninth exercise Polars is a new Rust based tool with a wonderful Python package that has taken Data Engineering by storm. It's better than Pandas because it has both SQL Context and supports Lazy evalutation for larger than memory data sets! Show your Lazy skills!
The tenth exercise This exercise is to help you learn Data Quality, specifically a tool called Great Expectations. You will be given an existing datasets in CSV format, as well as an existing pipeline. There is a data quality issue and you will be asked to implement some Data Quality checks to catch some of these issues.