[async/pool] Proposal: Standalone async Pool class for non-iterable tasks #7178

@dev-a-loper

Is your feature request related to a problem? Please describe.
Managing concurrent asynchronous tasks in Deno currently relies heavily on pooledMap from @std/async. While powerful, pooledMap requires its input to be an Iterable or AsyncIterable. This creates friction and boilerplate when dealing with dynamic data sources (e.g., event listeners, streams, or queues) where tasks don't exist in a pre-defined array. Additionally, pooledMap offers no ergonomic way to handle errors per task or to check how many tasks are currently pending or running.

Describe the solution you'd like
I am proposing an ergonomic, standalone Pool class that allows tasks to be executed dynamically without requiring iterables. This would allow developers to seamlessly "push" tasks into a concurrency-limited pool and optionally await their individual resolutions.

Here is my proposed implementation. It uses a FIFOQueue (a singly linked list) to hold callers waiting for a free slot, so that enqueueing and dequeueing are both $O(1)$:

import { FIFOQueue } from "./FIFOQueue.ts";

type Task<T> = () => Promise<T>;

class Deferred<T> {
  promise: Promise<T>;
  resolve!: (value: T | PromiseLike<T>) => void;
  reject!: (reason?: any) => void;

  constructor() {
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

export class Pool {
  private workingCount = 0;
  private waitQueue: FIFOQueue<Deferred<void>> = new FIFOQueue();
  
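  constructor(private size: number) {
    // Reject NaN and fractional sizes as well; NaN would otherwise disable the limit.
    if (!Number.isInteger(size) || size < 1) {
      throw new Error("Pool size must be a positive integer");
    }
  }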

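  public async exec<T>(task: Task<T>): Promise<T> {
    // Re-check after waking: a released waiter resumes a few microtasks later,
    // and another exec() call may have taken the free slot in the meantime.
    while (this.workingCount >= this.size) {
      await this.waitForAvailability();
    }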

    this.workingCount++;
    try {
      return await task();
    } finally {
      this.workingCount--;
      if (this.workingCount < this.size) {
        this.releaseFirstWaiter();
      }
    }
  }

  private releaseFirstWaiter() {
    this.waitQueue.dequeue()?.resolve();
  }

  private async waitForAvailability() {
    const wait = new Deferred<void>();
    this.waitQueue.enqueue(wait);
    await wait.promise;
  }

  public getSize(): number {
    return this.size;
  }

  public getWorkingCount(): number {
    return this.workingCount;
  }

  public getPendingCount(): number {
    return this.waitQueue.getSize();
  }
}

// FIFOQueue.ts
type Node<T> = {
  value: T;
  next: Node<T> | null;
};

export class FIFOQueue<T> {
  private head: Node<T> | null = null;
  private tail: Node<T> | null = null;
  private size = 0;

  constructor() {
    this.clear();
  }

  enqueue(value: T) {
    const node = { value, next: null };
    if (this.head) {
      this.tail!.next = node;
      this.tail = node;
    } else {
      this.head = node;
      this.tail = node;
    }
    this.size++;
  }

  dequeue(): T | undefined {
    const current = this.head;
    if (!current) return undefined;

    this.head = current.next;
    // Drop the stale tail reference once the queue is empty.
    if (!this.head) this.tail = null;
    this.size--;
    return current.value;
  }

  clear() {
    this.head = null;
    this.tail = null;
    this.size = 0;
  }

  getSize() {
    return this.size;
  }

  *[Symbol.iterator]() {
    let current = this.head;
    while (current) {
      yield current.value;
      current = current.next;
    }
  }
}
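
One design point for any pool of this shape: a released waiter resumes a few microtasks after it is resolved, so a fresh exec() call can claim the free slot in between. That either lets a newcomer jump the queue or, without a re-check, pushes the working count past the limit. Below is a minimal sketch of a variant that hands the slot directly to the next waiter instead. The names HandoffPool and demo are hypothetical and not part of the proposal, and a plain array stands in for FIFOQueue to keep the sketch short.

```typescript
// Hypothetical variant for illustration; not part of the proposed API.
type Task<T> = () => Promise<T>;

class HandoffPool {
  private working = 0;
  // Plain array for brevity; the proposal's FIFOQueue would avoid O(n) shift().
  private waiters: Array<() => void> = [];

  constructor(private readonly size: number) {
    if (!Number.isInteger(size) || size < 1) {
      throw new RangeError("Pool size must be a positive integer");
    }
  }

  async exec<T>(task: Task<T>): Promise<T> {
    if (this.working >= this.size) {
      // The finishing task transfers its slot to us, so no increment here.
      await new Promise<void>((resolve) => {
        this.waiters.push(resolve);
      });
    } else {
      this.working++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiters.shift();
      if (next) {
        next(); // hand the slot over; `working` stays unchanged
      } else {
        this.working--;
      }
    }
  }

  get workingCount(): number {
    return this.working;
  }

  get pendingCount(): number {
    return this.waiters.length;
  }
}

// Runs five short jobs through a pool of two and records what happened.
async function demo(): Promise<{ peak: number; order: number[] }> {
  const pool = new HandoffPool(2);
  let running = 0;
  let peak = 0;
  const order: number[] = [];
  const job = (id: number) => async () => {
    running++;
    peak = Math.max(peak, running);
    order.push(id);
    await new Promise((resolve) => setTimeout(resolve, 5));
    running--;
    return id;
  };
  await Promise.all([1, 2, 3, 4, 5].map((id) => pool.exec(job(id))));
  return { peak, order };
}
```

Because the slot is never returned to the pool while someone is waiting, a late caller cannot overtake the queue. In demo(), the observed peak stays at the pool size and jobs start in submission order.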

Usage Example & Comparison with pooledMap

To illustrate why this is more ergonomic, consider a scenario where tasks are coming in from an external event emitter (e.g., a message broker or a webhook).

The pooledMap Approach (Less Ergonomic for Streams):
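Because pooledMap expects an iterable, integrating it with event-driven architectures requires wrapping the events in an async generator or accumulating them into arrays first. Error handling is also coarse: a rejection in the mapping function stops new work from starting and surfaces from the iterator as an AggregateError, so per-task handling needs a try/catch inside the mapping function, and anything else needs one around the entire for await loop.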

import { pooledMap } from "jsr:@std/async/pool";

// Requires data to already be iterable
const items = [1, 2, 3, 4, 5];
const results = pooledMap(2, items, async (item) => {
  return await processItem(item);
});

for await (const result of results) {
  console.log(result);
}
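
For the event-driven case, the bridging code that pooledMap requires might look roughly like the sketch below. PushSource and collect are hypothetical names introduced only for illustration. The class buffers pushed events and exposes them as an AsyncIterable, which could then be passed as the iterable argument to pooledMap.

```typescript
// Hypothetical adapter: turns push-style events into an AsyncIterable.
class PushSource<T> {
  private buffer: T[] = [];
  private wake: (() => void) | null = null;
  private closed = false;

  // Called from an event listener whenever a new item arrives.
  push(value: T): void {
    if (this.closed) return;
    this.buffer.push(value);
    this.wake?.();
    this.wake = null;
  }

  // Signals that no more items will arrive, ending the iteration.
  close(): void {
    this.closed = true;
    this.wake?.();
    this.wake = null;
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) {
      // Drain everything buffered so far.
      while (this.buffer.length > 0) {
        yield this.buffer.shift()!;
      }
      if (this.closed) return;
      // Sleep until the next push() or close().
      await new Promise<void>((resolve) => {
        this.wake = resolve;
      });
    }
  }
}

// Feeds two items into the source, one of them later, and collects them.
async function collect(): Promise<string[]> {
  const source = new PushSource<string>();
  source.push("a");
  setTimeout(() => {
    source.push("b");
    source.close();
  }, 1);

  const seen: string[] = [];
  for await (const msg of source) {
    seen.push(msg);
  }
  return seen;
}
```

This is the kind of boilerplate the proposal aims to remove: with a standalone pool, the event listener can call exec() directly and no adapter is needed.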

The Proposed Pool Approach (More Ergonomic & Flexible):
The Pool lets you either fire and forget or await individual tasks. You can push work into the pool at any time and check its current load via getWorkingCount() and getPendingCount().

const pool = new Pool(2);

// 1. Dynamic / Event-Driven Usage
eventEmitter.on("message", async (msg) => {
  try {
    // Tasks are pushed directly; pool manages concurrency limits automatically
    const result = await pool.exec(() => processMessage(msg));
    console.log(result);
  } catch (err) {
    // Error handling is isolated to the specific task
    console.error("Task failed:", err);
  }
});

// 2. Standard Array mapping (still very clean)
const promises = items.map(item => pool.exec(() => processItem(item)));
const allResults = await Promise.all(promises);

// 3. Observability
console.log(`Currently processing: ${pool.getWorkingCount()}`);
console.log(`Waiting in queue: ${pool.getPendingCount()}`);

You can view test cases for this implementation at this repository.

Describe alternatives you've considered

  • Continuing to use pooledMap: Works for static arrays, but forces unnatural conversions (like async generators) for streams/events and lacks introspection methods like getPendingCount().
  • Using raw arrays with Promise.all: Does not limit concurrency, which can overwhelm system resources or hit rate limits.
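
The second point is easy to confirm with a counter. In this small sketch (peakConcurrency is a hypothetical helper, not an existing API), every task starts as soon as it is created, so the peak number of in-flight tasks equals the number of tasks.

```typescript
// Hypothetical helper: measures how many tasks run at once under Promise.all.
async function peakConcurrency(taskCount: number): Promise<number> {
  let running = 0;
  let peak = 0;

  const task = async (): Promise<void> => {
    running++;
    peak = Math.max(peak, running);
    // Simulate a short asynchronous operation.
    await new Promise((resolve) => setTimeout(resolve, 1));
    running--;
  };

  // Each task() call begins executing immediately; Promise.all only waits.
  await Promise.all(Array.from({ length: taskCount }, () => task()));
  return peak;
}
```

Calling peakConcurrency(10) resolves to 10, since Promise.all only waits on promises that are already running and does not schedule them.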
