diff --git a/DESCRIPTION b/DESCRIPTION index 7127afec6..5d780e966 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.13.2.9001 +Version: 0.13.2.9002 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. @@ -23,13 +23,12 @@ Encoding: UTF-8 Depends: R (>= 3.6) Imports: - nanonext (>= 0.13.3) + nanonext (>= 0.13.6.9002) Enhances: parallel, promises Suggests: knitr, - later, markdown VignetteBuilder: knitr RoxygenNote: 7.3.1 diff --git a/NAMESPACE b/NAMESPACE index a360c9962..a233b8ede 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -65,6 +65,7 @@ importFrom(nanonext,reap) importFrom(nanonext,recv) importFrom(nanonext,recv_aio_signal) importFrom(nanonext,request) +importFrom(nanonext,request2) importFrom(nanonext,request_signal) importFrom(nanonext,send) importFrom(nanonext,socket) diff --git a/NEWS.md b/NEWS.md index 0015f3432..a36acce43 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,7 @@ -# mirai 0.13.2.9001 (development) +# mirai 0.13.2.9002 (development) -* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng #110). +* Re-implements the promises method with completely event-driven (non-polling) promises (possible thanks to improvements in `nanonext` implemented with the help of @jcheng5) +* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng5 #110). # mirai 0.13.2 diff --git a/R/mirai-package.R b/R/mirai-package.R index 1714fd7b5..d39ad0514 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -44,8 +44,9 @@ #' #' @importFrom nanonext call_aio call_aio_ .context cv cv_value dial #' is_error_value listen lock mclock msleep next_config opt opt<- parse_url -#' pipe_notify random reap recv recv_aio_signal request request_signal send -#' socket stat stop_aio strcat tls_config unresolved until wait write_cert +#' pipe_notify random reap recv recv_aio_signal request request2 +#' request_signal send socket stat stop_aio strcat tls_config unresolved +#' until wait write_cert #' @importFrom parallel nextRNGStream #' @importFrom stats rexp #' @importFrom utils .DollarNames diff --git a/R/mirai.R b/R/mirai.R index 0be41dd37..4c1c1201c 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -145,10 +145,10 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau envir <- ..[[.compute]] if (is.null(envir)) { sock <- ephemeral_daemon(local_url()) - aio <- request(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout) + aio <- request2(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout) `attr<-`(.subset2(aio, "aio"), "sock", sock) } else { - aio <- request_signal(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) + aio <- request2(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) } `class<-`(aio, c("mirai", "recvAio")) diff --git a/R/promises.R b/R/promises.R index 28ac2b499..584e2a243 100644 --- a/R/promises.R +++ b/R/promises.R @@ -64,22 +64,33 @@ #' as.promise.mirai <- function(x) { - force(x) - promises::then( - promise = promises::promise( - function(resolve, reject) { - query <- function() - if (unresolved(x)) - later::later(query, delay = 0.1) else - resolve(.subset2(x, "value")) - query() - } - ), - onFulfilled = function(value) - if (is_error_value(value) && !is_mirai_interrupt(value)) - stop(value) else - value - ) + promise <- .subset2(x, "promise") + + if (is.null(promise)) { + + promise <- promises::then( + promises::promise( + function(resolve, reject) + attr(x, "callback") <- function() resolve(.subset2(x, "data")) + ), + onFulfilled = function(value) + if (is_error_value(value) && !is_mirai_interrupt(value)) + stop(value) else + value + ) + + if (!unresolved(x)) { + value <- .subset2(x, "value") + promise <- if (is_error_value(value) && !is_mirai_interrupt(value)) + promises::promise_reject(value) else + promises::promise_resolve(value) + } + + assign("promise", promise, x) + + } + + promise } diff --git a/README.Rmd b/README.Rmd index 4f8446806..be96480a6 100644 --- a/README.Rmd +++ b/README.Rmd @@ -152,7 +152,7 @@ We would like to thank in particular: [Will Landau](https://github.com/wlandau/), for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`. -[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, and guidance for implementing error stack traces. +[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, prototyping non-polling promises and guidance on implementing error stack traces. [Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R's implementation of L'Ecuyer-CMRG streams, used to ensure statistical independence in parallel processing, and collaboration in 'providing an alternative communications backend for R'. diff --git a/README.md b/README.md index a62dff9c0..3f852cd4f 100644 --- a/README.md +++ b/README.md @@ -88,8 +88,8 @@ result. ``` r m$data -#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864 -#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577 +#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703 +#> [7] -0.9656666 0.1509931 0.5354832 0.4135421 ``` Alternatively, explicitly call and wait for the result using @@ -97,8 +97,8 @@ Alternatively, explicitly call and wait for the result using ``` r call_mirai(m)$data -#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864 -#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577 +#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703 +#> [7] -0.9656666 0.1509931 0.5354832 0.4135421 ``` ### Daemons @@ -184,8 +184,8 @@ for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`. [Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` -method to make `mirai` work seamlessly within Shiny, and guidance for -implementing error stack traces. +method to make `mirai` work seamlessly within Shiny, prototyping +non-polling promises and guidance on implementing error stack traces. [Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R’s implementation of L’Ecuyer-CMRG streams, used to ensure statistical