Gating dispatcher based on uncollected results #454

@t-kalinowski

Problem

mirai launches tasks eagerly. If result consumers are slower than producers, results accumulate in memory with no backpressure.

Proposal

Add a dispatcher option that pauses new task launches whenever the number of completed but uncollected results reaches a configurable limit, and resumes them once results are collected.

Sketch:

daemons(dispatcher = TRUE, max_uncollected = 10)

Usage example:

mirai::daemons(10, dispatcher = TRUE, max_uncollected = 10)

ps <- lapply(1:20, function(x) {
  mirai::mirai({
    # fast background producer
    lock <- filelock::lock("test.log")
    cat(as.character(Sys.time()), "\n", file = "test.log", append = TRUE)
    filelock::unlock(lock)
  }) |> 
    promises::then(function(x) {
      # slow main-thread consumer
      Sys.sleep(2)
    })
})

p <- promises::promise_all(.list = ps)

Once max_uncollected results are waiting to be collected in the main process, the dispatcher stops handing new tasks to workers. Dispatch resumes only after the main process consumes a result and the count drops below the limit.
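To make the intended gating behaviour concrete, here is a small discrete-time model in base R. It is not mirai code and says nothing about how the dispatcher would implement this. `simulate_gate()` and its arguments are hypothetical names, and the model assumes one launch per tick, tasks that finish instantly, and a consumer that collects one result every `ticks_per_collect` ticks.

```r
# Toy model of the proposed gate (illustrative only, not part of mirai).
simulate_gate <- function(n_tasks, max_uncollected, ticks_per_collect = 2L) {
  launched <- 0L     # tasks handed to workers so far
  uncollected <- 0L  # finished results waiting in the main process
  collected <- 0L    # results consumed by the main process
  tick <- 0L
  peak <- 0L         # largest number of waiting results observed
  while (collected < n_tasks) {
    tick <- tick + 1L
    # Slow consumer: collects one waiting result every `ticks_per_collect` ticks.
    if (tick %% ticks_per_collect == 0L && uncollected > 0L) {
      uncollected <- uncollected - 1L
      collected <- collected + 1L
    }
    # Dispatcher: launches at most one task per tick, and only while the gate is open.
    if (launched < n_tasks && uncollected < max_uncollected) {
      launched <- launched + 1L
      uncollected <- uncollected + 1L  # the task completes instantly in this model
    }
    peak <- max(peak, uncollected)
  }
  list(ticks = tick, peak = peak)
}

gated <- simulate_gate(20L, max_uncollected = 3L)
ungated <- simulate_gate(20L, max_uncollected = 20L)  # limit never reached
```

In this model the consumer is the bottleneck either way, so both runs take the same number of ticks. The gate only changes how many results are held in memory at once.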

Benefits

  • Prevents unbounded memory growth.
  • Removes need for implementing a manual queue.
  • Matches channel(maxsize) semantics common in other systems.
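For comparison, the kind of manual queue this option would make unnecessary might look like the sketch below. `bounded_map()`, `launch` and `collect` are hypothetical names introduced for illustration. The demo uses plain-function stand-ins so that it runs in base R, and the comment at the end shows how mirai calls could be plugged in.

```r
# Manual backpressure sketch: never keep more than `max_uncollected`
# launched-but-uncollected tasks. All names here are illustrative.
bounded_map <- function(xs, launch, collect, max_uncollected = 2L) {
  stopifnot(max_uncollected >= 1L)
  results <- vector("list", length(xs))
  window <- list()    # handles of launched, not yet collected tasks (oldest first)
  owner <- integer()  # input index belonging to each handle in `window`
  peak <- 0L          # largest window size observed
  for (i in seq_along(xs)) {
    # Gate: collect the oldest outstanding task before launching past the limit.
    if (length(window) >= max_uncollected) {
      results[[owner[1L]]] <- collect(window[[1L]])
      window <- window[-1L]
      owner <- owner[-1L]
    }
    window[[length(window) + 1L]] <- launch(xs[[i]])
    owner <- c(owner, i)
    peak <- max(peak, length(window))
  }
  # Drain whatever is still outstanding.
  for (k in seq_along(window)) {
    results[[owner[k]]] <- collect(window[[k]])
  }
  list(results = results, peak = peak)
}

# Stand-in callbacks so the sketch runs without any daemons.
demo <- bounded_map(
  1:5,
  launch = function(x) x * 10,   # pretend "handle" is just a number
  collect = function(h) h + 1,   # pretend "collecting" transforms the handle
  max_uncollected = 2L
)

# With mirai, the callbacks could be written as (assumes mirai is installed):
#   launch  = function(x) mirai::mirai(x^2, x = x)
#   collect = function(m) mirai::call_mirai(m)$data
```

Unlike the proposed option, this sketch blocks the main process while it waits for the oldest task and collects results strictly in submission order, which is part of why a dispatcher-level gate would be preferable.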

Example Use Cases

  • ML dataloader: avoid preloading large batches faster than they can be consumed by the model.
  • DB write operations: prevent results from piling up while waiting on a single-threaded writer.

References

cc: @dfalbel

Labels: feature (a feature request or enhancement)