
Commit 87b3bb5

More queue improvements (#699)

* Fix progress bar buglet and update on every process
* Separate progress bar for retries + throttling
* Unify `errored` and `finishing` states
* Drop circuit-breaker (since unlikely to work for same reason as retries)
1 parent 0a2caa0 commit 87b3bb5
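
The commit message mentions the queue progress bar, whose format string now reports `n_retries` (see the change in R/req-perform-parallel.R below). As a rough illustration only, not httr2 source, here is a minimal sketch of how a format string in that shape can be driven with cli; every variable (`queue_status`, `n_pending`, `n_retries`, `n_active`, `n_complete`) is local to the example.

# Sketch only (not httr2 code): a cli progress bar using the same
# "[status] (pending + retries) -> active -> complete" shape as the
# format string changed in this commit. All variables are local here.
library(cli)

n <- 10
queue_status <- "working"
n_pending <- n
n_retries <- 0
n_active <- 0
n_complete <- 0

bar <- cli_progress_bar(
  total = n,
  format = paste0(
    "[{queue_status}] ",
    "({n_pending} + {n_retries}) -> {n_active} -> {n_complete} | ",
    "{cli::pb_bar} {cli::pb_percent}"
  )
)

for (i in seq_len(n)) {
  n_pending <- n_pending - 1
  n_complete <- n_complete + 1
  Sys.sleep(0.05) # stand-in for performing a request
  cli_progress_update(id = bar, set = n_complete)
}
cli_progress_done(id = bar)

In the package itself the same fields live on the R6 RequestQueue object, hence the `self$` prefixes in the diff.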

File tree

3 files changed: +66 -64 lines changed


R/req-perform-parallel.R

+37 -46
@@ -7,7 +7,7 @@
 #'
 #' While running, you'll get a progress bar that looks like:
 #' `[working] (1 + 4) -> 5 -> 5`. The string tells you the current status of
-#' the queue (e.g. working, waiting, errored, finishing) followed by (the
+#' the queue (e.g. working, waiting, errored) followed by (the
 #' number of pending requests + pending retried requests) -> the number of
 #' active requests -> the number of complete requests.
 #'
@@ -24,7 +24,8 @@
 #'
 #' Additionally, it does not respect the `max_tries` argument to `req_retry()`
 #' because if you have five requests in flight and the first one gets rate
-#' limited, it's likely that all the others do too.
+#' limited, it's likely that all the others do too. This also means that
+#' the circuit breaker is never triggered.
 #'
 #' @inherit req_perform_sequential params return
 #' @param pool `r lifecycle::badge("deprecated")`. No longer supported;
@@ -154,7 +155,7 @@ RequestQueue <- R6::R6Class(
         total = n,
         format = paste0(
           "[{self$queue_status}] ",
-          "({self$n_pending} + {self$n_retried}) -> {self$n_active} -> {self$n_complete} | ",
+          "({self$n_pending} + {self$n_retries}) -> {self$n_active} -> {self$n_complete} | ",
           "{cli::pb_bar} {cli::pb_percent}"
         ),
         .envir = error_call
@@ -209,67 +210,58 @@ RequestQueue <- R6::R6Class(
     # Exposed for testing, so we can manaully work through one step at a time
     process1 = function(deadline = Inf) {
       if (self$queue_status == "done") {
-        FALSE
-      } else if (self$queue_status == "waiting") {
+        return(FALSE)
+      }
+
+      if (!is.null(self$progress)) {
+        cli::cli_progress_update(id = self$progress, set = self$n_complete)
+      }
+
+      if (self$queue_status == "waiting") {
         request_deadline <- max(self$token_deadline, self$rate_limit_deadline)
-        if (request_deadline <= deadline) {
-          # Assume we're done waiting; done_failure() will reset if needed
+        if (request_deadline <= unix_time()) {
           self$queue_status <- "working"
-          pool_wait_for_deadline(self$pool, request_deadline)
-          NULL
+          return()
+        }
+
+        if (self$rate_limit_deadline > self$token_deadline) {
+          waiting <- "for rate limit"
         } else {
-          pool_wait_for_deadline(self$pool, deadline)
-          TRUE
+          waiting <- "for throttling"
         }
+        pool_wait_for_deadline(self$pool, min(request_deadline, deadline), waiting)
+        NULL
       } else if (self$queue_status == "working") {
-        if (self$n_pending == 0) {
-          self$queue_status <- "finishing"
-        } else if (self$n_active < self$max_active) {
+        if (self$n_pending == 0 && self$n_active == 0) {
+          self$queue_status <- "done"
+        } else if (self$n_pending > 0 && self$n_active <= self$max_active) {
           if (!self$submit_next(deadline)) {
             self$queue_status <- "waiting"
           }
         } else {
           pool_wait_for_one(self$pool, deadline)
         }
         NULL
-      } else if (self$queue_status == "finishing") {
-        pool_wait_for_one(self$pool, deadline)
-
-        if (self$rate_limit_deadline > unix_time()) {
-          self$queue_status <- "waiting"
-        } else if (self$n_pending > 0) {
-          # we had to retry
-          self$queue_status <- "working"
-        } else if (self$n_active > 0) {
-          # keep going
-          self$queue_status <- "finishing"
+      } else if (self$queue_status == "errored") {
+        # Finish out any active requests but don't add any more
+        if (self$n_active > 0) {
+          pool_wait_for_one(self$pool, deadline)
         } else {
           self$queue_status <- "done"
         }
         NULL
-      } else if (self$queue_status == "errored") {
-        # Finish out any active request but don't add any more
-        pool_wait_for_one(self$pool, deadline)
-        self$queue_status <- if (self$n_active > 0) "errored" else "done"
-        NULL
       }
     },
 
     submit_next = function(deadline) {
-      next_i <- which(self$status == "pending")[[1]]
+      i <- which(self$status == "pending")[[1]]
 
-      self$token_deadline <- throttle_deadline(self$reqs[[next_i]])
+      self$token_deadline <- throttle_deadline(self$reqs[[i]])
       if (self$token_deadline > unix_time()) {
-        throttle_return_token(self$reqs[[next_i]])
+        throttle_return_token(self$reqs[[i]])
        return(FALSE)
      }
 
-      self$submit(next_i)
-    },
-
-    submit = function(i) {
-      retry_check_breaker(self$reqs[[i]], self$tries[[i]], error_call = error_call)
-
      self$set_status(i, "active")
      self$resps[i] <- list(NULL)
      self$tries[[i]] <- self$tries[[i]] + 1
@@ -305,6 +297,7 @@ RequestQueue <- R6::R6Class(
 
         self$set_status(i, "pending")
         self$n_retries <- self$n_retries + 1
+        self$queue_status <- "waiting"
       } else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) {
         # This isn't quite right, because if there are (e.g.) four requests in
         # the queue and the first one fails, we'll clear the cache for all four,
@@ -336,10 +329,6 @@ RequestQueue <- R6::R6Class(
       )
 
       self$status[[i]] <- status
-
-      if (!is.null(self$progress)) {
-        cli::cli_progress_update(id = self$progress, set = self$n_complete)
-      }
     },
 
     can_retry = function(i) {
@@ -357,7 +346,7 @@ pool_wait_for_one <- function(pool, deadline) {
   pool_wait(pool, poll = TRUE, timeout = timeout)
 }
 
-pool_wait_for_deadline <- function(pool, deadline) {
+pool_wait_for_deadline <- function(pool, deadline, waiting_for) {
   now <- unix_time()
   timeout <- deadline - now
   if (timeout <= 0) {
@@ -368,8 +357,10 @@ pool_wait_for_deadline <- function(pool, deadline) {
 
   # pool might finish early; we still want to wait out the full time
   remaining <- timeout - (unix_time() - now)
-  if (remaining > 0) {
-    # cat("Sleeping for ", remaining, " seconds\n", sep = "")
+  if (remaining > 2) {
+    # Use a progress bar
+    sys_sleep(remaining, waiting_for)
+  } else if (remaining > 0) {
     Sys.sleep(remaining)
   }
 
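
To summarise the state unification for readers skimming the diff above: `finishing` is gone, `working` now flows straight to `done`, and `errored` only drains requests that are already in flight. Below is a self-contained toy sketch of that control flow, not the package source: the queue is a plain environment, `submit_next()` and `wait_for_one()` are stubs standing in for httr2's pool helpers, and the throttling/"waiting" path is left out.

# Toy sketch (not httr2 source) of the simplified process1() state machine.
new_queue <- function(n, max_active = 2) {
  q <- new.env()
  q$queue_status <- "working"
  q$n_pending <- n
  q$n_active <- 0
  q$n_complete <- 0
  q$max_active <- max_active
  q
}

submit_next <- function(q) {
  # Real code may refuse a throttle token and flip the queue to "waiting"
  q$n_pending <- q$n_pending - 1
  q$n_active <- q$n_active + 1
}

wait_for_one <- function(q) {
  # Pretend one in-flight request finished
  q$n_active <- q$n_active - 1
  q$n_complete <- q$n_complete + 1
}

process1 <- function(q) {
  if (q$queue_status == "done") {
    return(FALSE)
  }
  if (q$queue_status == "working") {
    if (q$n_pending == 0 && q$n_active == 0) {
      q$queue_status <- "done"
    } else if (q$n_pending > 0 && q$n_active <= q$max_active) {
      submit_next(q)
    } else {
      wait_for_one(q)
    }
  } else if (q$queue_status == "errored") {
    # Finish out any active requests but don't add any more
    if (q$n_active > 0) wait_for_one(q) else q$queue_status <- "done"
  }
  NULL
}

q <- new_queue(3)
while (!identical(process1(q), FALSE)) {
  cat(sprintf(
    "[%s] %d -> %d -> %d\n",
    q$queue_status, q$n_pending, q$n_active, q$n_complete
  ))
}

Compared with the old version there is no intermediate `finishing` pass and no per-state duplication of the "drain the pool" logic.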

man/req_perform_parallel.Rd

+3 -2

tests/testthat/test-req-perform-parallel.R

+26 -16
@@ -116,11 +116,14 @@ test_that("both curl and HTTP errors become errors on continue", {
 test_that("errors can cancel outstanding requests", {
   reqs <- list2(
     request_test("/status/:status", status = 404),
-    request_test("/delay/:secs", secs = 2),
+    request_test("/delay/:secs", secs = 1),
+    request_test("/delay/:secs", secs = 1),
   )
   out <- req_perform_parallel(reqs, on_error = "return", max_active = 1)
   expect_s3_class(out[[1]], "httr2_http_404")
-  expect_null(out[[2]])
+  # second request might succeed or fail depend on the timing, but the
+  # third request should definitely fail
+  expect_null(out[[3]])
 })
 
 test_that("req_perform_parallel resspects http_error() error override", {
@@ -224,43 +227,50 @@ test_that("can retry a transient error", {
 
   queue <- RequestQueue$new(list(req), progress = FALSE)
 
-  # Start processing
+  # submit the request
   expect_null(queue$process1())
   expect_equal(queue$queue_status, "working")
   expect_equal(queue$n_active, 1)
   expect_equal(queue$n_pending, 0)
+  expect_equal(queue$status[[1]], "active")
 
-  # No pending, so switch to finishing
-  expect_null(queue$process1())
-  expect_equal(queue$queue_status, "finishing")
-
-  # Now we process the request and capture the retry
+  # process the response and capture the retry
   expect_null(queue$process1())
   expect_equal(queue$queue_status, "waiting")
   expect_equal(queue$rate_limit_deadline, mock_time + 2)
   expect_equal(queue$n_pending, 1)
   expect_s3_class(queue$resps[[1]], "httr2_http_429")
   expect_equal(resp_body_json(queue$resps[[1]]$resp), list(status = "waiting"))
 
-  # Now we "wait" 2 seconds
+  # Starting waiting
   expect_null(queue$process1())
-  expect_equal(queue$queue_status, "working")
+  expect_equal(queue$queue_status, "waiting")
   expect_equal(mock_time, 3)
 
-  # Now we go back to working
+  # Finishing waiting
   expect_null(queue$process1())
   expect_equal(queue$queue_status, "working")
+  expect_equal(queue$n_active, 0)
+  expect_equal(queue$n_pending, 1)
 
-  # Then resume finishing again
+  # Resubmit
   expect_null(queue$process1())
-  expect_equal(queue$queue_status, "finishing")
+  expect_equal(queue$queue_status, "working")
+  expect_equal(queue$n_active, 1)
+  expect_equal(queue$n_pending, 0)
 
-  # And we're finally done
+  # Process the response
+  expect_null(queue$process1())
+  expect_equal(queue$queue_status, "working")
+  expect_equal(queue$n_active, 0)
+  expect_equal(queue$n_pending, 0)
+  expect_s3_class(queue$resps[[1]], "httr2_response")
+  expect_equal(resp_body_json(queue$resps[[1]]), list(status = "done"))
+
+  # So we're finally done
   expect_null(queue$process1())
   expect_equal(queue$queue_status, "done")
   expect_false(queue$process1())
-
-  expect_equal(resp_body_json(queue$resps[[1]]), list(status = "done"))
 })
 
 test_that("throttling is limited by deadline", {

0 commit comments
