Skip to content
13 changes: 9 additions & 4 deletions .github/workflows/pool.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,20 @@ jobs:
- uses: dart-lang/setup-dart@65eb853c7ba17dde3be364c3d2858773e7144260
with:
sdk: ${{ matrix.sdk }}
# Node 22 has wasmGC enabled, which allows the wasm tests to run!
- name: Setup Node.js 22
uses: actions/setup-node@v6
with:
node-version: 22
- id: install
name: Install dependencies
run: dart pub get
- name: Run VM tests
run: dart test --platform vm
if: always() && steps.install.outcome == 'success'
- name: Run Chrome tests
run: dart test --platform chrome
run: dart test --platform chrome --compiler dart2js,dart2wasm
if: always() && steps.install.outcome == 'success'
- name: Run node tests
run: dart test --platform node --compiler dart2js,dart2wasm
if: always() && steps.install.outcome == 'success'
- name: Run Chrome tests - wasm
run: dart test --platform chrome -c dart2wasm
if: always() && steps.install.outcome == 'success' && matrix.sdk == 'dev'
5 changes: 5 additions & 0 deletions pkgs/pool/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.5.3

* Fix `Pool.forEach` to ensure all workers complete before the stream is closed, even on error or cancellation.
* Added an example.
Comment thread
kevmoo marked this conversation as resolved.

## 1.5.2

* Require Dart 3.4.
Expand Down
31 changes: 31 additions & 0 deletions pkgs/pool/example/example.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2026, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:pool/pool.dart';

void main() async {
// Create a pool that allows at most 3 concurrent resources.
final pool = Pool(3);

print('Starting tasks with pool size of 3...');

// Use pool.forEach to process a list of items concurrently.
// This is useful for limiting concurrent network requests or file I/O.
final items = List.generate(10, (i) => i);

await for (final result in pool.forEach(items, (item) async {
print(' [Start] Processing item $item');
// Simulate some async work like a network request.
await Future<void>.delayed(const Duration(milliseconds: 100));
print(' [Done] Processing item $item');
return 'Result for $item';
})) {
print('Processed: $result');
}
Comment thread
kevmoo marked this conversation as resolved.

print('All tasks completed!');

// Close the pool.
await pool.close();
}
42 changes: 29 additions & 13 deletions pkgs/pool/lib/pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Pool {
/// allocated.
///
/// See [PoolResource.allowRelease].
final _onReleaseCallbacks = Queue<void Function()>();
final _onReleaseCallbacks = Queue<FutureOr<void> Function()>();

/// Completers that will be completed once `onRelease` callbacks are done
/// running.
Expand Down Expand Up @@ -153,7 +153,7 @@ class Pool {
Stream<T> forEach<S, T>(
Iterable<S> elements, FutureOr<T> Function(S source) action,
{bool Function(S item, Object error, StackTrace stack)? onError}) {
onError ??= (item, e, s) => true;
final errorHandler = onError ?? (item, e, s) => true;

var cancelPending = false;

Expand All @@ -163,7 +163,7 @@ class Pool {
late Iterator<S> iterator;

Future<void> run(int _) async {
while (iterator.moveNext()) {
while (!cancelPending && iterator.moveNext()) {
// caching `current` is necessary because there are async breaks
// in this code and `iterator` is shared across many workers
final current = iterator.current;
Expand All @@ -182,7 +182,7 @@ class Pool {
try {
value = await action(current);
} catch (e, stack) {
if (onError!(current, e, stack)) {
if (errorHandler(current, e, stack)) {
controller.addError(e, stack);
}
continue;
Expand All @@ -197,11 +197,24 @@ class Pool {
iterator = elements.iterator;

assert(doneFuture == null);
var futures = Iterable<Future<void>>.generate(
var futures = List<Future<void>>.generate(
_maxAllocatedResources, (i) => withResource(() => run(i)));
doneFuture = Future.wait(futures, eagerError: true)

// Eagerly forward errors to the stream and trigger cancellation.
Future.wait(futures, eagerError: true)
.onError((Object error, StackTrace stack) {
cancelPending = true;
controller.addError(error, stack);
return <void>[];
});

// Wait for all work to actually complete before closing the stream.
doneFuture = Future.wait(futures, eagerError: false)
.then<void>((_) {})
.catchError(controller.addError);
.catchError((Object e) {
// We handle errors in the eager wait above, so we can ignore them here
// to avoid unhandled exceptions.
});

doneFuture!.whenComplete(controller.close);
}
Expand All @@ -210,8 +223,9 @@ class Pool {
sync: true,
onListen: onListen,
onCancel: () async {
assert(!cancelPending);
cancelPending = true;
resumeCompleter?.complete();
resumeCompleter = null;
await doneFuture;
},
Comment thread
kevmoo marked this conversation as resolved.
onPause: () {
Expand Down Expand Up @@ -275,7 +289,7 @@ class Pool {

/// If there are any pending requests, this will fire the oldest one after
/// running [onRelease].
void _onResourceReleaseAllowed(void Function() onRelease) {
void _onResourceReleaseAllowed(FutureOr<void> Function() onRelease) {
_resetTimer();

if (_requestedResources.isNotEmpty) {
Expand All @@ -297,15 +311,17 @@ class Pool {
///
/// Futures returned by [_runOnRelease] always complete in the order they were
/// created, even if earlier [onRelease] callbacks take longer to run.
Future<PoolResource> _runOnRelease(void Function() onRelease) {
Future<PoolResource> _runOnRelease(FutureOr<void> Function() onRelease) {
var completer = Completer<PoolResource>.sync();
_onReleaseCompleters.add(completer);

Future.sync(onRelease).then((value) {
_onReleaseCompleters.removeFirst().complete(PoolResource._(this));
}).catchError((Object error, StackTrace stackTrace) {
}).onError((Object error, StackTrace stackTrace) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to the better typed version!

_onReleaseCompleters.removeFirst().completeError(error, stackTrace);
_onResourceReleased();
});
Comment thread
kevmoo marked this conversation as resolved.

var completer = Completer<PoolResource>.sync();
_onReleaseCompleters.add(completer);
return completer.future;
}

Expand Down
2 changes: 1 addition & 1 deletion pkgs/pool/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: pool
version: 1.5.2
version: 1.5.3
description: >-
Manage a finite pool of resources. Useful for controlling concurrent file
system or network requests.
Expand Down
Loading
Loading