Skip to content

Commit 0af3d1f

Browse files
authored
Merge pull request #370 from pkjr002/bugfix/total
🐛 multi-threaded seems to cause idle/sleep
2 parents 03dfcaf + 88c2eae commit 0af3d1f

File tree

1 file changed

+28
-3
lines changed

1 file changed

+28
-3
lines changed

modules/facts/total/total_workflow.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import yaml
88
import xarray as xr
99
import dask.array as da
10+
import dask.diagnostics
11+
import warnings
1012

1113

1214
def TotalSamplesInDirectory(directory, pyear_start, pyear_end, pyear_step, chunksize):
@@ -108,7 +110,7 @@ def TotalSamples(infiles, outfile, targyears, chunksize):
108110
target_infiles,
109111
combine="nested",
110112
concat_dim="file",
111-
chunks={"locations":chunksize},
113+
chunks=None,
112114
)
113115
ds = ds.sel(years=targyears)
114116
# Sums everything across the new "file" dimension.
@@ -135,7 +137,30 @@ def TotalSamples(infiles, outfile, targyears, chunksize):
135137
# This actually carries out the delayed calculations and operations.
136138
# SBM: FYI Double check the numbers to ensure everything is summing across dims correctly.
137139
# SBM: FYI Also, check to see if output as something huge like float64.
138-
total_out.to_netcdf(outfile, encoding={"sea_level_change": {"dtype": "f4", "zlib": True, "complevel":4, "_FillValue": nc_missing_value}})
140+
#total_out.to_netcdf(outfile, encoding={"sea_level_change": {"dtype": "f4", "zlib": True, "complevel":4, "_FillValue": nc_missing_value}})
141+
142+
# New .to_netcdf run mechanic that allows dask the ability to control the chunking and reduces runtime, also places a progress bar in the task.out section.
143+
dask.config.set({"array.slicing.split_large_chunks": True})
144+
warnings.filterwarnings("ignore", category=FutureWarning)
145+
146+
write_job = total_out.to_netcdf(
147+
outfile,
148+
encoding={
149+
"sea_level_change": {
150+
"dtype": "f4",
151+
"zlib": True,
152+
"complevel": 4,
153+
"_FillValue": nc_missing_value,
154+
}
155+
},
156+
compute=False,
157+
)
158+
159+
with dask.config.set(scheduler="single-threaded"):
160+
with dask.diagnostics.ProgressBar():
161+
print(" >> Writing to File...")
162+
write_job.compute()
163+
139164

140165
return(outfile)
141166

@@ -167,4 +192,4 @@ def TotalSamples(infiles, outfile, targyears, chunksize):
167192
# Total up the workflow in the provided directory
168193
TotalSamplesInDirectory(args.directory, args.pyear_start, args.pyear_end, args.pyear_step, args.chunksize)
169194

170-
exit()
195+
exit()

0 commit comments

Comments
 (0)