Skip to content

Commit b3e9b0d

Browse files
Allow specifying the number of tasks to retrieve for a client (#385)
* Allow specifying the number of tasks to retrieve for a client in ClientGetTask method * Maintain compatibility for older clients by limiting task retrieval to one for versions 1.0 and below * Update Unregister method to unassign tasks before client removal; add SQL schema change for Publisher column * Add Verifying status to task enumeration; update SQL schema for Signatures_Roms * Update Unregister method to unassign tasks based on multiple current statuses
1 parent a4d7e0e commit b3e9b0d

File tree

5 files changed

+79
-21
lines changed

5 files changed

+79
-21
lines changed

hasheous-lib/Classes/TaskManagement.cs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -258,21 +258,42 @@ public async static Task UpdateClient(string clientAPIKey, string publicId, stri
258258
}
259259

260260
/// <summary>
261-
/// Retrieves the next available task for the specified client, or the currently assigned one if it exists.
261+
/// Retrieves the next available tasks for the specified client. Returns unassigned pending tasks and any tasks already assigned to this client in Assigned or InProgress status.
262262
/// </summary>
263263
/// <param name="clientAPIKey">The API key of the client.</param>
264264
/// <param name="publicId">The public client ID.</param>
265-
public async static Task<QueueItemModel?> ClientGetTask(string clientAPIKey, string publicId)
265+
/// <param name="numberOfTasks">The number of tasks to retrieve (optional).</param>
266+
public async static Task<object?> ClientGetTask(string clientAPIKey, string publicId, int numberOfTasks = 1)
266267
{
268+
if (numberOfTasks <= 0)
269+
{
270+
numberOfTasks = 1; // default to 1 task if invalid number provided
271+
}
272+
else if (numberOfTasks > 20)
273+
{
274+
numberOfTasks = 20; // cap at 20 tasks to prevent overload
275+
}
276+
267277
ClientModel? client = await GetClientByAPIKeyAndPublicId(clientAPIKey, publicId);
268278
if (client == null)
269279
{
270280
throw new Exception("Invalid client API key or public ID.");
271281
}
272282

283+
// clients older than 1.1.0 expect a single task back
284+
if (client.ClientVersion != null)
285+
{
286+
Version clientVersion = new Version(client.ClientVersion);
287+
if (clientVersion.Major == 1 && clientVersion.Minor == 0)
288+
{
289+
numberOfTasks = 1;
290+
}
291+
}
292+
273293
// Use a transaction with SELECT FOR UPDATE SKIP LOCKED to prevent race conditions
274294
// when multiple clients request jobs simultaneously.
275295
// The FOR UPDATE SKIP LOCKED ensures that each concurrent client gets a different task.
296+
// Selects: unassigned pending tasks (status=0, client_id IS NULL) OR tasks assigned to this client in Assigned/InProgress status (status=10/20, client_id=@client_id)
276297
DateTime now = DateTime.UtcNow;
277298

278299
// Step 1: SELECT with FOR UPDATE SKIP LOCKED (locks the row)
@@ -281,8 +302,7 @@ public async static Task UpdateClient(string clientAPIKey, string publicId, stri
281302
string selectSql = @"SELECT tq.id AS id, tq.create_time AS create_time, tq.dataobjectid AS dataobjectid, tq.task_name AS task_name, tq.status AS status, tq.client_id AS client_id, tq.parameters AS parameters, tq.result AS result, tq.error_message AS error_message, tq.start_time AS start_time, tq.completion_time AS completion_time
282303
FROM Task_Queue tq
283304
LEFT JOIN Task_Queue_Capabilities tqc ON tq.id = tqc.task_queue_id
284-
WHERE tq.status = 0
285-
AND (tq.client_id IS NULL OR tq.client_id = @client_id)
305+
WHERE ((tq.status = 0 AND tq.client_id IS NULL) OR ((tq.status = 10 OR tq.status = 20) AND tq.client_id = @client_id))
286306
AND NOT EXISTS (
287307
SELECT 1
288308
FROM Task_Queue_Capabilities tqc_required
@@ -291,7 +311,7 @@ AND tqc_required.capability_id NOT IN (" + string.Join(", ", client.Capabilities
291311
)
292312
GROUP BY tq.id
293313
ORDER BY tq.create_time ASC
294-
LIMIT 1
314+
LIMIT " + numberOfTasks + @"
295315
FOR UPDATE SKIP LOCKED;";
296316

297317
// UPDATE using the same logic to identify the task
@@ -308,8 +328,7 @@ SELECT id FROM (
308328
SELECT tq.id
309329
FROM Task_Queue tq
310330
LEFT JOIN Task_Queue_Capabilities tqc ON tq.id = tqc.task_queue_id
311-
WHERE tq.status = 0
312-
AND (tq.client_id IS NULL OR tq.client_id = @client_id)
331+
WHERE ((tq.status = 0 AND tq.client_id IS NULL) OR ((tq.status = 10 OR tq.status = 20) AND tq.client_id = @client_id))
313332
AND NOT EXISTS (
314333
SELECT 1
315334
FROM Task_Queue_Capabilities tqc_required
@@ -318,7 +337,7 @@ AND tqc_required.capability_id NOT IN (" + string.Join(", ", client.Capabilities
318337
)
319338
GROUP BY tq.id
320339
ORDER BY tq.create_time ASC
321-
LIMIT 1
340+
LIMIT " + numberOfTasks + @"
322341
FOR UPDATE SKIP LOCKED
323342
) AS subquery
324343
);";
@@ -348,18 +367,34 @@ FOR UPDATE SKIP LOCKED
348367
return null;
349368
}
350369

351-
QueueItemModel task = new QueueItemModel(dt.Rows[0]);
370+
List<QueueItemModel> tasks = new List<QueueItemModel>();
371+
foreach (DataRow row in dt.Rows)
372+
{
373+
QueueItemModel task = new QueueItemModel(row);
374+
375+
// The task has already been updated by the transaction
376+
// Update the local object to reflect the database state
377+
task.ClientId = client.Id;
378+
task.Status = QueueItemStatus.Assigned;
379+
task.StartedAt = now;
380+
task.CompletedAt = now;
381+
task.Result = "";
382+
task.ErrorMessage = "";
383+
384+
// clients older than 1.1.0 expect a single task back
385+
if (client.ClientVersion != null)
386+
{
387+
Version clientVersion = new Version(client.ClientVersion);
388+
if (clientVersion.Major == 1 && clientVersion.Minor == 0)
389+
{
390+
return task; // only return the first task for older clients
391+
}
392+
}
352393

353-
// The task has already been updated by the transaction
354-
// Update the local object to reflect the database state
355-
task.ClientId = client.Id;
356-
task.Status = QueueItemStatus.Assigned;
357-
task.StartedAt = now;
358-
task.CompletedAt = now;
359-
task.Result = "";
360-
task.ErrorMessage = "";
394+
tasks.Add(task);
395+
}
361396

362-
return task;
397+
return tasks;
363398
}
364399

365400
/// <summary>

hasheous-lib/Models/TaskClientModel.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,18 @@ public ClientModel Refresh(DataRow? row = null)
250250
/// </summary>
251251
public async Task Unregister()
252252
{
253+
// update the database to unassign any tasks currently assigned to this client
254+
await Config.database.ExecuteCMDAsync("UPDATE Task_Queue SET client_id = NULL, status = @status WHERE client_id = @id AND status IN (@current_status1, @current_status2, @current_status3, @current_status4, @current_status5)", new Dictionary<string, object>
255+
{
256+
{ "@id", this.Id },
257+
{ "@status", QueueItemStatus.Pending },
258+
{ "@current_status1", QueueItemStatus.Pending },
259+
{ "@current_status2", QueueItemStatus.Assigned },
260+
{ "@current_status3", QueueItemStatus.Verifying },
261+
{ "@current_status4", QueueItemStatus.Failed },
262+
{ "@current_status5", QueueItemStatus.Cancelled }
263+
});
264+
253265
// update the database to remove this client
254266
await Config.database.ExecuteCMDAsync("DELETE FROM Task_Clients WHERE id = @id", new Dictionary<string, object>
255267
{ { "@id", this.Id } });

hasheous-lib/Models/TaskQueueModel.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,11 @@ public enum QueueItemStatus
268268
/// <summary>
269269
/// The task has been cancelled.
270270
/// </summary>
271-
Cancelled = 60
271+
Cancelled = 60,
272+
/// <summary>
273+
/// The task is currently being verified.
274+
/// </summary>
275+
Verifying = 70
272276
}
273277

274278
/// <summary>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
ALTER TABLE `Signatures_Publishers`
2+
CHANGE `Publisher` `Publisher` varchar(255) DEFAULT NULL;
3+
4+
ALTER TABLE `Signatures_Roms`
5+
CHANGE `Languages` `Languages` LONGTEXT DEFAULT NULL,
6+
CHANGE `Countries` `Countries` LONGTEXT DEFAULT NULL;

hasheous/Controllers/V1.0/TaskWorkerController.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,15 @@ public async Task<IActionResult> ClientHeartbeat(string publicid)
197197
/// Retrieves the next job assigned to the specified task worker client. If a job is already assigned, it returns that job.
198198
/// </summary>
199199
/// <param name="publicid">The public identifier of the client requesting a job.</param>
200+
/// <param name="numberOfTasks">The number of tasks to retrieve (optional, default is 1).</param>
200201
/// <returns>An IActionResult containing the job details or null if no job is available.</returns>
201202
[MapToApiVersion("1.0")]
202203
[HttpGet]
203204
[Authentication.TaskWorkerAPIKey.TaskWorkerAPIKey()]
204205
[ProducesResponseType(StatusCodes.Status200OK)]
205206
[ProducesResponseType(StatusCodes.Status404NotFound)]
206207
[Route("clients/{publicid}/job")]
207-
public async Task<IActionResult> GetJobForClient(string publicid)
208+
public async Task<IActionResult> GetJobForClient(string publicid, [FromQuery] int numberOfTasks = 1)
208209
{
209210
// get the api key from the header
210211
string? apiKey = Request.Headers.TryGetValue(Authentication.TaskWorkerAPIKey.APIKeyHeaderName, out var headerValue) ? headerValue.FirstOrDefault() : null;
@@ -213,7 +214,7 @@ public async Task<IActionResult> GetJobForClient(string publicid)
213214
return Unauthorized("API key is missing.");
214215
}
215216

216-
var job = await ClientManagement.ClientGetTask(apiKey, publicid);
217+
var job = await ClientManagement.ClientGetTask(apiKey, publicid, numberOfTasks);
217218

218219
return Ok(job);
219220
}

0 commit comments

Comments
 (0)