Skip to content

Commit 906d086

Browse files
Refine TaskQueue pauseAll to support granular pausing (#564)
* Refine TaskQueue pauseAll/resumeAll to support specific tasks and groups (#563) - Updated TaskQueue interface to accept optional `tasks` and `group` parameters in `pauseAll` and `resumeAll`. - Implemented granular pause logic in `MemoryTaskQueue` using a `_pausedTaskIds` set to track specific paused tasks. - Updated `FileDownloader` to pass these parameters to the task queues. - Ensures backward compatibility: calling `pauseAll()` without arguments still performs a global pause. - Added unit tests in `test/task_queue_test.dart` to verify granular pause behavior.
1 parent d5e6d16 commit 906d086

File tree

3 files changed

+146
-12
lines changed

3 files changed

+146
-12
lines changed

lib/src/file_downloader.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ interface class FileDownloader {
687687
Future<List<DownloadTask>> pauseAll(
688688
{Iterable<DownloadTask>? tasks, String? group}) {
689689
for (final taskQueue in _downloader.taskQueues) {
690-
taskQueue.pauseAll();
690+
taskQueue.pauseAll(tasks: tasks, group: group);
691691
}
692692
return _downloader.pauseAll(tasks: tasks, group: group);
693693
}
@@ -729,7 +729,7 @@ interface class FileDownloader {
729729
await Future.delayed(interval);
730730
}
731731
for (final taskQueue in _downloader.taskQueues) {
732-
taskQueue.resumeAll();
732+
taskQueue.resumeAll(tasks: tasks, group: group);
733733
}
734734
return results;
735735
}

lib/src/queue/task_queue.dart

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,24 @@ abstract interface class TaskQueue {
1212
/// Signals that [task] has finished
1313
void taskFinished(Task task);
1414

15-
/// Pauses all task processing in the queue.
15+
/// Pauses task processing in the queue.
16+
///
17+
/// If [tasks] or [group] are provided, pauses only those tasks.
18+
/// If both are null, pauses all tasks.
1619
///
1720
/// Default implementation is a no-op to ensure backwards compatibility
1821
/// with subclasses that don't override this method
19-
Future<void> pauseAll() async {}
22+
Future<void> pauseAll({Iterable<DownloadTask>? tasks, String? group}) async {}
2023

21-
/// Resumes all task processing in the queue
24+
/// Resumes task processing in the queue
25+
///
26+
/// If [tasks] or [group] are provided, resumes only those tasks.
27+
/// If both are null, resumes all tasks.
2228
///
2329
/// Default implementation is a no-op to ensure backwards compatibility
2430
/// with subclasses that don't override this method
25-
Future<void> resumeAll() async {}
31+
Future<void> resumeAll(
32+
{Iterable<DownloadTask>? tasks, String? group}) async {}
2633
}
2734

2835
/// TaskQueue that holds all information in memory
@@ -56,19 +63,49 @@ class MemoryTaskQueue implements TaskQueue {
5663
final _enqueueErrorsStreamController = StreamController<Task>();
5764

5865
var _paused = false;
66+
final _pausedTaskIds = <String>{};
5967

6068
MemoryTaskQueue() {
6169
_readyForEnqueue.complete();
6270
}
6371

6472
@override
65-
Future<void> pauseAll() async {
66-
_paused = true;
73+
Future<void> pauseAll({Iterable<DownloadTask>? tasks, String? group}) async {
74+
if (tasks == null && group == null) {
75+
_paused = true;
76+
} else {
77+
// pause specific tasks/groups
78+
if (group != null) {
79+
final tasksToPause = waiting.unorderedElements
80+
.where((task) => task.group == group && task is DownloadTask);
81+
_pausedTaskIds.addAll(tasksToPause.map((e) => e.taskId));
82+
}
83+
if (tasks != null) {
84+
_pausedTaskIds.addAll(tasks.map((e) => e.taskId));
85+
}
86+
}
6787
}
6888

6989
@override
70-
Future<void> resumeAll() async {
71-
_paused = false;
90+
Future<void> resumeAll({Iterable<DownloadTask>? tasks, String? group}) async {
91+
if (tasks == null && group == null) {
92+
_paused = false;
93+
_pausedTaskIds.clear();
94+
} else {
95+
// resume specific tasks/groups
96+
if (group != null) {
97+
final tasksToResume = waiting.unorderedElements
98+
.where((task) => task.group == group && task is DownloadTask);
99+
for (final task in tasksToResume) {
100+
_pausedTaskIds.remove(task.taskId);
101+
}
102+
}
103+
if (tasks != null) {
104+
for (final task in tasks) {
105+
_pausedTaskIds.remove(task.taskId);
106+
}
107+
}
108+
}
72109
advanceQueue();
73110
}
74111

@@ -171,6 +208,10 @@ class MemoryTaskQueue implements TaskQueue {
171208
final tasksThatHaveToWait = <Task>[];
172209
while (waiting.isNotEmpty) {
173210
var task = waiting.removeFirst();
211+
if (_pausedTaskIds.contains(task.taskId)) {
212+
tasksThatHaveToWait.add(task);
213+
continue;
214+
}
174215
if (numActiveWithHostname(task.hostName) < maxConcurrentByHost &&
175216
numActiveWithGroup(task.group) < maxConcurrentByGroup) {
176217
waiting.addAll(tasksThatHaveToWait); // put back in queue

test/task_queue_test.dart

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ const workingUrl = 'https://google.com';
3131

3232
void main() {
3333
WidgetsFlutterBinding.ensureInitialized();
34-
final tq = TestTaskQueue();
34+
late TestTaskQueue tq;
3535

3636
var task = DownloadTask(url: 'testUrl');
3737

3838
setUp(() {
39+
tq = TestTaskQueue();
3940
tq.probFailure = 0;
4041
tq.maxConcurrent = 10000000;
4142
tq.minInterval = const Duration(milliseconds: 550);
42-
tq.reset();
4343
});
4444

4545
group('Add to queue', () {
@@ -215,4 +215,97 @@ void main() {
215215
expect(errorCount, greaterThan(3));
216216
});
217217
});
218+
219+
group('Pause and Resume', () {
220+
test('pauseAll global', () async {
221+
tq.maxConcurrent = 2;
222+
for (var n = 0; n < 10; n++) {
223+
task = DownloadTask(taskId: '$n', url: 'testUrl');
224+
tq.add(task);
225+
}
226+
expect(tq.waiting.length, equals(9));
227+
228+
// Initial processing
229+
await Future.delayed(const Duration(seconds: 1));
230+
// Should have dequeued 2 tasks
231+
expect(tq.waiting.length, lessThan(10));
232+
expect(tq.numActive, equals(2));
233+
234+
await tq.pauseAll();
235+
236+
// Wait a bit - queue should not advance
237+
final waitingBefore = tq.waiting.length;
238+
await Future.delayed(const Duration(seconds: 6)); // wait for active tasks to finish
239+
// Active tasks finish, but no new tasks should be dequeued
240+
expect(tq.waiting.length, equals(waitingBefore));
241+
expect(tq.numActive, equals(0));
242+
243+
await tq.resumeAll();
244+
await Future.delayed(const Duration(seconds: 1));
245+
expect(tq.numActive, equals(2));
246+
});
247+
248+
test('pauseAll by group', () async {
249+
tq.maxConcurrent = 4;
250+
for (var n = 0; n < 5; n++) {
251+
task = DownloadTask(taskId: 'A-$n', url: 'testUrl', group: 'A');
252+
tq.add(task);
253+
}
254+
for (var n = 0; n < 5; n++) {
255+
task = DownloadTask(taskId: 'B-$n', url: 'testUrl', group: 'B');
256+
tq.add(task);
257+
}
258+
259+
// Pause group A immediately
260+
await tq.pauseAll(group: 'A');
261+
262+
// Wait for queue to process
263+
await Future.delayed(const Duration(seconds: 2));
264+
265+
// Tasks from group A should be waiting, group B should be active
266+
expect(tq.numActiveWithGroup('B'), greaterThan(0));
267+
expect(tq.numActiveWithGroup('A'), lessThan(2)); // A-0 may be active
268+
269+
// Wait for B to finish
270+
await Future.delayed(const Duration(seconds: 10));
271+
expect(tq.numActiveWithGroup('B'), equals(0));
272+
273+
// A should still be waiting
274+
expect(tq.numWaitingWithGroup('A'), equals(4));
275+
276+
await tq.resumeAll(group: 'A');
277+
await Future.delayed(const Duration(seconds: 1));
278+
expect(tq.numActiveWithGroup('A'), greaterThan(0));
279+
});
280+
281+
test('pauseAll by specific tasks', () async {
282+
tq.maxConcurrent = 5;
283+
final tasksA = <DownloadTask>[];
284+
final tasksB = <DownloadTask>[];
285+
286+
for (var n = 0; n < 5; n++) {
287+
var t = DownloadTask(taskId: 'A-$n', url: 'testUrl', group: 'A');
288+
tasksA.add(t);
289+
tq.add(t);
290+
}
291+
for (var n = 0; n < 5; n++) {
292+
var t = DownloadTask(taskId: 'B-$n', url: 'testUrl', group: 'B');
293+
tasksB.add(t);
294+
tq.add(t);
295+
}
296+
297+
// Pause specific tasks from group A
298+
await tq.pauseAll(tasks: tasksA);
299+
300+
await Future.delayed(const Duration(seconds: 2));
301+
302+
// Group B should be running, Group A should be paused
303+
expect(tq.numActiveWithGroup('B'), greaterThan(0));
304+
expect(tq.numActiveWithGroup('A'), lessThan(2)); // A-0 may be active
305+
306+
await tq.resumeAll(tasks: tasksA);
307+
await Future.delayed(const Duration(seconds: 2));
308+
expect(tq.numActiveWithGroup('A'), greaterThan(0));
309+
});
310+
});
218311
}

0 commit comments

Comments
 (0)