25
25
using Microsoft . Extensions . DependencyInjection ;
26
26
using Microsoft . Extensions . Logging ;
27
27
using Monai . Deploy . InformaticsGateway . Api ;
28
+ using Monai . Deploy . InformaticsGateway . Api . Rest ;
28
29
using Monai . Deploy . InformaticsGateway . Api . Storage ;
29
30
using Monai . Deploy . InformaticsGateway . Database . Api . Repositories ;
30
31
using Monai . Deploy . InformaticsGateway . Logging ;
32
+ using Monai . Deploy . InformaticsGateway . Services . Common ;
31
33
using Monai . Deploy . Messaging . Events ;
32
34
33
35
#nullable enable
@@ -38,7 +40,7 @@ namespace Monai.Deploy.InformaticsGateway.Services.Connectors
38
40
/// An in-memory queue for providing any files/DICOM instances received by the Informatics Gateway to
39
41
/// other internal services.
40
42
/// </summary>
41
- internal sealed partial class PayloadAssembler : IPayloadAssembler , IDisposable
43
+ internal sealed partial class PayloadAssembler : IPayloadAssembler , IDisposable , IMonaiService
42
44
{
43
45
internal const int DEFAULT_TIMEOUT = 5 ;
44
46
private readonly ILogger < PayloadAssembler > _logger ;
@@ -56,6 +58,8 @@ public PayloadAssembler(
56
58
{
57
59
_logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
58
60
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException ( nameof ( serviceScopeFactory ) ) ;
61
+ var scope = _serviceScopeFactory . CreateScope ( ) ;
62
+ var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ; // done here to ensure connection on startup
59
63
60
64
_workItems = [ ] ;
61
65
_tokenSource = new CancellationTokenSource ( ) ;
@@ -69,8 +73,14 @@ public PayloadAssembler(
69
73
} ;
70
74
_timer . Elapsed += OnTimedEvent ;
71
75
_timer . Enabled = true ;
76
+
77
+ Status = ServiceStatus . Running ;
72
78
}
73
79
80
+ public string ServiceName { get => nameof ( PayloadAssembler ) ; }
81
+
82
+ public ServiceStatus Status { get ; set ; } = ServiceStatus . Unknown ;
83
+
74
84
private async Task RemovePendingPayloads ( )
75
85
{
76
86
_logger . RemovingPendingPayloads ( ) ;
@@ -193,6 +203,8 @@ private async Task QueueBucketForNotification(string key, Payload payload)
193
203
payload . State = Payload . PayloadState . Move ;
194
204
var scope = _serviceScopeFactory . CreateScope ( ) ;
195
205
var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
206
+
207
+
196
208
await repository . UpdateAsync ( payload ) . ConfigureAwait ( false ) ;
197
209
_logger . PayloadSaved ( payload . PayloadId ) ;
198
210
_workItems . Add ( payload ) ;
@@ -219,9 +231,11 @@ private async Task<Payload> CreateOrGetPayload(string key, string correlationId,
219
231
220
232
private async Task < Payload > PayloadFactory ( string key , string correlationId , string ? workflowInstanceId , string ? taskId , Messaging . Events . DataOrigin dataOrigin , uint timeout , CancellationToken cancellationToken )
221
233
{
234
+ var newPayload = new Payload ( key , correlationId , workflowInstanceId , taskId , dataOrigin , timeout , null ) ;
222
235
var scope = _serviceScopeFactory . CreateScope ( ) ;
223
236
var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
224
- var newPayload = new Payload ( key , correlationId , workflowInstanceId , taskId , dataOrigin , timeout , null ) ;
237
+
238
+
225
239
await repository . AddAsync ( newPayload , cancellationToken ) . ConfigureAwait ( false ) ;
226
240
_logger . BucketCreated ( key , timeout ) ;
227
241
return newPayload ;
@@ -232,6 +246,7 @@ public void Dispose()
232
246
_tokenSource . Cancel ( ) ;
233
247
_payloads . Clear ( ) ;
234
248
_timer . Stop ( ) ;
249
+ Status = ServiceStatus . Stopped ;
235
250
}
236
251
}
237
252
}
0 commit comments