Skip to content

Disable JobHosting workers for disabled jobs #4752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Test.Utilities;
using Xunit;

namespace Microsoft.Health.Fhir.Core.UnitTests.Config
{
[Trait(Traits.OwningTeam, OwningTeam.Fhir)]
[Trait(Traits.Category, Categories.Operations)]
public sealed class OperationsConfigurationTests
{
[Fact]
public void GivenAnOperationsConfiguration_WhenQueuesAreDisabled_RemoveThemFromTheList()
{
// Arrange
var operationsConfig = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = false },
Import = new ImportTaskConfiguration { Enabled = false },
};

operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

// Act
operationsConfig.RemoveDisabledQueues();

// Assert
Assert.DoesNotContain(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.DoesNotContain(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);
}

[Fact]
public void GivenAnOperationsConfiguration_WhenQueuesAreEnabledWithConnectionString_KeepThemInTheList()
{
// Arrange
var operationsConfig = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = true, StorageAccountConnection = "test-connection-string" },
Import = new ImportTaskConfiguration { Enabled = true },
IntegrationDataStore = new IntegrationDataStoreConfiguration { StorageAccountConnection = "test-connection-string" },
};

operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

// Act
operationsConfig.RemoveDisabledQueues();

// Assert
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);
}

[Fact]
public void GivenAnOperationsConfiguration_WhenQueuesAreEnabledWithUri_KeepThemInTheList()
{
// Arrange
var operationsConfig = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = true, StorageAccountUri = "https://test-account.blob.core.windows.net" },
Import = new ImportTaskConfiguration { Enabled = true },
IntegrationDataStore = new IntegrationDataStoreConfiguration { StorageAccountUri = "https://test-account.blob.core.windows.net" },
};

operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

// Act
operationsConfig.RemoveDisabledQueues();

// Assert
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);
}

[Fact]
public void GivenAnOperationsConfiguration_WhenNoQueuesExist_NoExceptionIsThrown()
{
// Arrange
var operationsConfig = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = false },
Import = new ImportTaskConfiguration { Enabled = false },
};

// Act & Assert
var exception = Record.Exception(() => operationsConfig.RemoveDisabledQueues());
Assert.Null(exception);
}

[Fact]
public void GivenAnOperationsConfiguration_WhenMixedQueuesAreEnabledOrDisabled_HandleCorrectly()
{
// Arrange
var operationsConfigWithExport = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = true, StorageAccountUri = "https://test-account.blob.core.windows.net" },
Import = new ImportTaskConfiguration { Enabled = false },
};

var operationsConfigWithImport = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = false },
Import = new ImportTaskConfiguration { Enabled = true},
IntegrationDataStore = new IntegrationDataStoreConfiguration { StorageAccountUri = "https://test-account.blob.core.windows.net" },
};

operationsConfigWithExport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfigWithExport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfigWithExport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

operationsConfigWithImport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfigWithImport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfigWithImport.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

// Act
operationsConfigWithExport.RemoveDisabledQueues();

operationsConfigWithImport.RemoveDisabledQueues();

// Assert
Assert.Contains(operationsConfigWithExport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.DoesNotContain(operationsConfigWithExport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.Contains(operationsConfigWithExport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);

Assert.Contains(operationsConfigWithImport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.DoesNotContain(operationsConfigWithImport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.Contains(operationsConfigWithImport.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);
}

[Fact]
public void GivenAnOperationsConfiguration_WhenStorageStringsAreEmpty_RemoveQueues()
{
// Arrange
var operationsConfig = new OperationsConfiguration
{
Export = new ExportJobConfiguration { Enabled = true, StorageAccountConnection = string.Empty, StorageAccountUri = string.Empty },
Import = new ImportTaskConfiguration { Enabled = true },
IntegrationDataStore = new IntegrationDataStoreConfiguration { StorageAccountConnection = string.Empty, StorageAccountUri = string.Empty },
};

operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Export, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.Import, MaxRunningTaskCount = 2 });
operationsConfig.HostingBackgroundServiceQueues.Add(new HostingBackgroundServiceQueueItem { Queue = QueueType.BulkDelete, MaxRunningTaskCount = 1 });

// Act
operationsConfig.RemoveDisabledQueues();

// Assert
Assert.DoesNotContain(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Export);
Assert.DoesNotContain(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.Import);
Assert.Contains(operationsConfig.HostingBackgroundServiceQueues, q => q.Queue == QueueType.BulkDelete);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;
using Microsoft.Health.Fhir.Core.Features.Operations;

namespace Microsoft.Health.Fhir.Core.Configs
{
Expand All @@ -22,5 +24,27 @@ public class OperationsConfiguration
public IntegrationDataStoreConfiguration IntegrationDataStore { get; set; } = new IntegrationDataStoreConfiguration();

public ImportTaskConfiguration Import { get; set; } = new ImportTaskConfiguration();

/// <summary>
/// Removes queues based on the enabled status of the operations.
/// </summary>
public void RemoveDisabledQueues()
{
if (!Export.Enabled || (string.IsNullOrEmpty(Export.StorageAccountConnection) && string.IsNullOrEmpty(Export.StorageAccountUri)))
{
HostingBackgroundServiceQueues
.Where(q => q.Queue == QueueType.Export)
.ToList()
.ForEach(q => HostingBackgroundServiceQueues.Remove(q));
}

if (!Import.Enabled || (string.IsNullOrEmpty(IntegrationDataStore.StorageAccountConnection) && string.IsNullOrEmpty(IntegrationDataStore.StorageAccountUri)))
{
HostingBackgroundServiceQueues
.Where(q => q.Queue == QueueType.Import)
.ToList()
.ForEach(q => HostingBackgroundServiceQueues.Remove(q));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public static IFhirServerBuilder AddFhirServer(
configurationRoot?.GetSection(FhirServerConfigurationSectionName).Bind(fhirServerConfiguration);
configureAction?.Invoke(fhirServerConfiguration);

// Remove any job queues that are disables or don't have a storage account configured.
fhirServerConfiguration.Operations.RemoveDisabledQueues();

services.AddSingleton(Options.Options.Create(fhirServerConfiguration));
services.AddSingleton(Options.Options.Create(fhirServerConfiguration.Security));
services.AddSingleton(Options.Options.Create(fhirServerConfiguration.Features));
Expand Down
8 changes: 4 additions & 4 deletions src/Microsoft.Health.TaskManagement/JobHosting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task ExecuteAsync(byte queueType, short runningJobCount, string wor
{
try
{
_logger.LogInformation("Dequeuing next job.");
_logger.LogInformation("Dequeuing next job for {QueueType}.", queueType);

if (checkTimeoutJobStopwatch.Elapsed.TotalSeconds > 600)
{
Expand All @@ -73,7 +73,7 @@ public async Task ExecuteAsync(byte queueType, short runningJobCount, string wor
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to dequeue new job.");
_logger.LogError(ex, "Failed to dequeue new job for {QueueType}.", queueType);
}
}

Expand Down Expand Up @@ -102,12 +102,12 @@ public async Task ExecuteAsync(byte queueType, short runningJobCount, string wor
{
try
{
_logger.LogInformation("Empty queue. Delaying until next iteration.");
_logger.LogInformation("Empty queue {QueueType}. Delaying until next iteration.", queueType);
await Task.Delay(TimeSpan.FromSeconds(PollingFrequencyInSeconds), cancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
_logger.LogInformation("Queue is stopping, worker is shutting down.");
_logger.LogInformation("Queue {QueueType} is stopping, worker is shutting down.", queueType);
}
}
}
Expand Down
Loading