Skip to content

Commit a943f96

Browse files
[IngestionClient] Refactor - add ServiceBusClient via Dependency Injection instead of static clients (#2476)
1 parent 87130ab commit a943f96

16 files changed

Lines changed: 186 additions & 81 deletions

File tree

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// <copyright file="ServiceBusClientName.cs" company="Microsoft Corporation">
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
4+
// </copyright>
5+
6+
namespace Connector.Enums
7+
{
8+
public enum ServiceBusClientName
9+
{
10+
None = 0,
11+
StartTranscriptionServiceBusClient,
12+
FetchTranscriptionServiceBusClient,
13+
CompletedTranscriptionServiceBusClient,
14+
}
15+
}

samples/ingestion/ingestion-client/Connector/ModelIdentity.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
namespace Connector
77
{
8-
using System;
9-
108
public sealed class ModelIdentity
119
{
1210
public ModelIdentity(string self)

samples/ingestion/ingestion-client/Connector/Serializable/CompletedMessage.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
namespace Connector
77
{
8-
using Newtonsoft.Json;
9-
108
public class CompletedMessage
119
{
1210
public CompletedMessage(string audioFileName, string jsonResportLocation)

samples/ingestion/ingestion-client/Connector/Serializable/TranscriptionResult/SpeechTranscript.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ namespace Connector
77
{
88
using System.Collections.Generic;
99

10-
using Connector.Serializable.Language.Conversations;
11-
1210
using Newtonsoft.Json;
1311

1412
public class SpeechTranscript

samples/ingestion/ingestion-client/Connector/Serializable/TranscriptionStartedMessage/AudioFileInfo.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
namespace Connector.Serializable.TranscriptionStartedServiceBusMessage
77
{
8-
using System;
9-
108
public class AudioFileInfo
119
{
1210
public AudioFileInfo(string fileUrl, int retryCount, TextAnalyticsRequests textAnalyticsRequests, string fileName)

samples/ingestion/ingestion-client/FetchTranscription/FetchTranscription.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@ namespace FetchTranscription
77
{
88
using System;
99
using System.Threading.Tasks;
10+
using Azure.Messaging.ServiceBus;
11+
1012
using Connector;
13+
using Connector.Database;
1114

1215
using Microsoft.Azure.Functions.Worker;
16+
using Microsoft.Extensions.Azure;
17+
using Microsoft.Extensions.DependencyInjection;
1318
using Microsoft.Extensions.Logging;
1419

1520
/// <summary>
@@ -19,6 +24,7 @@ public class FetchTranscription
1924
{
2025
private readonly IServiceProvider serviceProvider;
2126
private readonly IStorageConnector storageConnector;
27+
private readonly IAzureClientFactory<ServiceBusClient> serviceBusClientFactory;
2228
private readonly ILogger<FetchTranscription> logger;
2329

2430
/// <summary>
@@ -27,11 +33,17 @@ public class FetchTranscription
2733
/// <param name="serviceProvider">The service provider.</param>
2834
/// <param name="logger">The FetchTranscription logger.</param>
2935
/// <param name="storageConnector">Storage Connector dependency</param>
30-
public FetchTranscription(IServiceProvider serviceProvider, ILogger<FetchTranscription> logger, IStorageConnector storageConnector)
36+
/// <param name="serviceBusClientFactory">Azure client factory for service bus clients</param>
37+
public FetchTranscription(
38+
IServiceProvider serviceProvider,
39+
ILogger<FetchTranscription> logger,
40+
IStorageConnector storageConnector,
41+
IAzureClientFactory<ServiceBusClient> serviceBusClientFactory)
3142
{
3243
this.serviceProvider = serviceProvider;
3344
this.logger = logger;
3445
this.storageConnector = storageConnector;
46+
this.serviceBusClientFactory = serviceBusClientFactory;
3547
}
3648

3749
/// <summary>
@@ -53,7 +65,9 @@ public async Task Run([ServiceBusTrigger("fetch_transcription_queue", Connection
5365

5466
var serviceBusMessage = TranscriptionStartedMessage.DeserializeMessage(message);
5567

56-
var transcriptionProcessor = new TranscriptionProcessor(this.serviceProvider, this.storageConnector);
68+
var databaseContext = FetchTranscriptionEnvironmentVariables.UseSqlDatabase ? this.serviceProvider.GetRequiredService<IngestionClientDbContext>() : null;
69+
70+
var transcriptionProcessor = new TranscriptionProcessor(this.storageConnector, this.serviceBusClientFactory, databaseContext);
5771

5872
await transcriptionProcessor.ProcessTranscriptionJobAsync(serviceBusMessage, this.serviceProvider, this.logger).ConfigureAwait(false);
5973
}

samples/ingestion/ingestion-client/FetchTranscription/Program.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ namespace FetchTranscription
1212

1313
using Connector;
1414
using Connector.Database;
15+
using Connector.Enums;
1516

1617
using Microsoft.EntityFrameworkCore;
18+
using Microsoft.Extensions.Azure;
1719
using Microsoft.Extensions.DependencyInjection;
1820
using Microsoft.Extensions.Hosting;
1921

@@ -44,6 +46,20 @@ public static void Main(string[] args)
4446
s.AddSingleton(blobServiceClient);
4547
s.AddSingleton(storageCredential);
4648
s.AddTransient<IStorageConnector, StorageConnector>();
49+
50+
s.AddAzureClients(clientBuilder =>
51+
{
52+
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString)
53+
.WithName(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
54+
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString)
55+
.WithName(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());
56+
57+
if (!string.IsNullOrWhiteSpace(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
58+
{
59+
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString)
60+
.WithName(ServiceBusClientName.CompletedTranscriptionServiceBusClient.ToString());
61+
}
62+
});
4763
})
4864
.Build();
4965

samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,49 +21,44 @@ namespace FetchTranscription
2121
using Connector.Serializable.TranscriptionStartedServiceBusMessage;
2222

2323
using Microsoft.EntityFrameworkCore;
24-
using Microsoft.Extensions.DependencyInjection;
24+
using Microsoft.Extensions.Azure;
2525
using Microsoft.Extensions.Logging;
2626
using Newtonsoft.Json;
2727

2828
public class TranscriptionProcessor
2929
{
30-
private static readonly ServiceBusClient StartServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString);
30+
private readonly ServiceBusSender startTranscriptionServiceBusSender;
3131

32-
private static readonly ServiceBusSender StartServiceBusSender = StartServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString).EntityPath);
32+
private readonly ServiceBusSender fetchTranscriptionServiceBusSender;
3333

34-
private static readonly ServiceBusClient FetchServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString);
35-
36-
private static readonly ServiceBusSender FetchServiceBusSender = FetchServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString).EntityPath);
37-
38-
private static readonly ServiceBusClient CompletedServiceBusClient;
39-
40-
private static readonly ServiceBusSender CompletedServiceBusSender;
41-
42-
private readonly IServiceProvider serviceProvider;
34+
private readonly ServiceBusSender completedTranscriptionServiceBusSender;
4335

4436
private readonly IngestionClientDbContext databaseContext;
4537

4638
private readonly IStorageConnector storageConnector;
4739

48-
#pragma warning disable CA1810
49-
static TranscriptionProcessor()
40+
public TranscriptionProcessor(
41+
IStorageConnector storageConnector,
42+
IAzureClientFactory<ServiceBusClient> serviceBusClientFactory,
43+
IngestionClientDbContext databaseContext)
5044
{
51-
if (!string.IsNullOrEmpty(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
52-
{
53-
CompletedServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString);
54-
CompletedServiceBusSender = CompletedServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString).EntityPath);
55-
}
56-
}
57-
#pragma warning restore CA1810
58-
59-
public TranscriptionProcessor(IServiceProvider serviceProvider, IStorageConnector storageConnector)
60-
{
61-
this.serviceProvider = serviceProvider;
6245
this.storageConnector = storageConnector;
46+
this.databaseContext = databaseContext;
47+
48+
ArgumentNullException.ThrowIfNull(serviceBusClientFactory, nameof(serviceBusClientFactory));
49+
var startTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
50+
var startTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString).EntityPath;
51+
this.startTranscriptionServiceBusSender = startTranscriptionServiceBusClient.CreateSender(startTranscriptionQueueName);
52+
53+
var fetchTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());
54+
var fetchTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString).EntityPath;
55+
this.fetchTranscriptionServiceBusSender = fetchTranscriptionServiceBusClient.CreateSender(fetchTranscriptionQueueName);
6356

64-
if (FetchTranscriptionEnvironmentVariables.UseSqlDatabase)
57+
if (!string.IsNullOrWhiteSpace(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
6558
{
66-
this.databaseContext = this.serviceProvider.GetRequiredService<IngestionClientDbContext>();
59+
var completedTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.CompletedTranscriptionServiceBusClient.ToString());
60+
var completedTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString).EntityPath;
61+
this.completedTranscriptionServiceBusSender = completedTranscriptionServiceBusClient.CreateSender(completedTranscriptionQueueName);
6762
}
6863
}
6964

@@ -97,11 +92,11 @@ public async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessage servi
9792
break;
9893
case "Running":
9994
log.LogInformation($"Transcription running, polling again after {messageDelayTime.TotalMinutes} minutes.");
100-
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
95+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
10196
break;
10297
case "NotStarted":
10398
log.LogInformation($"Transcription not started, polling again after {messageDelayTime.TotalMinutes} minutes.");
104-
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
99+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
105100
break;
106101
}
107102
}
@@ -221,7 +216,7 @@ private async Task ProcessFailedTranscriptionAsync(string transcriptionLocation,
221216
};
222217

223218
var audioFileMessage = new Azure.Messaging.ServiceBus.ServiceBusMessage(JsonConvert.SerializeObject(serviceBusMessage));
224-
await ServiceBusUtilities.SendServiceBusMessageAsync(StartServiceBusSender, audioFileMessage, log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
219+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.startTranscriptionServiceBusSender, audioFileMessage, log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
225220
}
226221
else
227222
{
@@ -284,7 +279,7 @@ private async Task RetryOrFailJobAsync(TranscriptionStartedMessage message, stri
284279
if (message.FailedExecutionCounter <= FetchTranscriptionEnvironmentVariables.RetryLimit || isThrottled)
285280
{
286281
log.LogInformation("Retrying..");
287-
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, message.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
282+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, message.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
288283
}
289284
else
290285
{
@@ -342,7 +337,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
342337
{
343338
// If transcription analytics request is still running, re-queue message and get status again after X minutes:
344339
log.LogInformation($"Transcription analytics requests still running for job {jobName} - re-queueing message.");
345-
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
340+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
346341
return;
347342
}
348343

@@ -418,7 +413,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
418413
log.LogInformation($"Added text analytics requests to service bus message - re-queueing message.");
419414

420415
// Poll for first time with TA request after 1 minute
421-
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
416+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
422417
return;
423418
}
424419

@@ -509,9 +504,9 @@ await this.databaseContext.StoreTranscriptionAsync(
509504
}
510505
}
511506

512-
if (!string.IsNullOrEmpty(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
507+
if (this.completedTranscriptionServiceBusSender != null)
513508
{
514-
await ServiceBusUtilities.SendServiceBusMessageAsync(CompletedServiceBusSender, JsonConvert.SerializeObject(completedMessages), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
509+
await ServiceBusUtilities.SendServiceBusMessageAsync(this.completedTranscriptionServiceBusSender, JsonConvert.SerializeObject(completedMessages), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
515510
}
516511

517512
var generalErrors = generalErrorsStringBuilder.ToString();

samples/ingestion/ingestion-client/Setup/ArmTemplateBatch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@
256256
},
257257
"variables":
258258
{
259-
"Version": "v2.1.3",
259+
"Version": "v2.1.4",
260260
"AudioInputContainer": "audio-input",
261261
"AudioProcessedContainer": "audio-processed",
262262
"ErrorFilesOutputContainer": "audio-failed",

samples/ingestion/ingestion-client/StartTranscriptionByServiceBus/Program.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
namespace StartTranscription
77
{
8-
using System;
9-
108
using Azure.Storage;
119
using Azure.Storage.Blobs;
1210

1311
using Connector;
12+
using Connector.Enums;
1413

14+
using Microsoft.Extensions.Azure;
1515
using Microsoft.Extensions.DependencyInjection;
1616
using Microsoft.Extensions.Hosting;
1717

@@ -44,6 +44,14 @@ public static void Main(string[] args)
4444
s.AddSingleton(blobServiceClient);
4545
s.AddSingleton(storageCredential);
4646
s.AddTransient<IStorageConnector, StorageConnector>();
47+
48+
s.AddAzureClients(clientBuilder =>
49+
{
50+
clientBuilder.AddServiceBusClient(StartTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString)
51+
.WithName(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
52+
clientBuilder.AddServiceBusClient(StartTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString)
53+
.WithName(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());
54+
});
4755
})
4856
.Build();
4957

0 commit comments

Comments
 (0)