Handling dead workers in future.apply #732
-
I want to parallelize a model-fitting process across many dozens of cores/workers, but the fitting code randomly, yet reliably, crashes the R session. I was unable to locate what causes the crash, as it is not my own code and the crash is not systematic: a participant that crashes in one attempt is modeled perfectly fine in the next.

I therefore want to work around this issue: let a worker die, resurrect the dead worker, and then run the same participant's data on the same worker node again. However, as of now, it seems that there is no option to do so, as [...]

Or so it seems. Namely, I have noticed that after such an error, the function seems to have stopped running and my R console is free again, so I can type commands and work normally, but logs from my fitting function continue to be written, although with decreasing frequency, until at some point that stops as well. It seems that [...] Does this behaviour mean that I could use the [...] Catching the error of [...]

My current code looks something like this:

future::plan(multisession, workers = 4)
future.apply::future_lapply(unique(data$participant), function(x) myfitfunc(data %>% dplyr::filter(participant == x), additional_args = 'many'))

Thanks for any help
Replies: 2 comments 3 replies
-
Only got a few minutes, but then [...]

f <- future(<expr>)
v <- tryCatch(value(f), FutureError = identity)
if (inherits(v, "FutureError")) {
<do something>
}
... FWIW, the [...]
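Worth keeping in mind with the pattern above: the FutureError handler only fires when the parallel framework itself fails (for example, a lost worker). Ordinary errors signaled by the expression are relayed by value() as regular errors. A minimal sketch of telling the two apart follows; `collect_outcome()` is a hypothetical helper made up for illustration, not part of future, and the crash is emulated with tools::pskill().

```r
library(future)
plan(multisession, workers = 2)

# Hypothetical helper (not part of future): collect a future's value and
# label what happened.
collect_outcome <- function(f) {
  tryCatch(
    list(status = "ok", value = value(f)),
    # Listed before 'error' on purpose: a FutureError also inherits from
    # "error", and tryCatch() uses the first handler that matches.
    FutureError = function(e) list(status = "worker_failed", error = e),
    error       = function(e) list(status = "task_failed", error = e)
  )
}

ok      <- collect_outcome(future(1 + 1))
failed  <- collect_outcome(future(stop("bad input")))
crashed <- collect_outcome(future(tools::pskill(Sys.getpid())))  # emulated crash

c(ok$status, failed$status, crashed$status)
```

Only the "worker_failed" case is a candidate for blindly resubmitting the same expression; a "task_failed" result would most likely fail again in the same way.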
-
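UPDATE: As of future 1.40.0 (2025-04-10), there's no longer a need to orchestrate this manually. Futureverse detects when cluster and multisession workers have crashed and automatically relaunches them in the background. For example, if we launch three futures where one causes the parallel worker to terminate abruptly:

library(future)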
cl <- parallelly::makeClusterPSOCK(4)
plan(cluster, workers = cl)
message(sprintf("Number of free workers: %d/%d", nbrOfFreeWorkers(), nbrOfWorkers()))
#> Number of free workers: 4/4
fs <- list()
fs$A <- future(42)
message(sprintf("Number of free workers: %d/%d", nbrOfFreeWorkers(), nbrOfWorkers()))
#> Number of free workers: 3/4
fs$B <- future(tools::pskill(Sys.getpid())) ## emulates a crashed worker
message(sprintf("Number of free workers: %d/%d", nbrOfFreeWorkers(), nbrOfWorkers()))
#> Number of free workers: 2/4
fs$C <- future(3.14)
message(sprintf("Number of free workers: %d/%d", nbrOfFreeWorkers(), nbrOfWorkers()))
#> Number of free workers: 1/4

We will, first of all, get a [...]

vs <- lapply(fs, function(f) tryCatch(value(f), FutureError = identity))
str(vs)
#> List of 3
#> $ A: num 42
#> $ B:List of 2
#> ..$ message: chr "Future (NULL) of class ClusterFuture interrupted, while running on 'localhost' (pid 249671)"
#> ..$ call : NULL
#> ..- attr(*, "class")= chr [1:5] "FutureInterruptError" "FutureError" "error" "FutureCondition" ...
#> ..- attr(*, "uuid")= chr [1:2] "861bcd4389679da99fc75f8e73d1face" "5"
#> ..- attr(*, "future")=Classes 'ClusterFuture', 'MultiprocessFuture', 'Future' <environment: 0x5cc117467260>
#> $ C: num 3.14

Note how the value of future B is an [...]

message(sprintf("Number of free workers: %d/%d", nbrOfFreeWorkers(), nbrOfWorkers()))
#> Number of free workers: 4/4

Moreover, if we used:

vs <- value(fs)

then [...]
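Relaunching the worker does not rerun the task that was lost with it, so retrying a participant is still up to the caller. Below is a rough sketch of one way to do that, assuming future >= 1.40.0 so that crashed workers come back on their own. `lapply_with_retry()` and its `max_tries` argument are hypothetical names made up for illustration; they are not part of future or future.apply. A retried element runs on whichever worker is free, not necessarily the one it ran on before.

```r
library(future)
plan(multisession, workers = 2)

# Hypothetical helper (not part of future or future.apply): evaluate fun(x)
# for each element of xs in parallel, resubmitting elements whose worker died.
lapply_with_retry <- function(xs, fun, max_tries = 3L) {
  results <- vector("list", length(xs))
  pending <- seq_along(xs)  # indices that still need a result

  for (attempt in seq_len(max_tries)) {
    if (length(pending) == 0L) break

    # Launch one future per pending element
    fs <- lapply(xs[pending], function(x) future(fun(x), seed = TRUE))

    # Collect values; a crashed worker shows up as a FutureError object,
    # while ordinary errors from fun() are still thrown as usual
    vs <- lapply(fs, function(f) tryCatch(value(f), FutureError = identity))

    crashed <- vapply(vs, inherits, logical(1L), what = "FutureError")
    results[pending[!crashed]] <- vs[!crashed]
    pending <- pending[crashed]
  }

  if (length(pending) > 0L) {
    stop("Giving up on element(s) ", paste(pending, collapse = ", "),
         " after ", max_tries, " attempts")
  }
  results
}

# Toy usage; in the question's setting, fun would wrap myfitfunc() and
# xs would be unique(data$participant).
squares <- lapply_with_retry(1:4, function(x) x^2)
str(squares)
```

Capping the number of attempts keeps a participant that crashes every time from looping forever.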