
Commit 50eebf5

Cleans up data storage and organization
- Added new output log for SLURM job ID 17689 detailing the hybrid weather data collection process.
- Updated the script to use a consolidated collection strategy, reducing fragmentation and improving efficiency.
- Changed the expected completion time message to reflect the new approach.
- Executed the new consolidated collection script instead of the previous hybrid collection script.
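A minimal sketch of the append-and-deduplicate pattern behind the consolidated strategy, using hypothetical station values (the full implementation is in the new script below):

library(data.table)

# Two hypothetical collection batches that overlap on one (idema, date) key
old_batch = data.table(idema = c("A001", "A002"),
                       date  = as.Date(c("2024-01-01", "2024-01-01")),
                       tmed  = c(10.1, 9.4))
new_batch = data.table(idema = c("A002", "A003"),
                       date  = as.Date(c("2024-01-01", "2024-01-01")),
                       tmed  = c(9.4, 11.0))

combined = rbind(old_batch, new_batch, fill = TRUE)             # append
combined = combined[!duplicated(combined[, .(idema, date)]), ]  # deduplicate on the key
# combined now holds 3 rows: the overlapping A002 observation appears once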
1 parent ef99a8d commit 50eebf5

8 files changed: +20353 −4 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ auth
 
 # ignoring all output data files - only track input data and documentation
 data/output/
+data/backup/
 data/*.csv*
 data/*.gz
 !data/input/
collect_all_datasets_consolidated.R

Lines changed: 247 additions & 0 deletions

@@ -0,0 +1,247 @@
#!/usr/bin/env Rscript

# collect_all_datasets_consolidated.R
# THREE-FILE STRATEGY: Appends new data to consolidated files
# 1. daily_station_historical.csv  - All station daily data
# 2. daily_municipal_extended.csv  - All municipal data (forecasts + interpolations)
# 3. hourly_station_ongoing.csv    - All hourly data

rm(list = ls())
library(tidyverse)
library(lubridate)
library(data.table)

# Configuration
COLLECT_STATION_DATA = TRUE
COLLECT_MUNICIPAL_DATA = TRUE
COLLECT_HOURLY_DATA = TRUE

start_time = Sys.time()
times = list()

cat("=======================================\n")
cat("HYBRID WEATHER DATA COLLECTION SYSTEM\n")
cat("THREE-FILE CONSOLIDATED STRATEGY\n")
cat("=======================================\n")
cat("Started at:", format(start_time), "\n\n")

# Function to safely append data without duplicates
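# Arguments:
#   new_data    - freshly collected rows (a data.table from fread)
#   output_file - consolidated CSV to create or extend; a .gz copy is written alongside
#   key_cols    - character vector of columns whose combination identifies a unique observation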
append_to_consolidated = function(new_data, output_file, key_cols) {
  if(nrow(new_data) == 0) {
    cat("No new data to append\n")
    return(NULL)
  }

  # Load existing data if file exists
  if(file.exists(output_file)) {
    cat("Loading existing data from", basename(output_file), "...")
    existing_data = fread(output_file)
    cat(" (", nrow(existing_data), "rows)\n")

    # Combine and remove duplicates
    combined_data = rbind(existing_data, new_data, fill = TRUE)
    original_rows = nrow(combined_data)

    # Remove duplicates based on key columns
    if(all(key_cols %in% names(combined_data))) {
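      # the .. prefix tells data.table to look up key_cols as a variable in the
      # function environment rather than treating it as a column name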
      combined_data = combined_data[!duplicated(combined_data[, ..key_cols]), ]
    }

    duplicates_removed = original_rows - nrow(combined_data)
    new_rows_added = nrow(combined_data) - nrow(existing_data)

    cat("Added", new_rows_added, "new rows (", duplicates_removed, "duplicates removed)\n")
  } else {
    cat("Creating new file", basename(output_file), "\n")
    combined_data = new_data
    new_rows_added = nrow(combined_data)
  }

  # Sort data appropriately
  if("date" %in% names(combined_data)) {
    combined_data = combined_data[order(date)]
  } else if("fhora" %in% names(combined_data)) {
    combined_data = combined_data[order(fhora)]
  }

  # Save updated file
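  # (fwrite's default compress = "auto" gzips the copy whose name ends in .gz)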
  fwrite(combined_data, output_file)
  fwrite(combined_data, paste0(output_file, ".gz"))

  cat("✅ Updated", basename(output_file), "- Total rows:", nrow(combined_data), "\n")
  return(combined_data)
}
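
# Illustrative call (hypothetical values; the real calls below pass data read
# from each collector's temporary CSV):
#   append_to_consolidated(
#     data.table(idema = "0000X", date = as.Date("2024-01-01"), tmed = 12.5),
#     "data/output/daily_station_historical.csv",
#     key_cols = c("idema", "date")
#   )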

if(COLLECT_STATION_DATA) {
  cat("=== DATASET 1: DAILY STATION DATA ===\n")
  cat("Collecting daily means, minimums, and maximums by weather station\n")

  dataset1_start = Sys.time()

  tryCatch({
    # Collect new station data (temporary file)
    source("code/get_station_daily_hybrid.R")

    # Load the new data from temporary file
    temp_files = list.files("data/output", pattern = "station_daily_data_.*\\.csv$", full.names = TRUE)
    if(length(temp_files) > 0) {
      latest_temp = temp_files[which.max(file.mtime(temp_files))]
      new_station_data = fread(latest_temp)

      # Standardize column names
      if("indicativo" %in% names(new_station_data)) {
        new_station_data$idema = new_station_data$indicativo
      }
      if("fecha" %in% names(new_station_data)) {
        new_station_data$date = as.Date(new_station_data$fecha)
      }

      # Append to consolidated file
      append_to_consolidated(new_station_data, "data/output/daily_station_historical.csv", c("idema", "date"))

      # Clean up temporary file
      file.remove(latest_temp)
      cat("Cleaned up temporary file:", basename(latest_temp), "\n")
    }

    dataset1_end = Sys.time()
    times$station_daily = as.numeric(difftime(dataset1_end, dataset1_start, units = "mins"))
    cat("✅ Dataset 1 completed in", round(times$station_daily, 2), "minutes\n\n")
  }, error = function(e) {
    cat("❌ Dataset 1 failed:", e$message, "\n\n")
    times$station_daily <<- NA  # <<- so the failure is recorded in the global times list
  })
}

if(COLLECT_MUNICIPAL_DATA) {
  cat("=== DATASET 2: MUNICIPAL FORECASTS ===\n")
  cat("Collecting municipal data with 7-day forecasts using climaemet\n")

  dataset2_start = Sys.time()

  tryCatch({
    # Collect new municipal data (temporary file)
    source("code/get_forecast_data_hybrid.R")

    # Load the new data from temporary file
    temp_files = list.files("data/output", pattern = "municipal_forecasts_.*\\.csv$", full.names = TRUE)
    if(length(temp_files) > 0) {
      latest_temp = temp_files[which.max(file.mtime(temp_files))]
      new_municipal_data = fread(latest_temp)

      # Standardize date column
      if("fecha" %in% names(new_municipal_data)) {
        new_municipal_data$date = as.Date(new_municipal_data$fecha)
      }

      # Append to consolidated file
      append_to_consolidated(new_municipal_data, "data/output/daily_municipal_extended.csv", c("id", "fecha"))

      # Clean up temporary file
      file.remove(latest_temp)
      cat("Cleaned up temporary file:", basename(latest_temp), "\n")
    }

    dataset2_end = Sys.time()
    times$municipal_forecasts = as.numeric(difftime(dataset2_end, dataset2_start, units = "mins"))
    cat("✅ Dataset 2 completed in", round(times$municipal_forecasts, 2), "minutes\n\n")
  }, error = function(e) {
    cat("❌ Dataset 2 failed:", e$message, "\n\n")
    times$municipal_forecasts <<- NA
  })
}

if(COLLECT_HOURLY_DATA) {
  cat("=== DATASET 3: HOURLY DATA ===\n")
  cat("Collecting hourly data for building history\n")

  dataset3_start = Sys.time()

  tryCatch({
    # Collect new hourly data (temporary file)
    source("code/get_latest_data.R")

    # Load the new data from temporary file
    temp_files = list.files("data/output", pattern = "latest_weather_.*\\.csv$", full.names = TRUE)
    if(length(temp_files) > 0) {
      latest_temp = temp_files[which.max(file.mtime(temp_files))]
      new_hourly_data = fread(latest_temp)

      # Append to consolidated file
      append_to_consolidated(new_hourly_data, "data/output/hourly_station_ongoing.csv", c("idema", "fhora"))

      # Clean up temporary file
      file.remove(latest_temp)
      cat("Cleaned up temporary file:", basename(latest_temp), "\n")
    }

    dataset3_end = Sys.time()
    times$hourly_data = as.numeric(difftime(dataset3_end, dataset3_start, units = "mins"))
    cat("✅ Dataset 3 completed in", round(times$hourly_data, 2), "minutes\n\n")
  }, error = function(e) {
    cat("❌ Dataset 3 failed:", e$message, "\n\n")
    times$hourly_data <<- NA
  })
}

# === POST-COLLECTION GAP ANALYSIS AND MONITORING ===
cat("=== POST-COLLECTION ANALYSIS ===\n")

# Run gap analysis
tryCatch({
  source("code/check_data_gaps.R")
}, error = function(e) {
  cat("⚠️ Gap analysis failed:", e$message, "\n")
})

# Update README with current status
tryCatch({
  source("code/update_readme_with_summary.R")
}, error = function(e) {
  cat("⚠️ README update failed:", e$message, "\n")
})

# Final summary
end_time = Sys.time()
total_time = as.numeric(difftime(end_time, start_time, units = "mins"))

cat("========================================\n")
cat("COLLECTION SUMMARY\n")
cat("========================================\n")
cat("Total execution time:", round(total_time, 2), "minutes\n\n")

cat("Individual dataset times:\n")
if(!is.null(times$station_daily) && !is.na(times$station_daily)) {
  cat(" Dataset 1 (Station Daily):", round(times$station_daily, 2), "minutes\n")
}
if(!is.null(times$municipal_forecasts) && !is.na(times$municipal_forecasts)) {
  cat(" Dataset 2 (Municipal Forecasts):", round(times$municipal_forecasts, 2), "minutes\n")
}
if(!is.null(times$hourly_data) && !is.na(times$hourly_data)) {
  cat(" Dataset 3 (Hourly Data):", round(times$hourly_data, 2), "minutes\n")
}

cat("\nFinal consolidated datasets:\n")
final_files = c(
  "data/output/daily_station_historical.csv",
  "data/output/daily_municipal_extended.csv",
  "data/output/hourly_station_ongoing.csv"
)

for(file in final_files) {
  if(file.exists(file)) {
    file_size = round(file.size(file) / 1024 / 1024, 2)
    # Count rows via wc -l rather than reading the full file into memory
    wc_output = system(paste("wc -l", file), intern = TRUE)
    row_count = as.numeric(strsplit(wc_output, " ")[[1]][1]) - 1  # subtract header
    cat("", basename(file), "(", file_size, "MB,", format(row_count, big.mark = ","), "rows )\n")
  } else {
    cat("", basename(file), "(not found)\n")
  }
}

cat("\nCompleted at:", format(Sys.time()), "\n")
cat("Three-file strategy: All data consolidated, no duplicates, no fragmentation\n")
