ptathuw231/data-engineering-practice

Team members:

  • Phạm Thị Anh Thư - 23643081
  • Trần Nhật Tiến - 23673681

Data Engineering Practice Problems

One of the main obstacles in Data Engineering is the large and varied set of technical skills that can be required on a day-to-day basis.

*** Note - If you email a link to your GitHub repo with all the completed exercises, I will send you back a free copy of my ebook Introduction to Data Engineering. ***

The aim of this repository is to help you develop and learn those skills. Generally, these are the high-level topics that the practice problems cover.

  • Python data processing.
  • csv, flat-file, parquet, json, etc.
  • SQL database table design.
  • Python + Postgres, data ingestion and retrieval.
  • PySpark
  • Data cleansing / dirty data.

How to work on the problems.

You will need two things to work effectively on almost all of these problems.

  • Docker
  • docker-compose

All the tools and technologies you need will be packaged into the Dockerfile for each exercise.

For each exercise, you will need to cd into that folder and run the docker build command; that command is listed in the README for each exercise, so follow those instructions.

Lab 8


Beginner Exercises

Exercise 1 - Downloading files.

The first exercise tests your ability to download a number of files from an HTTP source and unzip them, storing them locally with Python. cd Exercises/Exercise-1 and see README in that location for instructions.

Run cd data-engineering-practice/Exercises/Exercise-1 to change into the Exercise-1 directory. Then run docker build --tag=exercise-1 . to build the Docker image; the process takes a few minutes.

Code used for main.py:
import requests
import os
import zipfile
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from pathlib import Path
import unittest

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

download_uris = [
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q1.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q2.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q3.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip",
    # This last URI is intentionally invalid (year 2220) to exercise error handling.
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2220_Q1.zip",
]

DOWNLOAD_DIR = "downloads"

def create_download_directory():
    """Create downloads directory if it doesn't exist."""
    Path(DOWNLOAD_DIR).mkdir(exist_ok=True)

def get_filename_from_uri(uri):
    """Extract filename from URI."""
    return uri.split('/')[-1]

def extract_zip(zip_path):
    """Extract CSV from zip file and remove the zip."""
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(DOWNLOAD_DIR)
        os.remove(zip_path)
        logger.info(f"Extracted and removed {zip_path}")
    except zipfile.BadZipFile:
        logger.error(f"Invalid zip file: {zip_path}")
        os.remove(zip_path)

def download_file(uri):
    """Download a single file synchronously."""
    filename = get_filename_from_uri(uri)
    file_path = os.path.join(DOWNLOAD_DIR, filename)
    
    try:
        response = requests.get(uri, stream=True, timeout=60)  # timeout avoids hanging on dead hosts
        if response.status_code == 200:
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            logger.info(f"Downloaded {filename}")
            extract_zip(file_path)
        else:
            logger.error(f"Failed to download {uri}: Status {response.status_code}")
    except requests.RequestException as e:
        logger.error(f"Error downloading {uri}: {str(e)}")

async def download_file_async(session, uri):
    """Download a single file asynchronously."""
    filename = get_filename_from_uri(uri)
    file_path = os.path.join(DOWNLOAD_DIR, filename)
    
    try:
        async with session.get(uri) as response:
            if response.status == 200:
                with open(file_path, 'wb') as f:
                    f.write(await response.read())
                logger.info(f"Downloaded {filename} (async)")
                extract_zip(file_path)
            else:
                logger.error(f"Failed to download {uri}: Status {response.status}")
    except aiohttp.ClientError as e:
        logger.error(f"Error downloading {uri}: {str(e)}")

async def download_files_async(uris):
    """Download files asynchronously using aiohttp."""
    async with aiohttp.ClientSession() as session:
        tasks = [download_file_async(session, uri) for uri in uris]
        await asyncio.gather(*tasks)

def download_files_threaded(uris):
    """Download files using ThreadPoolExecutor."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(download_file, uris)

def main():
    """Run the download three ways (sync, threaded, async) for comparison.

    Note: each strategy re-downloads the same files, overwriting the previous
    results; in practice you would pick one approach.
    """
    create_download_directory()

    # Synchronous download: one file at a time
    logger.info("Starting synchronous downloads")
    for uri in download_uris:
        download_file(uri)

    # Threaded download: up to 4 files concurrently via ThreadPoolExecutor
    logger.info("Starting threaded downloads")
    download_files_threaded(download_uris)

    # Async download: all files concurrently via aiohttp
    logger.info("Starting async downloads")
    asyncio.run(download_files_async(download_uris))

class TestDownloadFunctions(unittest.TestCase):
    def setUp(self):
        create_download_directory()
    
    def test_get_filename_from_uri(self):
        uri = "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip"
        self.assertEqual(get_filename_from_uri(uri), "Divvy_Trips_2018_Q4.zip")
    
    def test_create_download_directory(self):
        self.assertTrue(os.path.exists(DOWNLOAD_DIR))
    
    def test_invalid_uri(self):
        uri = "https://invalid-url/does_not_exist.zip"
        download_file(uri)
        filename = get_filename_from_uri(uri)
        self.assertFalse(os.path.exists(os.path.join(DOWNLOAD_DIR, filename)))

if __name__ == "__main__":
    main()
    # To run the unit tests instead: python -m unittest main

The code above performs the following tasks:

  • Creates the downloads directory if it does not already exist

  • Downloads each file from the download_uris list

  • Keeps the original file name from the URL

  • Extracts each .zip into a .csv

  • Deletes the .zip file after extraction

  • Skips invalid URLs (for example, the nonexistent Divvy_Trips_2220_Q1.zip)


Exercise 2 - Web Scraping + Downloading + Pandas

The second exercise tests your ability to perform web scraping, build URIs, download files, and use Pandas to do some simple cumulative actions. cd Exercises/Exercise-2 and see README in that location for instructions.


After the build finishes, open main.py in VS Code.

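A minimal sketch of the pattern this exercise asks for, assuming the files are linked from a plain HTML directory listing; the URL and column name below are placeholders, not the real exercise inputs:

import io

import pandas as pd
import requests
from bs4 import BeautifulSoup

# Placeholder listing page; the real one is given in the Exercise-2 README.
BASE_URL = "https://example.com/data/"

def find_csv_links(base_url):
    """Scrape an HTML directory listing and build absolute file URIs."""
    html = requests.get(base_url, timeout=60).text
    soup = BeautifulSoup(html, "html.parser")
    return [
        base_url + a["href"]
        for a in soup.find_all("a", href=True)
        if a["href"].endswith(".csv")
    ]

def load_and_aggregate(uri, column="value"):
    """Download one CSV into Pandas and run a simple cumulative action."""
    response = requests.get(uri, timeout=60)
    df = pd.read_csv(io.StringIO(response.text))
    df["running_total"] = df[column].cumsum()  # cumulative sum over the column
    return df

if __name__ == "__main__":
    for uri in find_csv_links(BASE_URL):
        print(load_and_aggregate(uri).head())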

Exercise 3 - Boto3 AWS + s3 + Python.

The third exercise tests a few skills. This time we will be using a popular AWS package called boto3 to perform multi-step actions to download some open-source S3 data files. cd Exercises/Exercise-3 and see README in that location for instructions.

Change the CMD working directory to Exercise-3, then run docker build --tag=exercise-3 . to build the Docker image (the process takes 2–3 minutes).


After the build finishes, open main.py in VS Code.
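
The heart of this exercise is reading from a public S3 bucket without credentials. A minimal sketch, with the bucket and key as placeholders for the ones named in the Exercise-3 README:

import gzip

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Placeholder bucket/key; use the ones from the Exercise-3 README.
BUCKET = "some-open-data-bucket"
KEY = "path/to/manifest.gz"

def main():
    # An UNSIGNED config lets boto3 read public buckets without AWS credentials.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    s3.download_file(BUCKET, KEY, "manifest.gz")

    # Multi-step part: the downloaded file points at the next object to fetch.
    with gzip.open("manifest.gz", "rt") as f:
        next_key = f.readline().strip()
    s3.download_file(BUCKET, next_key, next_key.split("/")[-1])

if __name__ == "__main__":
    main()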

Exercise 4 - Convert JSON to CSV + Ragged Directories.

The fourth exercise focuses on more file types, json and csv, and on working with them in Python. You will have to traverse a ragged directory structure, finding any json files and converting them to csv.
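
A minimal sketch of the traversal-and-convert pattern; the data directory, the nesting, and the flattening rules are assumptions to adapt to the actual files:

import csv
import json
from pathlib import Path

def flatten(record, parent_key=""):
    """Flatten nested JSON objects into dot-separated CSV columns."""
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key))
        else:
            flat[new_key] = value
    return flat

def json_to_csv(json_path):
    """Convert one JSON file to a CSV file next to it."""
    with open(json_path) as f:
        data = json.load(f)
    records = data if isinstance(data, list) else [data]
    rows = [flatten(r) for r in records]
    fieldnames = sorted({key for row in rows for key in row})
    with open(json_path.with_suffix(".csv"), "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

if __name__ == "__main__":
    # rglob walks the ragged directory tree to any depth.
    for path in Path("data").rglob("*.json"):
        json_to_csv(path)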

Exercise 5 - Data Modeling for Postgres + Python.

The fifth exercise is going to be a little different from the rest. In this problem you will be given a number of csv files. You must create a data model / schema to hold these data sets, including indexes, then create all the tables inside Postgres by connecting to the database with Python.
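
A minimal sketch of the connect / create / ingest loop, assuming psycopg2 and the Postgres service from the exercise's docker-compose file; the table, columns, and credentials below are illustrative, and the real schema must be derived from the given CSVs:

import csv

import psycopg2

# Connection details are assumptions; match them to the exercise's
# docker-compose Postgres service.
conn = psycopg2.connect(
    host="postgres", dbname="postgres", user="postgres", password="postgres"
)
cur = conn.cursor()

# One table of the data model, with an index on a likely lookup column.
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS accounts (
        customer_id INTEGER PRIMARY KEY,
        first_name  TEXT NOT NULL,
        last_name   TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_accounts_last_name
        ON accounts (last_name);
    """
)

# Ingest a CSV row by row (hypothetical file and columns).
with open("data/accounts.csv") as f:
    for row in csv.DictReader(f):
        cur.execute(
            "INSERT INTO accounts (customer_id, first_name, last_name) "
            "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
            (row["customer_id"], row["first_name"], row["last_name"]),
        )

conn.commit()
cur.close()
conn.close()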

Intermediate Exercises

Exercise 6 - Ingestion and Aggregation with PySpark.

The sixth exercise steps it up a little and moves on to more popular tools. In this exercise we are going to load some files using PySpark and then do some basic aggregation. Best of luck!
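
A minimal sketch of the load-then-aggregate shape, assuming CSV inputs; the column names are guesses to replace with the real ones:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Exercise6").getOrCreate()

# Spark reads a whole folder of CSVs in one call.
df = spark.read.csv("data/*.csv", header=True, inferSchema=True)

# Basic aggregation: trip counts and average duration per station (assumed columns).
result = (
    df.groupBy("start_station_name")
      .agg(
          F.count("*").alias("trips"),
          F.avg("tripduration").alias("avg_duration"),
      )
      .orderBy(F.desc("trips"))
)
result.show(10)

# Writing a single-file CSV report is a common ask in this kind of exercise.
result.coalesce(1).write.mode("overwrite").option("header", True).csv("reports/station_stats")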

Exercise 7 - Using Various PySpark Functions

The seventh exercise takes a page out of the previous one: it focuses on using a few of the more common built-in PySpark functions from pyspark.sql.functions and applying them to real-life problems.

Many times, to solve simple problems, we have to find and combine multiple functions available from libraries. This will test your ability to do that.
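
A minimal sketch chaining several pyspark.sql.functions built-ins; the column names and the date-in-filename convention are assumptions:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.appName("Exercise7").getOrCreate()
df = spark.read.csv("data/*.csv", header=True, inferSchema=True)

df = (
    df.withColumn("source_file", F.input_file_name())  # record provenance
      .withColumn(
          "file_date",
          F.to_date(F.regexp_extract("source_file", r"(\d{4}-\d{2}-\d{2})", 1)),
      )
      .withColumn("brand", F.split(F.col("model"), " ").getItem(0))
)

# Window function: rank rows within each brand by capacity.
w = Window.partitionBy("brand").orderBy(F.desc("capacity_bytes"))
df = df.withColumn("capacity_rank", F.dense_rank().over(w))
df.show(5)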

Exercise 8 - Using DuckDB for Analytics and Transforms.

The eighth exercise. Using new tools is imperative to growing as a Data Engineer, and DuckDB is one of those new tools. In this exercise you will have to complete a number of analytical and transformation tasks using DuckDB. This will require an understanding of DuckDB's functions and documentation.
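
A minimal sketch of the DuckDB workflow; the file name and columns are placeholders:

import duckdb

con = duckdb.connect()  # in-memory database

# DuckDB queries CSV files directly, no separate load step required.
con.execute("CREATE TABLE sales AS SELECT * FROM read_csv_auto('data/sales.csv');")

# Analytical task: top regions by revenue, straight into a Pandas DataFrame.
top = con.execute(
    """
    SELECT region, SUM(amount) AS revenue
    FROM sales
    GROUP BY region
    ORDER BY revenue DESC
    LIMIT 5;
    """
).fetchdf()
print(top)

# Transform task: persist the table as Parquet.
con.execute("COPY sales TO 'sales.parquet' (FORMAT PARQUET);")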

Exercise 9 - Using Polars lazy computation.

The ninth exercise. Polars is a new Rust-based tool with a wonderful Python package that has taken Data Engineering by storm. It improves on Pandas by offering both a SQL context and lazy evaluation for larger-than-memory data sets. Show your lazy skills!
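
A minimal sketch of both headline features, lazy evaluation and the SQL context, assuming a recent Polars version; file and column names are placeholders:

import polars as pl

# scan_csv builds a LazyFrame: nothing is read until .collect(), so the
# pipeline below can handle larger-than-memory inputs.
lazy = (
    pl.scan_csv("data/*.csv")
      .filter(pl.col("amount") > 0)
      .group_by("customer_id")
      .agg(pl.col("amount").sum().alias("total_spend"))
      .sort("total_spend", descending=True)
)

# Only now does Polars plan, optimize, and execute the whole query.
print(lazy.collect().head(10))

# The SQL context runs SQL directly against registered (lazy) frames.
ctx = pl.SQLContext(frames={"orders": pl.scan_csv("data/orders.csv")})
print(ctx.execute("SELECT COUNT(*) AS n FROM orders").collect())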

Advanced Exercises

Exercise 10 - Data Quality with Great Expectations

The tenth exercise is to help you learn Data Quality, specifically a tool called Great Expectations. You will be given an existing dataset in CSV format, as well as an existing pipeline. There is a data quality issue, and you will be asked to implement some Data Quality checks to catch some of these issues.
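
A minimal sketch using the legacy Pandas-backed Great Expectations API (0.x); newer "GX" releases use a context/validator API instead, so adapt this to whatever version the exercise pins. The file name and the specific expectations are assumptions:

import great_expectations as ge
import pandas as pd

# Wrap the pipeline's input in a Great Expectations dataset (hypothetical file).
df = ge.from_pandas(pd.read_csv("data/input.csv"))

# Declare the data-quality rules as expectations.
checks = [
    df.expect_column_values_to_not_be_null("id"),
    df.expect_column_values_to_be_unique("id"),
    df.expect_column_values_to_be_between("amount", min_value=0),
]

# Fail the pipeline if any expectation is violated.
failed = [c for c in checks if not c.success]
if failed:
    raise ValueError(f"{len(failed)} data quality check(s) failed")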
