@@ -33,25 +33,22 @@ read_data <- function(
3333 facility_active_proportion
3434) {
3535 rlang :: arg_match(disease )
36-
3736 check_file_exists(data_path )
3837
3938 con <- DBI :: dbConnect(duckdb :: duckdb())
40- on.exit(expr = DBI :: dbDisconnect(con ))
39+ on.exit(DBI :: dbDisconnect(con ), add = TRUE )
4140
42- # Get the schema of the data file, and check if `any_visits_this_day` is
43- # present. If it is, then it is an API v2 file, otherwise it is an API v1
44- # file. We use this to determine the query we need to run.
4541 is_api_v2 <- rlang :: try_fetch(
46- DBI :: dbGetQuery(
47- con ,
48- " SELECT * FROM read_parquet(?) LIMIT 0;" ,
49- params = list (data_path )
50- ) | >
51- names() | >
52- # Does it contain `any_visits_this_day`?
53- stringr :: str_detect(" any_visits_this_day" ) | >
54- any(),
42+ {
43+ cols <- DBI :: dbGetQuery(
44+ con ,
45+ " SELECT * FROM read_parquet(?) LIMIT 0;" ,
46+ params = list (data_path )
47+ ) | >
48+ names()
49+
50+ " any_visits_this_day" %in% cols
51+ },
5552 error = function (con ) {
5653 cli :: cli_abort(
5754 c(
@@ -63,172 +60,130 @@ read_data <- function(
6360 }
6461 )
6562
66- parameters <- list (
63+ is_us <- identical(geo_value , " US" )
64+
65+ cli :: cli_inform(
66+ c(
67+ " Using {if (is_api_v2) 'API v2 (facility filtered)' else 'API v1'}" ,
68+ " query for {.val {geo_value}}"
69+ )
70+ )
71+
72+ disease_param <- if (disease == " COVID-19" ) paste0(disease , " %" ) else disease
73+
74+ base_params <- list (
6775 data_path = data_path ,
68- # If disease is COVID-19, we want to match both COVID-19 and
69- # COVID-19/Omicron when filtering, so we add the % wildcard here
70- disease = ifelse(disease == " COVID-19" , paste0(disease , " %" ), disease ),
76+ disease = disease_param ,
7177 min_ref_date = stringify_date(min_reference_date ),
7278 max_ref_date = stringify_date(max_reference_date ),
7379 report_date = stringify_date(report_date )
7480 )
7581
76- # We need different queries for the states and the US overall. For US overall
77- # we need to aggregate over all the facilities in all the states. For the
78- # states, we need to aggregate over all the facilities in that one state
79- if (geo_value == " US" && ! is_api_v2 ) {
80- query <- "
81- SELECT
82- report_date,
83- reference_date,
84- CASE
85- WHEN disease = 'COVID-19/Omicron' THEN 'COVID-19'
86- ELSE disease
87- END AS disease,
88- -- We want to inject the 'US' as our abbrevation here bc data is not agg'd
89- 'US' AS geo_value,
90- sum(value) AS confirm
91- FROM read_parquet(?)
92- WHERE 1=1
93- AND disease LIKE ?
94- AND metric = 'count_ed_visits'
95- AND reference_date >= ? :: DATE
96- AND reference_date <= ? :: DATE
97- AND report_date = ? :: DATE
98- GROUP BY reference_date, report_date, disease
99- ORDER BY reference_date
100- "
101- } else if (geo_value != " US" && ! is_api_v2 ) {
102- # We want just one state so aggregate over facilites in that one state only
103- query <- "
104- SELECT
105- report_date,
106- reference_date,
107- CASE
108- WHEN disease = 'COVID-19/Omicron' THEN 'COVID-19'
109- ELSE disease
110- END AS disease,
111- geo_value AS geo_value,
112- sum(value) AS confirm,
113- FROM read_parquet(?)
114- WHERE 1=1
115- AND disease LIKE ?
116- AND metric = 'count_ed_visits'
117- AND reference_date >= ? :: DATE
118- AND reference_date <= ? :: DATE
119- AND report_date = ? :: DATE
120- AND geo_value = ?
121- GROUP BY geo_value, reference_date, report_date, disease
122- ORDER BY reference_date
123- "
124- # Append `geo_value` to the query
125- parameters <- c(parameters , list (geo_value = geo_value ))
126- } else if (geo_value == " US" && is_api_v2 ) {
127- # Add a column that is the proportion true over
128- # the whole 8 week modeling period.
129- query <- "
130- WITH facility_checks AS (
131- SELECT *,
132- -- This is the same as `all(any_visits_this_day)`
133- -- when grouped by facility
134- AVG(IF(any_visits_this_day, 1, 0)) OVER
135- (PARTITION BY facility) AS proportion_true
136- FROM read_parquet(?)
137- -- Filter here during the CTE, otherwise the PARTITION BY
138- -- statement will be computationally expensive
139- WHERE 1=1
140- AND disease LIKE ?
141- AND metric = 'count_ed_visits'
142- AND reference_date >= ? :: DATE
143- AND reference_date <= ? :: DATE
144- AND report_date = ? :: DATE
145- ) SELECT
82+ geo_select <- if (is_us ) {
83+ " 'US' AS geo_value"
84+ } else {
85+ " geo_value"
86+ }
87+
88+ geo_filter <- if (is_us ) {
89+ " "
90+ } else {
91+ " AND geo_value = ?"
92+ }
93+
94+ group_by <- if (is_us ) {
95+ " GROUP BY reference_date, report_date, disease"
96+ } else {
97+ " GROUP BY geo_value, reference_date, report_date, disease"
98+ }
99+
100+ if (! is_api_v2 ) {
101+ query <- glue :: glue("
102+ SELECT
146103 report_date,
147104 reference_date,
148105 CASE
149106 WHEN disease = 'COVID-19/Omicron' THEN 'COVID-19'
150107 ELSE disease
151108 END AS disease,
152- -- We want to inject the 'US' as our abbrevation here bc data
153- -- is not agg'd
154- 'US' AS geo_value,
155- sum(value) AS confirm
156- FROM facility_checks
157- WHERE proportion_true >= ?
158- GROUP BY reference_date, report_date, disease
109+ {geo_select},
110+ SUM(value) AS confirm
111+ FROM read_parquet(?)
112+ WHERE disease LIKE ?
113+ AND metric = 'count_ed_visits'
114+ AND reference_date >= ?::DATE
115+ AND reference_date <= ?::DATE
116+ AND report_date = ?::DATE
117+ {geo_filter}
118+ {group_by}
159119 ORDER BY reference_date
160- "
161- # Append `facility_active_proportion` to the query
162- parameters <- c(
163- parameters ,
164- list (
165- facility_active_proportion = facility_active_proportion
166- )
167- )
120+ " )
121+
122+ params <- base_params
123+ if (! is_us ) {
124+ params <- c(params , list (geo_value = geo_value ))
125+ }
168126 } else {
169- # Add a column that is the proportion true over
170- # the whole 8 week modeling period.
171- query <- "
127+ query <- glue :: glue("
172128 WITH facility_checks AS (
173- SELECT *,
174- -- This is the same as `all(any_visits_this_day)`
175- -- when grouped by facility
176- AVG(IF(any_visits_this_day, 1, 0)) OVER
177- (PARTITION BY facility ) AS proportion_true
129+ SELECT
130+ *,
131+ AVG(IF(any_visits_this_day, 1, 0)) OVER (
132+ PARTITION BY facility
133+ ) AS proportion_true
178134 FROM read_parquet(?)
179- -- Filter here during the CTE, otherwise the PARTITION BY
180- -- statement will be computationally expensive
181- WHERE 1=1
182- AND disease LIKE ?
135+ WHERE disease LIKE ?
183136 AND metric = 'count_ed_visits'
184- AND reference_date >= ? :: DATE
185- AND reference_date <= ? :: DATE
186- AND report_date = ? :: DATE
187- AND geo_value = ?
188- ) SELECT
137+ AND reference_date >= ?::DATE
138+ AND reference_date <= ?::DATE
139+ AND report_date = ?::DATE
140+ {geo_filter}
141+ )
142+ SELECT
189143 report_date,
190144 reference_date,
191145 CASE
192146 WHEN disease = 'COVID-19/Omicron' THEN 'COVID-19'
193147 ELSE disease
194148 END AS disease,
195- geo_value AS geo_value ,
196- sum (value) AS confirm
149+ {geo_select} ,
150+ SUM (value) AS confirm
197151 FROM facility_checks
198152 WHERE proportion_true >= ?
199- GROUP BY geo_value, reference_date, report_date, disease
153+ {group_by}
200154 ORDER BY reference_date
201- "
202- # Append `geo_value` to the query
203- parameters <- c(
204- parameters ,
205- list (
206- geo_value = geo_value ,
207- facility_active_proportion = facility_active_proportion
208- )
155+ " )
156+
157+ params <- base_params
158+ if (! is_us ) {
159+ params <- c(params , list (geo_value = geo_value ))
160+ }
161+ params <- c(
162+ params ,
163+ list (facility_active_proportion = facility_active_proportion )
209164 )
210165 }
211166
212167 df <- rlang :: try_fetch(
213168 DBI :: dbGetQuery(
214169 con ,
215170 statement = query ,
216- params = unname(parameters )
171+ params = unname(params )
217172 ),
218173 error = function (con ) {
219174 cli :: cli_abort(
220175 c(
221176 " Error fetching data from {.path {data_path}}" ,
222177 " Using parameters:" ,
223- " *" = " data_path: {.path {parameters [['data_path']]}}" ,
224- " *" = " disease: {.val {parameters [['disease']]}}" ,
225- " *" = " min_reference_date: {.val {parameters [['min_ref_date']]}}" ,
226- " *" = " max_reference_date: {.val {parameters [['max_ref_date']]}}" ,
227- " *" = " report_date: {.val {parameters [['report_date']]}}" ,
228- " *" = " geo_value: {.val {parameters[[' geo_value']] }}" ,
178+ " *" = " data_path: {.path {base_params [['data_path']]}}" ,
179+ " *" = " disease: {.val {base_params [['disease']]}}" ,
180+ " *" = " min_reference_date: {.val {base_params [['min_ref_date']]}}" ,
181+ " *" = " max_reference_date: {.val {base_params [['max_ref_date']]}}" ,
182+ " *" = " report_date: {.val {base_params [['report_date']]}}" ,
183+ " *" = " geo_value: {.val {geo_value}}" ,
229184 " *" = paste0(
230- " facility_active_proportion:" ,
231- " {.val {parameters[[' facility_active_proportion']] }}"
185+ " facility_active_proportion: " ,
186+ " {.val {facility_active_proportion}}"
232187 ),
233188 " Original error: {con}"
234189 ),
@@ -237,33 +192,29 @@ read_data <- function(
237192 }
238193 )
239194
240- # Guard against empty return
241195 if (nrow(df ) == 0 ) {
242196 cli :: cli_abort(
243197 c(
244198 " No data matching returned from {.path {data_path}}" ,
245- " Using parameters {parameters }"
199+ " Using parameters {base_params }"
246200 ),
247201 class = " empty_return"
248202 )
249203 }
250- # Warn for incomplete return
204+
251205 n_rows_expected <- as.Date(max_reference_date ) -
252- as.Date(min_reference_date ) +
253- 1
206+ as.Date(min_reference_date ) + 1
207+
254208 if (nrow(df ) != n_rows_expected ) {
255209 expected_dates <- seq.Date(
256210 from = as.Date(min_reference_date ),
257211 to = as.Date(max_reference_date ),
258212 by = " day"
259213 )
260214 missing_dates <- stringify_date(
261- # Setdiff strips the date attribute from the objects; re-add it so that we
262- # can pretty-format the date for printing
263- as.Date(
264- setdiff(expected_dates , df [[" reference_date" ]])
265- )
215+ as.Date(setdiff(expected_dates , df [[" reference_date" ]]))
266216 )
217+
267218 cli :: cli_warn(
268219 c(
269220 " Incomplete number of rows returned" ,
@@ -276,5 +227,5 @@ read_data <- function(
276227 }
277228
278229 cli :: cli_alert_success(" Read {nrow(df)} rows from {.path {data_path}}" )
279- return ( df )
230+ df
280231}
0 commit comments