
Commit b35de7b

Process one year at a time
1 parent 66b3514 commit b35de7b


downscaled_climate_data/assets/county_measures.py

Lines changed: 23 additions & 37 deletions
@@ -18,18 +18,16 @@
 os.environ['CONDOR_BIN_DIR'] = "/u/bengal1/.conda/envs/downscaled_climate_data/bin"
 gateway = HTCGateway(address="https://dask.software-dev.ncsa.illinois.edu",
                      proxy_address=8786,
-                     auth = BasicAuth(
-                         username=None,
+                     auth=BasicAuth(
+                         username=None,
                          password=os.environ['DASK_GATEWAY_PASSWORD'])
-                     )
-cluster = gateway.new_cluster(image="bengal1/pangeo-ncsa:dev",
+                     )
+cluster = gateway.new_cluster(image="bengal1/pangeo-ncsa:dev",
                               container_image="/u/bengal1/condor/pangeo.sif")
 cluster.scale(200)
 client = cluster.get_client()
 print(cluster.dashboard_link)
 
-
-
 fs = s3fs.S3FileSystem(
     endpoint_url=os.environ['S3_ENDPOINT_URL'],
     key=os.environ['AWS_ACCESS_KEY_ID'],
@@ -43,44 +41,32 @@
 
 try:
     start_time = time.time()
+    for year in range(1990, 2025):
+        year_start = time.time()
+        print(f"Processing year {year}")
+        era5_processing_start = time.time()
+        era5 = era5_processing({'2m_temperature',
+                                'total_precipitation',
+                                "sfcWind",
+                                "vapor_pressure",
+                                "surface_pressure"},
+                               year, year, 'analysis_ready', chunks=1000)
+        df = era5.to_dask_dataframe()
+        era5_gdf = dgpd.from_dask_dataframe(
+            df,
+            geometry=dgpd.points_from_xy(df, 'lon', 'lat')) \
+            .drop(columns=['lat', 'lon'])
+        era5_gdf.to_parquet(f's3://ees240146/analysis/era5/year={year}/era5.parquet',
+                            filesystem=fs,
+                            engine='pyarrow')
+        print(f"Year {year} processing took {time.time() - year_start:.2f} seconds")
 
-    era5_processing_start = time.time()
-    era5 = era5_processing({'2m_temperature',
-                            'total_precipitation',
-                            "sfcWind",
-                            "vapor_pressure",
-                            "surface_pressure"},
-                           1990, 2025, 'analysis_ready', chunks=500).persist()
-    wait(era5)
-    print(f"era processing {time.time() - era5_processing_start:.2f} seconds")
-    print(era5)
-
-    to_tabular_start = time.time()
-    df = era5.to_dask_dataframe()
-    del era5
-    df = df.repartition(partition_size='200MB').persist()  # Target 200MB per partition
-    wait(df)
-    print(df)
-    print(f"to tabular {time.time() - to_tabular_start:.2f} seconds")
-
-
-    era5_gdf = dgpd.from_dask_dataframe(
-        df,
-        geometry=dgpd.points_from_xy(df, 'lon', 'lat')) \
-        .drop(columns=['lat', 'lon'])
-
-    era5_gdf.to_parquet('s3://ees240146/analysis/era5.parquet',
-                        filesystem=fs,
-                        write_metadata_file=True,
-                        schema="infer")
-    print(f"To Tabular {time.time() - to_tabular_start:.2f} seconds")
     print(f"TOTAL TIME {time.time() - start_time:.2f} seconds")
 
     info = client.scheduler_info()
     num_workers = len(info['workers'])
     print(f"Number of workers: {num_workers}")
 
 
-
 finally:
     cluster.close()
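
For context on the new layout: each year now lands under its own year={year} prefix, so downstream readers can load a single year without scanning the whole archive. A minimal read-back sketch, assuming the bucket and credentials from the diff and a standard dask_geopandas install (the year value 1995 is just an illustration):

import os
import dask_geopandas as dgpd

# fsspec/s3fs connection settings mirroring the S3FileSystem built in the
# script above; the endpoint and keys come from the same environment variables.
storage_options = {
    'key': os.environ['AWS_ACCESS_KEY_ID'],
    'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
    'client_kwargs': {'endpoint_url': os.environ['S3_ENDPOINT_URL']},
}

# Load one year's partition; the other 34 year=YYYY prefixes are untouched.
gdf_1995 = dgpd.read_parquet(
    's3://ees240146/analysis/era5/year=1995/era5.parquet',
    storage_options=storage_options)

Since the .persist()/wait() calls and the 200 MB repartition were dropped, the full 35-year dataset is never held on the cluster at once: each iteration's graph is built, written to its own partition, and released before the next year starts.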
