Commit b103d33

feat(module 3): add type enforcement to web_to_gcs.py, .env handling, progress bars, and skipping of work already done

- add type enforcement to web_to_gcs.py (as in ingest.py from module 1)
- add .env handling so the variables no longer need a manual `export`
- add a new version of web_to_gcs.py with progress bars that skips downloads and uploads already done (useful if the internet connection or anything else fails midway)
- fix types not being enforced on the parquet columns, which previously had to be done later in BigQuery when creating the materialized tables
1 parent 435fd7b commit b103d33

6 files changed, +273 −20 lines

03-data-warehouse/extras/.env-example

Lines changed: 2 additions & 0 deletions

GCP_GCS_BUCKET="your_bucket_name"
GOOGLE_APPLICATION_CREDENTIALS=Path/to/key/GCP_service_account_key.json

03-data-warehouse/extras/.gitignore

Lines changed: 3 additions & 0 deletions

*.env
*.parquet
*.csv*

03-data-warehouse/extras/README.md

Lines changed: 3 additions & 2 deletions

 Quick hack to load files directly to GCS, without Airflow. Downloads csv files from https://nyc-tlc.s3.amazonaws.com/trip+data/ and uploads them to your Cloud Storage Account as parquet files.

-1. Install pre-reqs (more info in `web_to_gcs.py` script)
-2. Run: `python web_to_gcs.py`
+1. Install pre-reqs with `uv sync`
+2. Run: `uv run python web_to_gcs_with_progress_bar.py`
+3. Or run: `uv run python web_to_gcs.py` for less verbose output (if your upload connection is fast)

03-data-warehouse/extras/pyproject.toml

Lines changed: 14 additions & 0 deletions

[project]
name = "extras"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
    "google-cloud-storage>=3.8.0",
    "pandas>=3.0.0",
    "pyarrow>=23.0.0",
    "python-dotenv>=1.2.1",
    "requests>=2.32.5",
    "tqdm>=4.67.1",
]

03-data-warehouse/extras/web_to_gcs.py

Lines changed: 54 additions & 18 deletions

@@ -1,19 +1,25 @@
-import io
 import os
 import requests
 import pandas as pd
 from google.cloud import storage
+from dotenv import load_dotenv
+

 """
 Pre-reqs:
-1. `pip install pandas pyarrow google-cloud-storage`
-2. Set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account key
-3. Set GCP_GCS_BUCKET as your bucket or change default value of BUCKET
+1. run `uv sync` from this 'extras' folder (creates the venv and installs the dependencies from pyproject.toml)
+2. rename .env-example to .env (not committed thanks to .gitignore)
+3. in .env,
+   - set GCP_GCS_BUCKET to your bucket, or change the default value of BUCKET
+   - set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account JSON key
+     (or leave it unset if you use Google ADC)
 """
+# load env vars from .env
+load_dotenv()

 # services = ['fhv','green','yellow']
-init_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/'
-# switch out the bucketname
+init_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/"
+# if not set in .env, switch out the default bucket name
 BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc-data-lake-bucketname")


@@ -34,9 +40,8 @@ def upload_to_gcs(bucket, object_name, local_file):

 def web_to_gcs(year, service):
     for i in range(12):
-
         # sets the month part of the file_name string
-        month = '0'+str(i+1)
+        month = "0" + str(i + 1)
         month = month[-2:]

         # csv file_name
@@ -45,22 +50,53 @@ def web_to_gcs(year, service):
         # download it using requests via a pandas df
         request_url = f"{init_url}{service}/{file_name}"
         r = requests.get(request_url)
-        open(file_name, 'wb').write(r.content)
+        open(file_name, "wb").write(r.content)
         print(f"Local: {file_name}")

         # read it back into a parquet file
-        df = pd.read_csv(file_name, compression='gzip')
-        file_name = file_name.replace('.csv.gz', '.parquet')
-        df.to_parquet(file_name, engine='pyarrow')
+        # enforce types so the parquet columns directly get the right types
+        # (as we did in module 1 in the ingest.py script)
+        dtypes = {
+            "VendorID": "Int64",
+            "RatecodeID": "Int64",
+            "PULocationID": "Int64",
+            "DOLocationID": "Int64",
+            "passenger_count": "Int64",
+            "payment_type": "Int64",
+            "trip_type": "Int64",  # only in green; ignored if the column is missing
+            "store_and_fwd_flag": "string",
+            "trip_distance": "float64",
+            "fare_amount": "float64",
+            "extra": "float64",
+            "mta_tax": "float64",
+            "tip_amount": "float64",
+            "tolls_amount": "float64",
+            "ehailfee": "float64",  # only in green; ignored if the column is missing
+            "improvement_surcharge": "float64",
+            "total_amount": "float64",
+            "congestion_surcharge": "float64",
+        }
+
+        if service == "yellow":
+            parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
+        else:
+            parse_dates = ["lpep_pickup_datetime", "lpep_dropoff_datetime"]
+
+        df = pd.read_csv(
+            file_name, dtype=dtypes, parse_dates=parse_dates, compression="gzip"
+        )
+        file_name = file_name.replace(".csv.gz", ".parquet")
+        df.to_parquet(file_name, engine="pyarrow")
         print(f"Parquet: {file_name}")

-        # upload it to gcs
+        # upload it to gcs
         upload_to_gcs(BUCKET, f"{service}/{file_name}", file_name)
         print(f"GCS: {service}/{file_name}")


-web_to_gcs('2019', 'green')
-web_to_gcs('2020', 'green')
-# web_to_gcs('2019', 'yellow')
-# web_to_gcs('2020', 'yellow')
-
+web_to_gcs("2019", "green")
+web_to_gcs("2020", "green")
+web_to_gcs("2021", "green")  # will fail at month 08 (expected: the file is not on GitHub)
+# web_to_gcs("2019", "yellow")
+# web_to_gcs("2020", "yellow")
+# web_to_gcs("2021", "yellow")  # will fail at month 08 (expected: the file is not on GitHub)
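
A quick way to confirm that the enforced dtypes actually land in the parquet files (instead of being inferred as plain floats or objects) is to read the schema back with pyarrow. This is not part of the commit; the file name below is a hypothetical example of one produced by the script.

import pyarrow.parquet as pq

# hypothetical example file produced by web_to_gcs("2019", "green")
schema = pq.read_schema("green_tripdata_2019-01.parquet")
for field in schema:
    # prints e.g. "passenger_count: int64" or "lpep_pickup_datetime: timestamp[...]"
    print(f"{field.name}: {field.type}")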

03-data-warehouse/extras/web_to_gcs_with_progress_bar.py

Lines changed: 197 additions & 0 deletions

import os
import requests
import pandas as pd
from google.cloud import storage
from dotenv import load_dotenv
from tqdm import tqdm
import gzip
import pyarrow as pa
import pyarrow.parquet as pq


"""
Pre-reqs:
1. run `uv sync` from this 'extras' folder (creates the venv and installs the dependencies from pyproject.toml)
2. rename .env-example to .env (not committed thanks to .gitignore)
3. in .env,
   - set GCP_GCS_BUCKET to your bucket, or change the default value of BUCKET
   - set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account JSON key
     (or leave it unset if you use Google ADC)
"""
# load env vars from .env
load_dotenv()

# services = ['fhv','green','yellow']
init_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/"
# if not set in .env, switch out the default bucket name
BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc-data-lake-bucketname")


def download_with_progress(url: str, local_path: str, desc: str = "Downloading"):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length", 0))
        # Configure tqdm for bytes
        with (
            open(local_path, "wb") as f,
            tqdm(
                total=total,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                desc=desc,
            ) as bar,
        ):
            for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1 MB
                if not chunk:
                    continue
                size = f.write(chunk)
                bar.update(size)


def csv_to_parquet_with_progress(
    csv_path: str, parquet_path: str, service_color: str, chunksize: int = 100_000
):
    # 1) Count rows (gzip-aware)
    with gzip.open(csv_path, mode="rt") as f:
        total_rows = sum(1 for _ in f) - 1  # minus header
    if total_rows <= 0:
        raise ValueError("CSV appears to be empty")

    # 2) Read in chunks with fixed dtypes so the parquet columns directly get the right types
    # (as we did in module 1 in the ingest.py script)
    dtypes = {
        "VendorID": "Int64",
        "RatecodeID": "Int64",
        "PULocationID": "Int64",
        "DOLocationID": "Int64",
        "passenger_count": "Int64",
        "payment_type": "Int64",
        "trip_type": "Int64",  # only in green; ignored if the column is missing
        "store_and_fwd_flag": "string",
        "trip_distance": "float64",
        "fare_amount": "float64",
        "extra": "float64",
        "mta_tax": "float64",
        "tip_amount": "float64",
        "tolls_amount": "float64",
        "ehailfee": "float64",  # only in green; ignored if the column is missing
        "improvement_surcharge": "float64",
        "total_amount": "float64",
        "congestion_surcharge": "float64",
    }

    if service_color == "yellow":
        parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
    else:
        parse_dates = ["lpep_pickup_datetime", "lpep_dropoff_datetime"]

    reader = pd.read_csv(
        csv_path,
        dtype=dtypes,
        parse_dates=parse_dates,
        compression="gzip",
        chunksize=chunksize,
        low_memory=False,
    )

    writer = None

    with tqdm(total=total_rows, unit="rows", desc=f"Parquet {csv_path}") as bar:
        for chunk in reader:
            table = pa.Table.from_pandas(chunk)
            if writer is None:
                writer = pq.ParquetWriter(parquet_path, table.schema)
            else:
                # Optional safety: align to the first chunk's schema
                table = table.cast(writer.schema)
            writer.write_table(table)
            bar.update(len(chunk))

    if writer is not None:
        writer.close()


def upload_to_gcs_with_progress(bucket: str, object_name: str, local_file: str):
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    # Optional: tune chunk size (must be a multiple of 256 KiB)
    storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB

    client = storage.Client()
    bucket_obj = client.bucket(bucket)
    blob = bucket_obj.blob(object_name)

    if blob.exists(client):
        print(f"Skipping upload, already in GCS: gs://{bucket}/{object_name}")
        return

    file_size = os.path.getsize(local_file)

    with open(local_file, "rb") as f:
        with tqdm.wrapattr(
            f,
            "read",
            total=file_size,
            miniters=1,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            desc=f"Uploading {os.path.basename(local_file)}",
        ) as wrapped_file:
            blob.upload_from_file(
                wrapped_file,
                size=file_size,  # important so the library knows the total bytes
            )

    print(f"Uploaded to GCS: gs://{bucket}/{object_name}")


def web_to_gcs(year, service):
    client = storage.Client()
    bucket_obj = client.bucket(BUCKET)

    for i in tqdm(range(12), desc=f"{service} {year}", unit="month"):
        month = f"{i + 1:02d}"

        csv_file_name = f"{service}_tripdata_{year}-{month}.csv.gz"
        parquet_file_name = csv_file_name.replace(".csv.gz", ".parquet")
        object_name = f"{service}/{parquet_file_name}"

        # 1) Check if the parquet file is already in GCS
        blob = bucket_obj.blob(object_name)
        if blob.exists(client):
            print(f"Already in GCS, skipping: gs://{BUCKET}/{object_name}")
            continue

        # 2) Check if the CSV is already downloaded locally
        if os.path.exists(csv_file_name):
            print(f"CSV already exists locally, skipping download: {csv_file_name}")
        else:
            request_url = f"{init_url}{service}/{csv_file_name}"
            download_with_progress(
                request_url, csv_file_name, desc=f"Downloading {csv_file_name}"
            )

        # 3) Check if the parquet file already exists locally
        if os.path.exists(parquet_file_name):
            print(
                f"Parquet already exists locally, skipping conversion: {parquet_file_name}"
            )
        else:
            csv_to_parquet_with_progress(csv_file_name, parquet_file_name, service)
            print(f"Parquet: {parquet_file_name}")

        # 4) Upload with a per-byte progress bar
        upload_to_gcs_with_progress(BUCKET, object_name, parquet_file_name)


web_to_gcs("2019", "green")
web_to_gcs("2020", "green")
web_to_gcs("2021", "green")  # will fail at month 08 (expected: the file is not on GitHub)
# web_to_gcs("2019", "yellow")
# web_to_gcs("2020", "yellow")
# web_to_gcs("2021", "yellow")  # will fail at month 08 (expected: the file is not on GitHub)
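
Since the commit message notes that, without the type enforcement, the casting had to be done later in BigQuery when creating the materialized tables, here is a minimal sketch (not part of the commit) of loading the uploaded parquet files straight into BigQuery with the column types carried over. It assumes `google-cloud-bigquery` is installed (it is not listed in pyproject.toml) and uses hypothetical bucket, project, dataset, and table names.

# Minimal sketch: load the typed parquet files from GCS into BigQuery.
# Hypothetical bucket/project/dataset/table names; parquet column types are
# picked up as-is, so no casting is needed afterwards.
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition="WRITE_TRUNCATE",
)

load_job = client.load_table_from_uri(
    "gs://your_bucket_name/green/green_tripdata_2019-*.parquet",  # hypothetical bucket
    "your-project.trips_data_all.green_tripdata",  # hypothetical table
    job_config=job_config,
)
load_job.result()  # waits for the load job to finish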
