Skip to content

Commit 89aacbd

Browse files
authored
Merge pull request #111 from shikokuchuo/dev
Implements event-driven (non-polling) promises
2 parents 8e5283f + 5e83033 commit 89aacbd

File tree

8 files changed

+45
-32
lines changed

8 files changed

+45
-32
lines changed

DESCRIPTION

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: mirai
22
Type: Package
33
Title: Minimalist Async Evaluation Framework for R
4-
Version: 0.13.2.9001
4+
Version: 0.13.2.9002
55
Description: Lightweight parallel code execution and distributed computing.
66
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
77
on local or network resources, resolving automatically upon completion.
@@ -23,13 +23,12 @@ Encoding: UTF-8
2323
Depends:
2424
R (>= 3.6)
2525
Imports:
26-
nanonext (>= 0.13.3)
26+
nanonext (>= 0.13.6.9002)
2727
Enhances:
2828
parallel,
2929
promises
3030
Suggests:
3131
knitr,
32-
later,
3332
markdown
3433
VignetteBuilder: knitr
3534
RoxygenNote: 7.3.1

NAMESPACE

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ importFrom(nanonext,reap)
6565
importFrom(nanonext,recv)
6666
importFrom(nanonext,recv_aio_signal)
6767
importFrom(nanonext,request)
68+
importFrom(nanonext,request2)
6869
importFrom(nanonext,request_signal)
6970
importFrom(nanonext,send)
7071
importFrom(nanonext,socket)

NEWS.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
# mirai 0.13.2.9001 (development)
1+
# mirai 0.13.2.9002 (development)
22

3-
* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng #110).
3+
* Re-implements the promises method with completely event-driven (non-polling) promises (possible thanks to improvements in `nanonext` implemented with the help of @jcheng5)
4+
* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng5 #110).
45

56
# mirai 0.13.2
67

R/mirai-package.R

+3-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@
4444
#'
4545
#' @importFrom nanonext call_aio call_aio_ .context cv cv_value dial
4646
#' is_error_value listen lock mclock msleep next_config opt opt<- parse_url
47-
#' pipe_notify random reap recv recv_aio_signal request request_signal send
48-
#' socket stat stop_aio strcat tls_config unresolved until wait write_cert
47+
#' pipe_notify random reap recv recv_aio_signal request request2
48+
#' request_signal send socket stat stop_aio strcat tls_config unresolved
49+
#' until wait write_cert
4950
#' @importFrom parallel nextRNGStream
5051
#' @importFrom stats rexp
5152
#' @importFrom utils .DollarNames

R/mirai.R

+2-2
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,10 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
145145
envir <- ..[[.compute]]
146146
if (is.null(envir)) {
147147
sock <- ephemeral_daemon(local_url())
148-
aio <- request(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
148+
aio <- request2(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
149149
`attr<-`(.subset2(aio, "aio"), "sock", sock)
150150
} else {
151-
aio <- request_signal(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout)
151+
aio <- request2(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout)
152152
}
153153

154154
`class<-`(aio, c("mirai", "recvAio"))

R/promises.R

+27-16
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,33 @@
6464
#'
6565
as.promise.mirai <- function(x) {
6666

67-
force(x)
68-
promises::then(
69-
promise = promises::promise(
70-
function(resolve, reject) {
71-
query <- function()
72-
if (unresolved(x))
73-
later::later(query, delay = 0.1) else
74-
resolve(.subset2(x, "value"))
75-
query()
76-
}
77-
),
78-
onFulfilled = function(value)
79-
if (is_error_value(value) && !is_mirai_interrupt(value))
80-
stop(value) else
81-
value
82-
)
67+
promise <- .subset2(x, "promise")
68+
69+
if (is.null(promise)) {
70+
71+
promise <- promises::then(
72+
promises::promise(
73+
function(resolve, reject)
74+
attr(x, "callback") <- function() resolve(.subset2(x, "data"))
75+
),
76+
onFulfilled = function(value)
77+
if (is_error_value(value) && !is_mirai_interrupt(value))
78+
stop(value) else
79+
value
80+
)
81+
82+
if (!unresolved(x)) {
83+
value <- .subset2(x, "value")
84+
promise <- if (is_error_value(value) && !is_mirai_interrupt(value))
85+
promises::promise_reject(value) else
86+
promises::promise_resolve(value)
87+
}
88+
89+
assign("promise", promise, x)
90+
91+
}
92+
93+
promise
8394

8495
}
8596

README.Rmd

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ We would like to thank in particular:
152152

153153
[Will Landau](https://github.com/wlandau/), for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`.
154154

155-
[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, and guidance for implementing error stack traces.
155+
[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, prototyping non-polling promises and guidance on implementing error stack traces.
156156

157157
[Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R's implementation of L'Ecuyer-CMRG streams, used to ensure statistical independence in parallel processing, and collaboration in 'providing an alternative communications backend for R'.
158158

README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,17 @@ result.
8888

8989
``` r
9090
m$data
91-
#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864
92-
#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577
91+
#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703
92+
#> [7] -0.9656666 0.1509931 0.5354832 0.4135421
9393
```
9494

9595
Alternatively, explicitly call and wait for the result using
9696
`call_mirai()`.
9797

9898
``` r
9999
call_mirai(m)$data
100-
#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864
101-
#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577
100+
#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703
101+
#> [7] -0.9656666 0.1509931 0.5354832 0.4135421
102102
```
103103

104104
### Daemons
@@ -184,8 +184,8 @@ for persistent daemons, through to orchestrating robustness testing for
184184
the high performance computing requirements of `crew` and `targets`.
185185

186186
[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises`
187-
method to make `mirai` work seamlessly within Shiny, and guidance for
188-
implementing error stack traces.
187+
method to make `mirai` work seamlessly within Shiny, prototyping
188+
non-polling promises and guidance on implementing error stack traces.
189189

190190
[Luke Tierney](https://github.com/ltierney/), R Core, for discussion on
191191
R’s implementation of L’Ecuyer-CMRG streams, used to ensure statistical

0 commit comments

Comments
 (0)