Skip to content

Commit a83ef62

Browse files
authored
Resets ActionBlock if faulted or cancelled. (#385)
* Resets ActionBlock if faulted or cancelled. * Log number of uploaded files. Signed-off-by: Victor Chang <[email protected]>
1 parent 1e63847 commit a83ef62

18 files changed

+328
-140
lines changed

src/Api/Storage/Payload.cs

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public TimeSpan Elapsed
7676
public string? CalledAeTitle { get => Files.OfType<DicomFileStorageMetadata>().Select(p => p.CalledAeTitle).FirstOrDefault(); }
7777

7878
public int FilesUploaded { get => Files.Count(p => p.IsUploaded); }
79+
7980
public int FilesFailedToUpload { get => Files.Count(p => p.IsUploadFailed); }
8081

8182
public Payload(string key, string correlationId, uint timeout)

src/Common/ExtensionMethods.cs

+10
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,15 @@ public static async Task<bool> Post<TInput>(this ActionBlock<TInput> actionBlock
7676
await Task.Delay(delay).ConfigureAwait(false);
7777
return actionBlock.Post(input);
7878
}
79+
80+
/// <summary>
81+
/// Checks if a given task is faulted or cancelled.
82+
/// </summary>
83+
/// <param name="task">The task object</param>
84+
/// <returns>True if canceled or faulted. False otherwise.</returns>
85+
public static bool IsCanceledOrFaulted(this Task task)
86+
{
87+
return task.IsCanceled || task.IsFaulted;
88+
}
7989
}
8090
}

src/Configuration/DicomWebConfiguration.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class DicomWebConfiguration
3939
/// Gets or sets the maximum number of simultaneous DICOMweb connections.
4040
/// </summary>
4141
[ConfigurationKeyName("maximumNumberOfConnections")]
42-
public int MaximumNumberOfConnection { get; set; } = 2;
42+
public ushort MaximumNumberOfConnection { get; set; } = 2;
4343

4444
/// <summary>
4545
/// Gets or set the maximum allowed file size in bytes with default to 2GiB.

src/Configuration/ScuConfiguration.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class ScuConfiguration
5454
/// Gets or sets the maximum number of simultaneous DICOM associations for the SCU service.
5555
/// </summary>
5656
[ConfigurationKeyName("maximumNumberOfAssociations")]
57-
public int MaximumNumberOfAssociations { get; set; } = 8;
57+
public ushort MaximumNumberOfAssociations { get; set; } = 8;
5858

5959
public ScuConfiguration()
6060
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2023 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Runtime.Serialization;
19+
using Monai.Deploy.InformaticsGateway.Api.Storage;
20+
21+
namespace Monai.Deploy.InformaticsGateway.Common
22+
{
23+
internal class PostPayloadException : Exception
24+
{
25+
public Payload.PayloadState TargetQueue { get; }
26+
public Payload Payload { get; }
27+
28+
public PostPayloadException()
29+
{
30+
}
31+
32+
public PostPayloadException(Api.Storage.Payload.PayloadState targetState, Payload payload)
33+
{
34+
TargetQueue = targetState;
35+
Payload = payload;
36+
}
37+
38+
public PostPayloadException(string message) : base(message)
39+
{
40+
}
41+
42+
public PostPayloadException(string message, Exception innerException) : base(message, innerException)
43+
{
44+
}
45+
46+
protected PostPayloadException(SerializationInfo info, StreamingContext context) : base(info, context)
47+
{
48+
}
49+
}
50+
}

src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 MONAI Consortium
2+
* Copyright 2022-2023 MONAI Consortium
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,8 +33,8 @@ public static partial class Log
3333
[LoggerMessage(EventId = 3004, Level = LogLevel.Trace, Message = "Number of incomplete payloads waiting for processing: {count}.")]
3434
public static partial void BucketsActive(this ILogger logger, int count);
3535

36-
[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {failedFiles} failures out of {totalNumberOfFiles}.")]
37-
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int failedFiles);
36+
[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {succeededFiles} uplaoded and {failedFiles} failures out of {totalNumberOfFiles}.")]
37+
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int succeededFiles, int failedFiles);
3838

3939
[LoggerMessage(EventId = 3007, Level = LogLevel.Information, Message = "Bucket {key} sent to processing queue with {count} files.")]
4040
public static partial void BucketReady(this ILogger logger, string key, int count);

src/InformaticsGateway/Logging/Log.500.ExportService.cs

+6
Original file line numberDiff line numberDiff line change
@@ -123,5 +123,11 @@ public static partial class Log
123123

124124
[LoggerMessage(EventId = 533, Level = LogLevel.Error, Message = "Recovering messaging service connection due to {reason}.")]
125125
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);
126+
127+
[LoggerMessage(EventId = 534, Level = LogLevel.Error, Message = "Error posting export job for processing correlation ID {correlationId}, export task ID {exportTaskId}.")]
128+
public static partial void ErrorPostingExportJobToQueue(this ILogger logger, string correlationId, string exportTaskId);
129+
130+
[LoggerMessage(EventId = 535, Level = LogLevel.Warning, Message = "Exceeded maximum number of worker in {serviceName}: {count}.")]
131+
public static partial void ExceededMaxmimumNumberOfWorkers(this ILogger logger, string serviceName, ulong count);
126132
}
127133
}

src/InformaticsGateway/Logging/Log.700.PayloadService.cs

+12
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,17 @@ public static partial class Log
136136

137137
[LoggerMessage(EventId = 743, Level = LogLevel.Error, Message = "Exception moving payload.")]
138138
public static partial void PayloadMoveException(this ILogger logger, Exception ex);
139+
140+
[LoggerMessage(EventId = 744, Level = LogLevel.Warning, Message = "PayloadNotification move payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
141+
public static partial void MoveQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);
142+
143+
[LoggerMessage(EventId = 745, Level = LogLevel.Warning, Message = "PayloadNotification publishing payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
144+
public static partial void PublishQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);
145+
146+
[LoggerMessage(EventId = 746, Level = LogLevel.Error, Message = "Error posting payload to move queue.")]
147+
public static partial void ErrorPostingJobToMovePayloadsQueue(this ILogger logger);
148+
149+
[LoggerMessage(EventId = 747, Level = LogLevel.Error, Message = "Error posting payload to publish queue.")]
150+
public static partial void ErrorPostingJobToPublishPayloadsQueue(this ILogger logger);
139151
}
140152
}

src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
135135
var payload = await _payloads[key].Task.ConfigureAwait(false);
136136
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });
137137

138-
_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesFailedToUpload);
138+
_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesUploaded, payload.FilesFailedToUpload);
139139
// Wait for timer window closes before sending payload for processing
140140
if (payload.HasTimedOut)
141141
{

src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
8686
var action = await UpdatePayloadState(payload, ex, cancellationToken).ConfigureAwait(false);
8787
if (action == PayloadAction.Updated)
8888
{
89-
await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
89+
if (!await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
90+
{
91+
throw new PostPayloadException(Payload.PayloadState.Move, payload);
92+
}
9093
}
9194
}
9295
finally
@@ -111,7 +114,11 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
111114
await repository.UpdateAsync(payload, cancellationToken).ConfigureAwait(false);
112115
_logger.PayloadSaved(payload.PayloadId);
113116

114-
notificationQueue.Post(payload);
117+
if (!notificationQueue.Post(payload))
118+
{
119+
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
120+
}
121+
115122
_logger.PayloadReadyToBePublished(payload.PayloadId);
116123
}
117124
else // we should never hit this else block.

src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,11 @@ public async Task NotifyAsync(Payload payload, ActionBlock<Payload> notification
8080
var action = await UpdatePayloadState(payload, cancellationToken).ConfigureAwait(false);
8181
if (action == PayloadAction.Updated)
8282
{
83-
await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
8483
_logger.FailedToPublishWorkflowRequest(payload.PayloadId, ex);
84+
if (!await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
85+
{
86+
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
87+
}
8588
}
8689
}
8790
}

src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs

+103-24
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,38 @@ public PayloadNotificationService(IServiceScopeFactory serviceScopeFactory,
8888
_cancellationTokenSource = new CancellationTokenSource();
8989
}
9090

91-
public async Task StartAsync(CancellationToken cancellationToken)
91+
public Task StartAsync(CancellationToken cancellationToken)
9292
{
93-
_moveFileQueue = new ActionBlock<Payload>(
94-
MoveActionHandler,
95-
new ExecutionDataflowBlockOptions
96-
{
97-
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
98-
MaxMessagesPerTask = 1,
99-
CancellationToken = cancellationToken
100-
});
93+
SetupQueues(cancellationToken);
94+
95+
var task = Task.Run(async () =>
96+
{
97+
await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);
98+
BackgroundProcessing(cancellationToken);
99+
}, CancellationToken.None);
100+
101+
Status = ServiceStatus.Running;
102+
_logger.ServiceStarted(ServiceName);
103+
104+
if (task.IsCompleted)
105+
return task;
106+
107+
return Task.CompletedTask;
108+
}
109+
110+
private void SetupQueues(CancellationToken cancellationToken)
111+
{
112+
ResetMoveQueue(cancellationToken);
113+
ResetPublishQueue(cancellationToken);
114+
}
115+
116+
private void ResetPublishQueue(CancellationToken cancellationToken)
117+
{
118+
if (_publishQueue is not null)
119+
{
120+
_logger.PublishQueueFaulted(_publishQueue.Completion.IsFaulted, _publishQueue.Completion.IsCanceled);
121+
_publishQueue.Complete();
122+
}
101123

102124
_publishQueue = new ActionBlock<Payload>(
103125
NotificationHandler,
@@ -107,21 +129,24 @@ public async Task StartAsync(CancellationToken cancellationToken)
107129
MaxMessagesPerTask = 1,
108130
CancellationToken = cancellationToken
109131
});
132+
}
110133

111-
await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);
112-
113-
var task = Task.Run(() =>
134+
private void ResetMoveQueue(CancellationToken cancellationToken)
135+
{
136+
if (_moveFileQueue is not null)
114137
{
115-
BackgroundProcessing(cancellationToken);
116-
}, CancellationToken.None);
117-
118-
Status = ServiceStatus.Running;
119-
_logger.ServiceStarted(ServiceName);
120-
121-
if (task.IsCompleted)
122-
await task.ConfigureAwait(false);
138+
_logger.MoveQueueFaulted(_moveFileQueue.Completion.IsFaulted, _moveFileQueue.Completion.IsCanceled);
139+
_moveFileQueue.Complete();
140+
}
123141

124-
await Task.CompletedTask.ConfigureAwait(false);
142+
_moveFileQueue = new ActionBlock<Payload>(
143+
MoveActionHandler,
144+
new ExecutionDataflowBlockOptions
145+
{
146+
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
147+
MaxMessagesPerTask = 1,
148+
CancellationToken = cancellationToken
149+
});
125150
}
126151

127152
private async Task NotificationHandler(Payload payload)
@@ -134,6 +159,10 @@ private async Task NotificationHandler(Payload payload)
134159
{
135160
await _payloadNotificationActionHandler.NotifyAsync(payload, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
136161
}
162+
catch (PostPayloadException ex)
163+
{
164+
HandlePostPayloadException(ex);
165+
}
137166
catch (Exception ex)
138167
{
139168
if (ex is PayloadNotifyException payloadMoveException &&
@@ -158,6 +187,10 @@ private async Task MoveActionHandler(Payload payload)
158187
{
159188
await _payloadMoveActionHandler.MoveFilesAsync(payload, _moveFileQueue, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
160189
}
190+
catch (PostPayloadException ex)
191+
{
192+
HandlePostPayloadException(ex);
193+
}
161194
catch (Exception ex)
162195
{
163196
if (ex is PayloadNotifyException payloadMoveException &&
@@ -172,17 +205,45 @@ private async Task MoveActionHandler(Payload payload)
172205
}
173206
}
174207

208+
private void HandlePostPayloadException(PostPayloadException ex)
209+
{
210+
Guard.Against.Null(ex);
211+
212+
if (ex.TargetQueue == Payload.PayloadState.Move)
213+
{
214+
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, CancellationToken.None);
215+
if (!_moveFileQueue.Post(ex.Payload))
216+
{
217+
_logger.ErrorPostingJobToMovePayloadsQueue();
218+
}
219+
}
220+
else if (ex.TargetQueue == Payload.PayloadState.Notify)
221+
{
222+
ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, CancellationToken.None);
223+
if (!_publishQueue.Post(ex.Payload))
224+
{
225+
_logger.ErrorPostingJobToPublishPayloadsQueue();
226+
}
227+
}
228+
}
229+
175230
private void BackgroundProcessing(CancellationToken cancellationToken)
176231
{
177232
_logger.ServiceRunning(ServiceName);
178233

179234
while (!cancellationToken.IsCancellationRequested)
180235
{
236+
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
237+
ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, cancellationToken);
238+
181239
Payload payload = null;
182240
try
183241
{
184242
payload = _payloadAssembler.Dequeue(cancellationToken);
185-
_moveFileQueue.Post(payload);
243+
while (!_moveFileQueue.Post(payload))
244+
{
245+
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
246+
}
186247
_logger.PayloadQueuedForProcessing(payload.PayloadId, ServiceName);
187248
}
188249
catch (OperationCanceledException ex)
@@ -202,6 +263,18 @@ private void BackgroundProcessing(CancellationToken cancellationToken)
202263
_logger.ServiceCancelled(ServiceName);
203264
}
204265

266+
private static void ResetIfFaultedOrCancelled(ActionBlock<Payload> queue, Action<CancellationToken> resetFunction, CancellationToken cancellationToken)
267+
{
268+
Guard.Against.Null(queue);
269+
Guard.Against.Null(resetFunction);
270+
271+
if (queue.Completion.IsCanceledOrFaulted())
272+
{
273+
resetFunction(cancellationToken);
274+
}
275+
}
276+
277+
205278
private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
206279
{
207280
_logger.StartupRestoreFromDatabase();
@@ -214,11 +287,17 @@ private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
214287
{
215288
if (payload.State == Payload.PayloadState.Move)
216289
{
217-
_moveFileQueue.Post(payload);
290+
if (!_moveFileQueue.Post(payload))
291+
{
292+
_logger.ErrorPostingJobToMovePayloadsQueue();
293+
}
218294
}
219295
else if (payload.State == Payload.PayloadState.Notify)
220296
{
221-
_publishQueue.Post(payload);
297+
if (!_publishQueue.Post(payload))
298+
{
299+
_logger.ErrorPostingJobToPublishPayloadsQueue();
300+
}
222301
}
223302
}
224303
_logger.RestoredFromDatabase(payloads.Count);

src/InformaticsGateway/Services/Export/DicomWebExportService.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ internal class DicomWebExportService : ExportServiceBase
4949
private readonly IOptions<InformaticsGatewayConfiguration> _configuration;
5050
private readonly IDicomToolkit _dicomToolkit;
5151

52-
protected override int Concurrency { get; }
52+
protected override ushort Concurrency { get; }
5353
public override string RoutingKey { get; }
5454
public override string ServiceName => "DICOMweb Export Service";
5555

0 commit comments

Comments
 (0)