Skip to content

Commit 9d56e26

Browse files
Fix out-of-order processing of native background messages (#561)
* fix: enforce serial processing of native messages Introduces `SerialJobQueue` to ensure `MethodChannel` messages from the native side are processed in order. This prevents race conditions where concurrent `compute` calls for JSON parsing could cause status updates to arrive out of sequence (e.g., 'running' after 'complete'). - Adds `lib/src/queue/serial_job_queue.dart`. - Updates `NativeDownloader` to route background channel messages through the queue. - Minor test change Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> Co-authored-by: bram <[email protected]>
1 parent 1afef5f commit 9d56e26

File tree

3 files changed

+235
-176
lines changed

3 files changed

+235
-176
lines changed

example/integration_test/parallel_download_test.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ void main() {
299299
expect(await FileDownloader().pause(task), isTrue);
300300
await Future.delayed(const Duration(seconds: 1));
301301
expect(lastStatus, equals(TaskStatus.paused));
302+
await Future.delayed(const Duration(seconds: 1));
302303
expect(lastProgress, equals(progressPaused));
303304
await Future.delayed(const Duration(seconds: 2));
304305
expect(await FileDownloader().resume(task), isTrue);

lib/src/native_downloader.dart

Lines changed: 180 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import 'exceptions.dart';
1313
import 'file_downloader.dart';
1414
import 'models.dart';
1515
import 'permissions.dart';
16+
import 'queue/serial_job_queue.dart';
1617
import 'task.dart';
1718

1819
/// Implementation of download functionality for native platforms
@@ -24,6 +25,8 @@ abstract base class NativeDownloader extends BaseDownloader {
2425
static const _backgroundChannel =
2526
MethodChannel('com.bbflight.background_downloader.background');
2627

28+
late final SerialJobQueue<MethodCall, dynamic> _jobQueue;
29+
2730
/// Initializes the background channel and starts listening for messages from
2831
/// the native side
2932
@override
@@ -32,190 +35,191 @@ abstract base class NativeDownloader extends BaseDownloader {
3235
WidgetsFlutterBinding.ensureInitialized();
3336
// listen to the background channel, receiving updates on download status
3437
// or progress.
35-
// First argument is the Task as JSON string, next argument(s) depends
36-
// on the method.
37-
//
38-
// If the task JsonString is empty, a dummy task will be created
39-
_backgroundChannel.setMethodCallHandler((call) async {
40-
final args = call.arguments as List<dynamic>;
41-
var taskJsonString = args.first as String;
42-
final task = taskJsonString.isNotEmpty
43-
? await compute(_taskFromJson, taskJsonString)
44-
: DownloadTask(url: 'url');
45-
final message = (
46-
call.method,
47-
args.length > 2
48-
? args.getRange(1, args.length).toList(growable: false)
49-
: args[1]
50-
);
51-
switch (message) {
52-
// simple status update
53-
case ('statusUpdate', int statusOrdinal):
54-
final status = TaskStatus.values[statusOrdinal];
55-
if (task.group != BaseDownloader.chunkGroup) {
56-
processStatusUpdate(TaskStatusUpdate(task, status));
57-
} else {
58-
// this is a chunk task, so pass to native
59-
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
60-
methodChannel.invokeMethod('chunkStatusUpdate', [
61-
Chunk.getParentTaskId(task),
62-
task.taskId,
63-
status.index,
64-
null,
65-
null
66-
]));
67-
}
38+
// The job queue ensures that messages are processed in order, even though
39+
// the processing itself is asynchronous (using [compute])
40+
_jobQueue = SerialJobQueue(_handleBackgroundMessage);
41+
_backgroundChannel.setMethodCallHandler(_jobQueue.add);
42+
}
6843

69-
// status update with responseBody, responseHeaders, responseStatusCode, mimeType and charSet (normal completion)
70-
case (
71-
'statusUpdate',
72-
[
73-
int statusOrdinal,
74-
String? responseBody,
75-
Map<Object?, Object?>? responseHeaders,
76-
int? responseStatusCode,
77-
String? mimeType,
78-
String? charSet
79-
]
80-
):
81-
final status = TaskStatus.values[statusOrdinal];
82-
if (task.group != BaseDownloader.chunkGroup) {
83-
final Map<String, String>? cleanResponseHeaders = responseHeaders ==
84-
null
85-
? null
86-
: {
87-
for (var entry in responseHeaders.entries.where(
88-
(entry) => entry.key != null && entry.value != null))
89-
entry.key.toString().toLowerCase(): entry.value.toString()
90-
};
91-
processStatusUpdate(TaskStatusUpdate(
92-
task,
93-
status,
44+
/// Handles the background message
45+
///
46+
/// First argument is the Task as JSON string, next argument(s) depends
47+
/// on the method.
48+
///
49+
/// If the task JsonString is empty, a dummy task will be created
50+
Future<dynamic> _handleBackgroundMessage(MethodCall call) async {
51+
final args = call.arguments as List<dynamic>;
52+
var taskJsonString = args.first as String;
53+
final task = taskJsonString.isNotEmpty
54+
? await compute(_taskFromJson, taskJsonString)
55+
: DownloadTask(url: 'url');
56+
final message = (
57+
call.method,
58+
args.length > 2
59+
? args.getRange(1, args.length).toList(growable: false)
60+
: args[1]
61+
);
62+
switch (message) {
63+
// simple status update
64+
case ('statusUpdate', int statusOrdinal):
65+
final status = TaskStatus.values[statusOrdinal];
66+
if (task.group != BaseDownloader.chunkGroup) {
67+
processStatusUpdate(TaskStatusUpdate(task, status));
68+
} else {
69+
// this is a chunk task, so pass to native
70+
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
71+
methodChannel.invokeMethod('chunkStatusUpdate', [
72+
Chunk.getParentTaskId(task),
73+
task.taskId,
74+
status.index,
9475
null,
95-
responseBody,
96-
cleanResponseHeaders,
97-
responseStatusCode,
98-
mimeType,
99-
charSet));
100-
} else {
101-
// this is a chunk task, so pass to native
102-
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
103-
methodChannel.invokeMethod('chunkStatusUpdate', [
104-
Chunk.getParentTaskId(task),
105-
task.taskId,
106-
status.index,
107-
null,
108-
responseBody
109-
]));
110-
}
111-
112-
// status update with TaskException and responseBody
113-
case (
114-
'statusUpdate',
115-
[
116-
int statusOrdinal,
117-
String typeString,
118-
String description,
119-
int httpResponseCode,
120-
String? responseBody
121-
]
122-
):
123-
final status = TaskStatus.values[statusOrdinal];
124-
TaskException? exception;
125-
if (status == TaskStatus.failed) {
126-
exception = TaskException.fromTypeString(
127-
typeString, description, httpResponseCode);
128-
}
129-
if (task.group != BaseDownloader.chunkGroup) {
130-
processStatusUpdate(
131-
TaskStatusUpdate(task, status, exception, responseBody));
132-
} else {
133-
// this is a chunk task, so pass to native
134-
Future.delayed(const Duration(milliseconds: 100))
135-
.then((_) => methodChannel.invokeMethod('chunkStatusUpdate', [
136-
Chunk.getParentTaskId(task),
137-
task.taskId,
138-
status.index,
139-
exception?.toJsonString(),
140-
responseBody
141-
]));
142-
}
143-
144-
case (
145-
'progressUpdate',
146-
[
147-
double progress,
148-
int expectedFileSize,
149-
double networkSpeed,
150-
int timeRemaining
151-
]
152-
):
153-
if (task.group != BaseDownloader.chunkGroup) {
154-
processProgressUpdate(TaskProgressUpdate(
155-
task,
156-
progress,
157-
expectedFileSize,
158-
networkSpeed,
159-
Duration(milliseconds: timeRemaining)));
160-
} else {
161-
// this is a chunk task, so pass parent taskId,
162-
// chunk taskId and progress to native
163-
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
164-
methodChannel.invokeMethod('chunkProgressUpdate',
165-
[Chunk.getParentTaskId(task), task.taskId, progress]));
166-
}
76+
null
77+
]));
78+
}
16779

168-
case ('canResume', bool canResume):
169-
setCanResume(task, canResume);
80+
// status update with responseBody, responseHeaders, responseStatusCode, mimeType and charSet (normal completion)
81+
case (
82+
'statusUpdate',
83+
[
84+
int statusOrdinal,
85+
String? responseBody,
86+
Map<Object?, Object?>? responseHeaders,
87+
int? responseStatusCode,
88+
String? mimeType,
89+
String? charSet
90+
]
91+
):
92+
final status = TaskStatus.values[statusOrdinal];
93+
if (task.group != BaseDownloader.chunkGroup) {
94+
final Map<String, String>? cleanResponseHeaders =
95+
responseHeaders == null
96+
? null
97+
: {
98+
for (var entry in responseHeaders.entries.where(
99+
(entry) => entry.key != null && entry.value != null))
100+
entry.key.toString().toLowerCase():
101+
entry.value.toString()
102+
};
103+
processStatusUpdate(TaskStatusUpdate(task, status, null, responseBody,
104+
cleanResponseHeaders, responseStatusCode, mimeType, charSet));
105+
} else {
106+
// this is a chunk task, so pass to native
107+
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
108+
methodChannel.invokeMethod('chunkStatusUpdate', [
109+
Chunk.getParentTaskId(task),
110+
task.taskId,
111+
status.index,
112+
null,
113+
responseBody
114+
]));
115+
}
170116

171-
// resumeData Android and Desktop variant
172-
case ('resumeData', [String data, int requiredStartByte, String? eTag]):
173-
setResumeData(ResumeData(task, data, requiredStartByte, eTag));
117+
// status update with TaskException and responseBody
118+
case (
119+
'statusUpdate',
120+
[
121+
int statusOrdinal,
122+
String typeString,
123+
String description,
124+
int httpResponseCode,
125+
String? responseBody
126+
]
127+
):
128+
final status = TaskStatus.values[statusOrdinal];
129+
TaskException? exception;
130+
if (status == TaskStatus.failed) {
131+
exception = TaskException.fromTypeString(
132+
typeString, description, httpResponseCode);
133+
}
134+
if (task.group != BaseDownloader.chunkGroup) {
135+
processStatusUpdate(
136+
TaskStatusUpdate(task, status, exception, responseBody));
137+
} else {
138+
// this is a chunk task, so pass to native
139+
Future.delayed(const Duration(milliseconds: 100))
140+
.then((_) => methodChannel.invokeMethod('chunkStatusUpdate', [
141+
Chunk.getParentTaskId(task),
142+
task.taskId,
143+
status.index,
144+
exception?.toJsonString(),
145+
responseBody
146+
]));
147+
}
174148

175-
// resumeData iOS and ParallelDownloads variant
176-
case ('resumeData', String data):
177-
setResumeData(ResumeData(task, data));
149+
case (
150+
'progressUpdate',
151+
[
152+
double progress,
153+
int expectedFileSize,
154+
double networkSpeed,
155+
int timeRemaining
156+
]
157+
):
158+
if (task.group != BaseDownloader.chunkGroup) {
159+
processProgressUpdate(TaskProgressUpdate(
160+
task,
161+
progress,
162+
expectedFileSize,
163+
networkSpeed,
164+
Duration(milliseconds: timeRemaining)));
165+
} else {
166+
// this is a chunk task, so pass parent taskId,
167+
// chunk taskId and progress to native
168+
Future.delayed(const Duration(milliseconds: 100)).then((_) =>
169+
methodChannel.invokeMethod('chunkProgressUpdate',
170+
[Chunk.getParentTaskId(task), task.taskId, progress]));
171+
}
178172

179-
case ('notificationTap', int notificationTypeOrdinal):
180-
final notificationType =
181-
NotificationType.values[notificationTypeOrdinal];
182-
processNotificationTap(task, notificationType);
183-
return true; // this message requires a confirmation
173+
case ('canResume', bool canResume):
174+
setCanResume(task, canResume);
175+
176+
// resumeData Android and Desktop variant
177+
case ('resumeData', [String data, int requiredStartByte, String? eTag]):
178+
setResumeData(ResumeData(task, data, requiredStartByte, eTag));
179+
180+
// resumeData iOS and ParallelDownloads variant
181+
case ('resumeData', String data):
182+
setResumeData(ResumeData(task, data));
183+
184+
case ('notificationTap', int notificationTypeOrdinal):
185+
final notificationType =
186+
NotificationType.values[notificationTypeOrdinal];
187+
processNotificationTap(task, notificationType);
188+
return true; // this message requires a confirmation
189+
190+
// from ParallelDownloadTask
191+
case ('enqueueChild', String childTaskJsonString):
192+
final childTask = await compute(_taskFromJson, childTaskJsonString);
193+
Future.delayed(const Duration(milliseconds: 100))
194+
.then((_) => FileDownloader().enqueue(childTask));
195+
196+
// from ParallelDownloadTask
197+
case ('cancelTasksWithId', String listOfTaskIdsJson):
198+
final taskIds = List<String>.from(jsonDecode(listOfTaskIdsJson));
199+
Future.delayed(const Duration(milliseconds: 100))
200+
.then((_) => FileDownloader().cancelTasksWithIds(taskIds));
201+
202+
// from ParallelDownloadTask
203+
case ('pauseTasks', String listOfTasksJson):
204+
final listOfTasks =
205+
await compute(_downloadTaskListFromJson, listOfTasksJson);
206+
Future.delayed(const Duration(milliseconds: 100)).then((_) async {
207+
for (final chunkTask in listOfTasks) {
208+
await FileDownloader().pause(chunkTask);
209+
}
210+
});
184211

185-
// from ParallelDownloadTask
186-
case ('enqueueChild', String childTaskJsonString):
187-
final childTask = await compute(_taskFromJson, childTaskJsonString);
188-
Future.delayed(const Duration(milliseconds: 100))
189-
.then((_) => FileDownloader().enqueue(childTask));
212+
// for permission request results
213+
case ('permissionRequestResult', int statusOrdinal):
214+
permissionsService
215+
.onPermissionRequestResult(PermissionStatus.values[statusOrdinal]);
190216

191-
// from ParallelDownloadTask
192-
case ('cancelTasksWithId', String listOfTaskIdsJson):
193-
final taskIds = List<String>.from(jsonDecode(listOfTaskIdsJson));
194-
Future.delayed(const Duration(milliseconds: 100))
195-
.then((_) => FileDownloader().cancelTasksWithIds(taskIds));
196-
197-
// from ParallelDownloadTask
198-
case ('pauseTasks', String listOfTasksJson):
199-
final listOfTasks =
200-
await compute(_downloadTaskListFromJson, listOfTasksJson);
201-
Future.delayed(const Duration(milliseconds: 100)).then((_) async {
202-
for (final chunkTask in listOfTasks) {
203-
await FileDownloader().pause(chunkTask);
204-
}
205-
});
206-
207-
// for permission request results
208-
case ('permissionRequestResult', int statusOrdinal):
209-
permissionsService.onPermissionRequestResult(
210-
PermissionStatus.values[statusOrdinal]);
211-
212-
default:
213-
log.warning('Background channel: no match for message $message');
214-
throw ArgumentError(
215-
'Background channel: no match for message $message');
216-
}
217-
return true;
218-
});
217+
default:
218+
log.warning('Background channel: no match for message $message');
219+
throw ArgumentError(
220+
'Background channel: no match for message $message');
221+
}
222+
return true;
219223
}
220224

221225
@override

0 commit comments

Comments
 (0)