Merged
12 changes: 7 additions & 5 deletions CLAUDE.md
@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## Overview

mirai is a minimalist async evaluation framework for R that provides asynchronous, parallel and distributed computing. Built on nanonext and NNG (Nanomsg-Next-Generation), it implements a message-passing paradigm where daemons (persistent background processes) execute tasks sent by the host process. Only runtime dependency: nanonext (>= 1.8.0). Requires R >= 3.6.
mirai is a minimalist async evaluation framework for R that provides asynchronous, parallel and distributed computing. Built on nanonext and NNG (Nanomsg-Next-Generation), it implements a message-passing paradigm where daemons (persistent background processes) execute tasks sent by the host process. Only runtime dependency: nanonext. Requires R >= 3.6.

## Development Commands

@@ -59,9 +59,11 @@ The package uses **litedown** (not knitr) as VignetteBuilder for final rendering
- **mirai()**: Creates an async evaluation, returns immediately with a 'mirai' object
- **daemons()**: Sets up persistent background daemon processes
- **daemon()**: The daemon instance running in background processes
- **dispatcher()**: FIFO scheduler (reimplemented in C in nanonext 1.8.0+ for ~50% less overhead)
- **Dispatcher**: FIFO scheduler implemented in C within nanonext (managed via nanonext's `.dispatcher_start`/`.dispatcher_stop`/`.dispatcher_info`)
- **mirai_map()**: Async parallel map with progress bars and early stopping
- **everywhere()**: Evaluates expressions on all connected daemons
- **collect_mirai()/call_mirai()**: Block until results are available
- **status()**: Query daemon/dispatcher status

### Message-Passing Topology

@@ -104,9 +106,8 @@ Also caches the Rscript path in `.command` and checks for cli package availabili

- **mirai-package.R**: Package docs, `.onLoad`, global state (`.`, `..`, `.opts`, `._`), constants
- **mirai.R**: Core `mirai()`, `unresolved()`, `call_mirai()`, `stop_mirai()`, `everywhere()`, `race_mirai()`
- **daemons.R**: `daemons()`, remote configs, compute profiles, `with_daemons()`, `local_daemons()`
- **daemons.R**: `daemons()`, dispatcher launch (`launch_dispatcher()`), compute profiles, `with_daemons()`, `local_daemons()`
- **daemon.R**: Daemon instance implementation
- **dispatcher.R**: Dispatcher process (thin wrapper — core logic now in C via nanonext)
- **map.R**: `mirai_map()` with collection options
- **launchers.R**: `launch_local()`, `launch_remote()`, `remote_config()`, `ssh_config()`, `cluster_config()`, `http_config()`
- **parallel.R**: `make_cluster()` — official alternative communications backend for R's parallel package
@@ -134,9 +135,10 @@ Custom error classes with structured information:
## CI/CD

GitHub Actions in `.github/workflows/`:
- **R-CMD-check.yaml**: 8 OS/R-version combinations (Ubuntu ARM devel, Ubuntu release/oldrel, macOS release/oldrel, Windows release/oldrel)
- **R-CMD-check.yaml**: 8 OS/R-version combinations (Ubuntu ARM devel, Ubuntu/macOS/Windows release+oldrel variants)
- **test-coverage.yaml**: Coverage via covr, uploaded to codecov
- **pkgdown.yaml**: Documentation site with tidytemplate
- **rhub.yaml**: R-Hub checks
- **shiny-coreci.yaml**: Shiny integration tests (manual trigger)
- **pr-commands.yaml**: PR comment commands `/document` (roxygen2) and `/style` (styler)

7 changes: 5 additions & 2 deletions NAMESPACE
@@ -33,7 +33,6 @@ export(collect_mirai)
export(daemon)
export(daemons)
export(daemons_set)
export(dispatcher)
export(everywhere)
export(host_url)
export(http_config)
@@ -68,8 +67,12 @@ export(with_daemons)
importFrom(nanonext,"opt<-")
importFrom(nanonext,.advance)
importFrom(nanonext,.context)
importFrom(nanonext,.dispatcher)
importFrom(nanonext,.dispatcher_info)
importFrom(nanonext,.dispatcher_start)
importFrom(nanonext,.dispatcher_stop)
importFrom(nanonext,.dispatcher_wait)
importFrom(nanonext,.keep)
importFrom(nanonext,.limit_gate)
importFrom(nanonext,.mark)
importFrom(nanonext,.unresolved)
importFrom(nanonext,call_aio)
9 changes: 8 additions & 1 deletion NEWS.md
@@ -1,10 +1,17 @@
# mirai (development version)

#### New Features

* Dispatcher reimplemented as a thread for lower overhead, removing the separate dispatcher process (#581).
* Adds `capacity` argument to `daemons()` to set the maximum number of tasks at the dispatcher.
  New tasks block until existing ones complete, providing backpressure to prevent unbounded memory growth (thanks @t-kalinowski, #454).

#### Updates

* Fixes transfer of large data (> ~2GB) on MacOS and Windows.
* Fixes transfer of large data (> ~2GB) on MacOS and Windows (#579).
* Fixes `mirai_map()` progress bar customization issues (thanks @mcol, #519).
* Fixes `launch_remote()` with `http_config()` failing for TLS connections, where newlines in the PEM certificate produced invalid JSON in the request payload.
* Improved performance and reduced memory consumption through optimizations in the underlying nanonext/NNG transport layer.
* Requires nanonext >= [1.8.2.9000].

# mirai 2.6.1
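The NEWS entry above says that with `capacity` set, new tasks block until existing ones complete. The sketch below is a toy model of that backpressure in base R, not mirai's implementation: `simulate_capacity()` and its fields are hypothetical names, tasks are assumed to have positive durations, and daemons are assumed to take tasks in FIFO order. It tracks how many tasks (queued plus executing) are held at once, with and without a limit.

```r
# Toy model of a capacity limit; names are hypothetical, not mirai internals.
simulate_capacity <- function(durations, n_daemons, capacity = NULL) {
  n <- length(durations)
  if (is.null(capacity)) capacity <- n  # NULL: submission never blocks
  free_at <- numeric(n_daemons)  # time at which each daemon next becomes idle
  submit <- numeric(n)           # time at which the host may submit each task
  finish <- numeric(n)           # completion time of each task
  peak <- 0L
  for (i in seq_len(n)) {
    if (i > capacity) {
      # Blocked until enough earlier tasks have finished to free a slot.
      submit[i] <- sort(finish[seq_len(i - 1L)])[i - capacity]
    }
    d <- which.min(free_at)  # FIFO: the next daemon to become idle
    finish[i] <- max(submit[i], free_at[d]) + durations[i]
    free_at[d] <- finish[i]
    # Tasks (queued or executing) held at the moment task i is admitted.
    peak <- max(peak, sum(finish[seq_len(i)] > submit[i]))
  }
  list(makespan = max(finish), peak = peak)
}

unlimited <- simulate_capacity(rep(1, 10), n_daemons = 2L)
limited <- simulate_capacity(rep(1, 10), n_daemons = 2L, capacity = 3L)
```

In this model both runs finish at time 5, but the unlimited run holds all 10 tasks at once while the limited run never holds more than 3. According to this diff, the real limit is set with `daemons(n, capacity = k)` and requires dispatcher.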
86 changes: 44 additions & 42 deletions R/daemons.R
@@ -46,6 +46,10 @@
#' sound, non-reproducible). An integer value instead initializes a stream per
#' mirai, allowing reproducible results independent of which daemon evaluates
#' it.
#' @param capacity (integer) maximum number of tasks (queued plus executing) at
#' dispatcher. New tasks block until existing ones complete, providing
#' backpressure to control memory usage. `NULL` (default) allows unlimited
#' queuing. Requires dispatcher.
#' @param serial (configuration) for custom serialization of reference objects
#' (e.g. Arrow Tables, torch tensors), created by [serial_config()]. Requires
#' dispatcher. `NULL` applies any configurations from [register_serial()].
@@ -72,11 +76,12 @@
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#' By default `dispatcher = TRUE` enables optimal FIFO scheduling, queuing
#' tasks and sending to daemons as they become available. The `capacity`
#' argument controls the maximum number of tasks at the dispatcher, providing
#' backpressure to prevent excessive memory usage. Dispatcher also enables
#' (i) mirai cancellation using [stop_mirai()] or a `.timeout` argument to
#' [mirai()], and (ii) custom serialization configurations.
#'
#' With `dispatcher = FALSE`, daemons connect directly to the host and tasks
#' are distributed round-robin, with tasks queued at each daemon. Optimal
@@ -221,6 +226,7 @@ daemons <- function(
...,
sync = FALSE,
seed = NULL,
capacity = NULL,
serial = NULL,
tls = NULL,
pass = NULL,
@@ -254,7 +260,7 @@ daemons <- function(
envir <- init_envir_stream(seed)
dots <- parse_dots(envir, ...)
if (dispatcher) {
launch_dispatcher(n, dots, envir, serial)
launch_dispatcher(n, dots, envir, serial, capacity = capacity)
} else {
launch_daemons(seq_len(n), dots, envir)
}
@@ -267,7 +273,7 @@ daemons <- function(
dots <- parse_dots(envir, ...)
cfg <- configure_tls(url, tls, pass, envir)
if (dispatcher) {
launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass)
launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass, capacity = capacity)
} else {
create_sock(envir, url, cfg[[2L]])
}
@@ -393,11 +399,10 @@ status <- function(.compute = NULL) {
info <- function(.compute = NULL) {
envir <- compute_env(.compute)
is.null(envir) && return()
if (is.null(envir[["dispatcher"]])) {
res <- c(as.integer(stat(envir[["sock"]], "pipes")), NA, NA, NA, NA)
res <- if (is.null(envir[["dispatcher"]])) {
c(as.integer(stat(envir[["sock"]], "pipes")), NA, NA, NA, NA)
} else {
res <- query_dispatcher(envir[["sock"]], c(0L, 0L))
is.object(res) && return()
.dispatcher_info(envir[["dispatcher"]])
}
`names<-`(res, c("connections", "cumulative", "awaiting", "executing", "completed"))
}
@@ -594,6 +599,9 @@ reset_daemons <- function(.compute, envir, signal = FALSE) {
send_signal(envir)
}
reap(envir[["sock"]])
if (!is.null(envir[["dispatcher"]])) {
.dispatcher_stop(envir[["dispatcher"]])
}
otel_span("daemons reset", envir, links = list(envir[["otel_span"]]))
`[[<-`(.., .compute, NULL)
msleep(.sleep_daemons)
@@ -652,64 +660,59 @@ args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls))
}

args_dispatcher <- function(urld, url, n) {
sprintf(".libPaths(\"%s\");mirai::dispatcher(\"%s\",url=\"%s\",n=%d)", libp(), urld, url, n)
}
inproc_url <- function() sprintf("inproc://%s", random(12L))

launch_daemon <- function(args) system2(.command, args = c("-e", shQuote(args)), wait = FALSE)

query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
r <- send(sock, command, mode = send_mode, block = block)
r && return(r)
recv(sock, mode = recv_mode, block = block)
}

sync_with <- function(cv, message_key, sync = 0L) {
while (!until(cv, .limit_long)) {
message(sprintf(._[[message_key]], sync <- sync + .limit_long_secs))
}
}

launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
cv <- cv()
urld <- local_url()
sock <- req_socket(urld)
pipe_notify(sock, cv, add = TRUE)
launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL, capacity = NULL) {
local <- is.numeric(url)
n <- if (local) url else 0L
if (local) {
url <- local_url()
}
system2(
.command,
args = c("--default-packages=NULL", "--vanilla", "-e", shQuote(args_dispatcher(urld, url, n))),
wait = FALSE
)

if (is.null(serial)) {
serial <- .[["serial"]]
}

tls_cfg <- NULL
if (!local && length(tls)) {
tls_cfg <- tls_config(server = tls, pass = pass)
}

urld <- inproc_url()
sock <- req_socket(urld)

cv <- cv()

disp <- .dispatcher_start(url, urld, tls_cfg, serial, envir[["stream"]], capacity, cv)

if (!local) {
url <- attr(disp, "url")
}
if (is.list(serial)) {
`opt<-`(sock, "serial", serial)
}

`[[<-`(envir, "cv", cv)
`[[<-`(envir, "sock", sock)
`[[<-`(envir, "dispatcher", urld)
data <- list(Sys.getenv("R_DEFAULT_PACKAGES"), tls, pass, serial, envir[["stream"]])

sync_with(cv, "sync_dispatcher")
`[[<-`(envir, "dispatcher", disp)
`[[<-`(envir, "url", url)

pipe_notify(sock, NULL, add = TRUE)
send(sock, data, mode = 1L, block = TRUE)
if (local) {
launch_args <- args_daemon_disp(url, dots)
for (i in seq_len(n)) {
launch_daemon(launch_args)
}
.dispatcher_wait(disp, n)
}
raio <- recv_aio(sock, mode = 2L, cv = cv)
sync_with(cv, "sync_dispatcher")

`[[<-`(envir, "url", collect_aio(raio))
}

launch_daemons <- function(seq, dots, envir) {
@@ -739,7 +742,7 @@ send_signal <- function(envir) {
signals <- if (is.null(envir[["dispatcher"]])) {
stat(envir[["sock"]], "pipes")
} else {
query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
.dispatcher_info(envir[["dispatcher"]])[1L]
}
for (i in seq_len(signals)) {
send(envir[["sock"]], ._scm_., mode = 2L)
@@ -748,8 +751,7 @@ send_signal <- function(envir) {
}

dispatcher_status <- function(envir) {
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
is.object(status) && return(status)
status <- .dispatcher_info(envir[["dispatcher"]])
list(
connections = status[1L],
daemons = envir[["url"]],
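The Dispatcher section of the `daemons()` documentation in this file contrasts FIFO scheduling (`dispatcher = TRUE`) with round-robin distribution (`dispatcher = FALSE`). The following base R sketch illustrates why the two can differ when task durations are uneven. It is a simulation under simplifying assumptions (all tasks available at time zero, no communication cost), and the helper names are hypothetical rather than part of mirai.

```r
# Compare total completion time under two assignment policies.
# Hypothetical helpers; a simulation, not mirai's scheduler.

# Round-robin: task i is fixed to daemon ((i - 1) mod n) + 1 up front,
# and each daemon works through its own queue.
makespan_round_robin <- function(durations, n_daemons) {
  daemon <- (seq_along(durations) - 1L) %% n_daemons + 1L
  max(tapply(durations, daemon, sum))
}

# FIFO dispatch: each task, in order, goes to whichever daemon is free first.
makespan_fifo <- function(durations, n_daemons) {
  free_at <- numeric(n_daemons)
  for (d in durations) {
    i <- which.min(free_at)
    free_at[i] <- free_at[i] + d
  }
  max(free_at)
}

durations <- c(8, 1, 1, 1, 1, 1, 1, 1)  # one long task followed by short ones
rr <- makespan_round_robin(durations, 2L)
fifo <- makespan_fifo(durations, 2L)
```

With these durations, round-robin queues three short tasks behind the long one and finishes at time 11, while FIFO dispatch routes all short tasks to the idle daemon and finishes at time 8.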
73 changes: 0 additions & 73 deletions R/dispatcher.R

This file was deleted.

5 changes: 3 additions & 2 deletions R/mirai-package.R
@@ -31,8 +31,9 @@
#' `vignette("mirai", package = "mirai")`
#'
#' @importFrom nanonext .advance call_aio call_aio_ collect_aio collect_aio_
#' .context cv cv_reset cv_signal cv_value dial .dispatcher handler
#' http_server ip_addr is_error_value .keep listen .mark mclock monitor msleep
#' .context cv cv_reset cv_signal cv_value dial .dispatcher_info
#' .dispatcher_start .dispatcher_stop .dispatcher_wait handler http_server
#' ip_addr is_error_value .keep .limit_gate listen .mark mclock monitor msleep
#' ncurl nng_error opt opt<- parse_url pipe_id pipe_notify race_aio random
#' reap recv recv_aio request send serial_config socket stat stop_aio
#' stop_request tls_config unresolved .unresolved until wait wait_ write_cert
7 changes: 5 additions & 2 deletions R/mirai.R
@@ -179,15 +179,19 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = NULL)

is.null(envir) && return(ephemeral_daemon(data, .timeout))

disp <- envir[["dispatcher"]]
is.null(disp) || .limit_gate(disp)

req <- request(
.context(envir[["sock"]]),
data,
send_mode = 1L,
recv_mode = 1L,
timeout = .timeout,
cv = envir[["cv"]],
id = envir[["dispatcher"]]
id = disp
)

otel_set_span_id(ctx_spn[[2L]], attr(req, "id"))
envir[["sync"]] && evaluate_sync(envir)
invisible(req)
@@ -709,5 +713,4 @@ mk_mirai_error <- function(cnd) {
`class<-`(`attributes<-`(msg, cnd), c("miraiError", "errorValue", "try-error"))
}

.connReset <- serialize(`class<-`(19L, c("errorValue", "try-error")), NULL)
.snapshot <- expression(on.exit(mirai:::snapshot(), add = TRUE))
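The change to `mirai()` above uses `is.null(disp) || .limit_gate(disp)` as a one-line conditional call. The base R sketch below shows the idiom in isolation. `gate()` is a hypothetical stand-in that only counts its calls, since the diff does not show what `.limit_gate()` does internally; presumably it blocks while the dispatcher is at capacity.

```r
# Hypothetical stand-in for a blocking gate; it only records that it ran.
calls <- 0L
gate <- function(handle) {
  calls <<- calls + 1L
  TRUE
}

submit <- function(dispatcher) {
  # `||` short-circuits: gate() runs only when a dispatcher handle exists.
  is.null(dispatcher) || gate(dispatcher)
  invisible(NULL)
}

submit(NULL)      # no dispatcher: the gate is skipped
submit("handle")  # dispatcher present: the gate runs once
```

After both calls `calls` is 1, so the no-dispatcher path never touches the gate. The idiom relies on the right-hand side returning a length-one logical, which is why the stand-in returns `TRUE`.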