Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .github/workflows/pool.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,20 @@ jobs:
- uses: dart-lang/setup-dart@65eb853c7ba17dde3be364c3d2858773e7144260
with:
sdk: ${{ matrix.sdk }}
# Node 22 has wasmGC enabled, which allows the wasm tests to run!
- name: Setup Node.js 22
uses: actions/setup-node@v6
with:
node-version: 22
- id: install
name: Install dependencies
run: dart pub get
- name: Run VM tests
run: dart test --platform vm
if: always() && steps.install.outcome == 'success'
- name: Run Chrome tests
run: dart test --platform chrome
run: dart test --platform chrome --compiler dart2js,dart2wasm
if: always() && steps.install.outcome == 'success'
- name: Run node tests
run: dart test --platform node --compiler dart2js,dart2wasm
if: always() && steps.install.outcome == 'success'
- name: Run Chrome tests - wasm
run: dart test --platform chrome -c dart2wasm
if: always() && steps.install.outcome == 'success' && matrix.sdk == 'dev'
5 changes: 5 additions & 0 deletions pkgs/pool/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.5.3-wip

* Fix `Pool.forEach` to ensure all workers complete before the stream is closed, even on error or cancellation.
* Added an example.
Comment thread
kevmoo marked this conversation as resolved.

## 1.5.2

* Require Dart 3.4.
Expand Down
31 changes: 31 additions & 0 deletions pkgs/pool/example/example.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2026, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:pool/pool.dart';

void main() async {
// Create a pool that allows at most 3 concurrent resources.
final pool = Pool(3);

print('Starting tasks with pool size of 3...');

// Use pool.forEach to process a list of items concurrently.
// This is useful for limiting concurrent network requests or file I/O.
final items = List.generate(10, (i) => i);

await for (final result in pool.forEach(items, (item) async {
print(' [Start] Processing item $item');
// Simulate some async work like a network request.
await Future<void>.delayed(const Duration(milliseconds: 100));
print(' [Done] Processing item $item');
return 'Result for $item';
})) {
print('Processed: $result');
}
Comment thread
kevmoo marked this conversation as resolved.
Outdated

print('All tasks completed!');

// Close the pool.
await pool.close();
}
41 changes: 28 additions & 13 deletions pkgs/pool/lib/pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Pool {
/// allocated.
///
/// See [PoolResource.allowRelease].
final _onReleaseCallbacks = Queue<void Function()>();
final _onReleaseCallbacks = Queue<FutureOr<void> Function()>();

/// Completers that will be completed once `onRelease` callbacks are done
/// running.
Expand Down Expand Up @@ -153,7 +153,7 @@ class Pool {
Stream<T> forEach<S, T>(
Iterable<S> elements, FutureOr<T> Function(S source) action,
{bool Function(S item, Object error, StackTrace stack)? onError}) {
onError ??= (item, e, s) => true;
final errorHandler = onError ?? (item, e, s) => true;
Comment thread
kevmoo marked this conversation as resolved.

var cancelPending = false;

Expand All @@ -163,7 +163,7 @@ class Pool {
late Iterator<S> iterator;

Future<void> run(int _) async {
while (iterator.moveNext()) {
while (!cancelPending && iterator.moveNext()) {
// caching `current` is necessary because there are async breaks
// in this code and `iterator` is shared across many workers
final current = iterator.current;
Expand All @@ -182,7 +182,7 @@ class Pool {
try {
value = await action(current);
} catch (e, stack) {
if (onError!(current, e, stack)) {
if (errorHandler(current, e, stack)) {
controller.addError(e, stack);
}
continue;
Expand All @@ -197,11 +197,24 @@ class Pool {
iterator = elements.iterator;

assert(doneFuture == null);
var futures = Iterable<Future<void>>.generate(
var futures = List<Future<void>>.generate(
_maxAllocatedResources, (i) => withResource(() => run(i)));
doneFuture = Future.wait(futures, eagerError: true)

// Eagerly forward errors to the stream and trigger cancellation.
Future.wait(futures, eagerError: true)
.onError((Object error, StackTrace stack) {
cancelPending = true;
controller.addError(error, stack);
return <void>[];
});

// Wait for all work to actually complete before closing the stream.
Comment thread
kevmoo marked this conversation as resolved.
Outdated
doneFuture = Future.wait(futures, eagerError: false)
.then<void>((_) {})
.catchError(controller.addError);
.catchError((Object e) {
// We handle errors in the eager wait above, so we can ignore them here
// to avoid unhandled exceptions.
});

doneFuture!.whenComplete(controller.close);
}
Expand All @@ -210,8 +223,9 @@ class Pool {
sync: true,
onListen: onListen,
onCancel: () async {
assert(!cancelPending);
cancelPending = true;
resumeCompleter?.complete();
resumeCompleter = null;
await doneFuture;
},
Comment thread
kevmoo marked this conversation as resolved.
onPause: () {
Expand Down Expand Up @@ -275,7 +289,7 @@ class Pool {

/// If there are any pending requests, this will fire the oldest one after
/// running [onRelease].
void _onResourceReleaseAllowed(void Function() onRelease) {
void _onResourceReleaseAllowed(FutureOr<void> Function() onRelease) {
_resetTimer();

if (_requestedResources.isNotEmpty) {
Expand All @@ -297,15 +311,16 @@ class Pool {
///
/// Futures returned by [_runOnRelease] always complete in the order they were
/// created, even if earlier [onRelease] callbacks take longer to run.
Future<PoolResource> _runOnRelease(void Function() onRelease) {
Future<PoolResource> _runOnRelease(FutureOr<void> Function() onRelease) {
var completer = Completer<PoolResource>.sync();
_onReleaseCompleters.add(completer);

Future.sync(onRelease).then((value) {
_onReleaseCompleters.removeFirst().complete(PoolResource._(this));
}).catchError((Object error, StackTrace stackTrace) {
}).onError((Object error, StackTrace stackTrace) {
Comment thread
kevmoo marked this conversation as resolved.
Outdated
_onReleaseCompleters.removeFirst().completeError(error, stackTrace);
});
Comment thread
kevmoo marked this conversation as resolved.
Outdated

var completer = Completer<PoolResource>.sync();
_onReleaseCompleters.add(completer);
return completer.future;
}

Expand Down
2 changes: 1 addition & 1 deletion pkgs/pool/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: pool
version: 1.5.2
version: 1.5.3-wip
description: >-
Manage a finite pool of resources. Useful for controlling concurrent file
system or network requests.
Expand Down
Loading
Loading