Skip to content

Commit

Permalink
Add initial import operation (microsoft#1992)
Browse files Browse the repository at this point in the history
Add import operation for customer to initial load data to FHIR service.

Sample request:
```
curl --location --request POST 'https://localhost:44348/$import' \
--header 'Accept: application/fhir+json' \
--header 'Prefer: respond-async' \
--header 'Content-Type: application/fhir+json' \
--data-raw '{
	"resourceType": "Parameters",
	"parameter": [
		{
			"name": "inputFormat",
			"valueString": "application/fhir+ndjson"
                },
                {
			"name": "mode",
			"valueString": "InitialLoad"
                },
		{
			"name": "input",
			"part": [
				{
					"name": "type",
					"valueString": "Patient"
				},
				{
					"name": "url",
					"valueUri": "http://127.0.0.1:10000/devstoreaccount1/0000/Patient.ndjson"
				}
			]
		},
		{
			"name": "input",
			"part": [
				{
					"name": "type",
					"valueString": "Patient"
				},
				{
					"name": "url",
					"valueUri": "http://127.0.0.1:10000/devstoreaccount1/0000/PatientErr.ndjson"
				}
			]
		},
		{
			"name": "storageDetail",
			"part": [
				{
					"name": "type",
					"valueString": "azure-blob"
				}
			]
		}
	]
}'
```
Sample result
```
{
    "transactionTime": "2021-05-17T06:23:47.0892657+00:00",
    "request": "https://localhost:44348/$import",
    "output": [
        {
            "type": "Patient",
            "count": 4,
            "inputUrl": "http://127.0.0.1:10000/devstoreaccount1/0000/Patient.ndjson"
        },
        {
            "type": "Patient",
            "count": 4,
            "inputUrl": "http://127.0.0.1:10000/devstoreaccount1/0000/PatientErr.ndjson"
        }
    ],
    "error": [
        {
            "type": "Patient",
            "count": 1,
            "inputUrl": "http://127.0.0.1:10000/devstoreaccount1/0000/PatientErr.ndjson",
            "url": "http://127.0.0.1:10000/devstoreaccount1/fhirlogs/Patient38d5d864efcc40ecbdf31ee63dea7f89.ndjson"
        }
    ]
}
```
  • Loading branch information
tongwu-sh authored Aug 20, 2021
1 parent 1b6d917 commit c677029
Show file tree
Hide file tree
Showing 235 changed files with 18,387 additions and 138 deletions.
13 changes: 13 additions & 0 deletions build/jobs/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ steps:
Write-Host "##vso[task.setvariable variable=TestExportStoreUri]$($exportStoreUri)"
Write-Host "##vso[task.setvariable variable=TestExportStoreKey]$($exportStoreKey.Value)"
$integrationStoreSettings = $appSettings | where {$_.Name -eq "FhirServer__Operations__IntegrationDataStore__StorageAccountUri"}
$integrationStoreUri = $integrationStoreSettings[0].Value
Write-Host "$integrationStoreUri"
$integrationStoreAccountName = [System.Uri]::new("$integrationStoreUri").Host.Split('.')[0]
$integrationStoreKey = Get-AzStorageAccountKey -ResourceGroupName $(ResourceGroupName) -Name "$integrationStoreAccountName" | Where-Object {$_.KeyName -eq "key1"}
Write-Host "##vso[task.setvariable variable=TestIntegrationStoreUri]$($integrationStoreUri)"
Write-Host "##vso[task.setvariable variable=TestIntegrationStoreKey]$($integrationStoreKey.Value)"
Write-Host "##vso[task.setvariable variable=Resource]$(TestEnvironmentUrl)"
$secrets = Get-AzKeyVaultSecret -VaultName resolute-oss-tenant-info
Expand Down Expand Up @@ -99,6 +108,8 @@ steps:
'TestContainerRegistryPassword': $(TestContainerRegistryPassword)
'TestExportStoreUri': $(TestExportStoreUri)
'TestExportStoreKey': $(TestExportStoreKey)
'TestIntegrationStoreUri': $(TestIntegrationStoreUri)
'TestIntegrationStoreKey': $(TestIntegrationStoreKey)
'tenant-admin-service-principal-name': $(tenant-admin-service-principal-name)
'tenant-admin-service-principal-password': $(tenant-admin-service-principal-password)
'tenant-admin-user-name': $(tenant-admin-user-name)
Expand All @@ -116,6 +127,8 @@ steps:
'user_globalConverterUser_secret': $(user_globalConverterUser_secret)
'user_globalExporterUser_id': $(user_globalExporterUser_id)
'user_globalExporterUser_secret': $(user_globalExporterUser_secret)
'user_globalImporterUser_id': $(user_globalImporterUser_id)
'user_globalImporterUser_secret': $(user_globalImporterUser_secret)
'user_globalReaderUser_id': $(user_globalReaderUser_id)
'user_globalReaderUser_secret': $(user_globalReaderUser_secret)
'user_globalWriterUser_id': $(user_globalWriterUser_id)
Expand Down
2 changes: 2 additions & 0 deletions build/jobs/provision-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
enableAadSmartOnFhirProxy = $true
enableExport = $true
enableConvertData = $true
enableImport = $true
backgroundTaskCount = 5
enableReindex = if ("${{ parameters.reindexEnabled }}" -eq "true") { $true } else { $false }
imageTag = '${{ parameters.imageTag }}'
}
Expand Down
25 changes: 22 additions & 3 deletions samples/templates/default-azuredeploy-docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,20 @@
"metadata": {
"description": "Determines whether the $reindex operation will be enabled for this fhir instance."
}
},
"enableImport": {
"type": "bool",
"defaultValue": false,
"metadata": {
"description": "Determines whether the $import operation will be enabled for this fhir instance."
}
},
"backgroundTaskCount": {
"type": "int",
"defaultValue": 1,
"metadata": {
"description": "Supports parallel background task running"
}
}
},
"variables": {
Expand All @@ -214,6 +228,7 @@
"storageBlobDataContributerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', 'ba92f5b4-2d11-453d-a403-e96b0029c9fe')]",
"acrPullRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', '7f951dda-4ed3-4680-a7ca-43fe172d538d')]",
"blobStorageUri": "[if(variables('isMAG'), '.blob.core.usgovcloudapi.net', '.blob.core.windows.net')]",
"enableIntegrationStore": "[or(parameters('enableExport'), parameters('enableImport'))]",
"staticFhirServerConfigProperties": {
"APPINSIGHTS_PORTALINFO": "ASP.NETCORE",
"APPINSIGHTS_PROFILERFEATURE_VERSION": "1.0.0",
Expand All @@ -228,11 +243,15 @@
"SqlServer__Initialize": "[equals(parameters('solutionType'),'FhirServerSqlServer')]",
"SqlServer__SchemaOptions__AutomaticUpdatesEnabled": "[if(equals(parameters('sqlSchemaAutomaticUpdatesEnabled'),'auto'), true(), false())]",
"DataStore": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), 'CosmosDb', 'SqlServer')]",
"TaskHosting__Enabled": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), false(), parameters('enableImport'))]",
"TaskHosting__MaxRunningTaskCount": "[parameters('backgroundTaskCount')]",
"FhirServer__Operations__IntegrationDataStore__StorageAccountUri": "[if(parameters('enableImport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer__Operations__Export__Enabled": "[parameters('enableExport')]",
"FhirServer__Operations__Export__StorageAccountUri": "[if(parameters('enableExport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer__Operations__ConvertData__Enabled": "[parameters('enableConvertData')]",
"FhirServer__Operations__ConvertData__ContainerRegistryServers__0": "[if(parameters('enableConvertData'), concat(variables('azureContainerRegistryName'), variables('azureContainerRegistryUri')), 'null')]",
"FhirServer__Operations__Reindex__Enabled": "[parameters('enableReindex')]"
"FhirServer__Operations__Reindex__Enabled": "[parameters('enableReindex')]",
"FhirServer__Operations__Import__Enabled": "[parameters('enableImport')]"
},
"combinedFhirServerConfigProperties": "[union(variables('staticFhirServerConfigProperties'), parameters('additionalFhirServerConfigProperties'))]",
"computedSqlServerReference": "[concat('Microsoft.Sql/servers/', variables('serviceName'))]",
Expand Down Expand Up @@ -504,7 +523,7 @@
"properties": {
"supportsHttpsTrafficOnly": true
},
"condition": "[parameters('enableExport')]",
"condition": "[variables('enableIntegrationStore')]",
"dependsOn": [],
"sku": {
"name": "Standard_LRS"
Expand All @@ -516,7 +535,7 @@
"type": "Microsoft.Storage/storageAccounts/providers/roleAssignments",
"apiVersion": "2018-09-01-preview",
"name": "[concat(variables('storageAccountName'), '/Microsoft.Authorization/', guid(uniqueString(variables('storageAccountName'), parameters('fhirVersion'), variables('serviceName'))))]",
"condition": "[parameters('enableExport')]",
"condition": "[variables('enableIntegrationStore')]",
"dependsOn": [
"[variables('storageAccountName')]",
"[variables('serviceName')]"
Expand Down
25 changes: 22 additions & 3 deletions samples/templates/default-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,20 @@
"metadata": {
"description": "Determines whether the $reindex operation will be enabled for this fhir instance."
}
},
"enableImport": {
"type": "bool",
"defaultValue": false,
"metadata": {
"description": "Determines whether the $import operation will be enabled for this fhir instance."
}
},
"backgroundTaskCount": {
"type": "int",
"defaultValue": 1,
"metadata": {
"description": "Supports parallel background task running"
}
}
},
"variables": {
Expand All @@ -216,6 +230,7 @@
"storageBlobDataContributerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', 'ba92f5b4-2d11-453d-a403-e96b0029c9fe')]",
"acrPullRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', '7f951dda-4ed3-4680-a7ca-43fe172d538d')]",
"blobStorageUri": "[if(variables('isMAG'), '.blob.core.usgovcloudapi.net', '.blob.core.windows.net')]",
"enableIntegrationStore": "[or(parameters('enableExport'), parameters('enableImport'))]",
"staticFhirServerConfigProperties": {
"APPINSIGHTS_PORTALINFO": "ASP.NETCORE",
"APPINSIGHTS_PROFILERFEATURE_VERSION": "1.0.0",
Expand All @@ -232,10 +247,14 @@
"SqlServer:SchemaOptions:AutomaticUpdatesEnabled": "[if(equals(parameters('sqlSchemaAutomaticUpdatesEnabled'),'auto'), true(), false())]",
"DataStore": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), 'CosmosDb', 'SqlServer')]",
"FhirServer:Operations:Export:Enabled": "[parameters('enableExport')]",
"TaskHosting:Enabled": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), false(), parameters('enableImport'))]",
"TaskHosting:MaxRunningTaskCount": "[parameters('backgroundTaskCount')]",
"FhirServer:Operations:IntegrationDataStore:StorageAccountUri": "[if(parameters('enableImport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer:Operations:Export:StorageAccountUri": "[if(parameters('enableExport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer:Operations:ConvertData:Enabled": "[parameters('enableConvertData')]",
"FhirServer:Operations:ConvertData:ContainerRegistryServers:0": "[if(parameters('enableConvertData'), concat(variables('azureContainerRegistryName'), variables('azureContainerRegistryUri')), 'null')]",
"FhirServer:Operations:Reindex:Enabled": "[parameters('enableReindex')]"
"FhirServer:Operations:Reindex:Enabled": "[parameters('enableReindex')]",
"FhirServer:Operations:Import:Enabled": "[parameters('enableImport')]"
},
"combinedFhirServerConfigProperties": "[union(variables('staticFhirServerConfigProperties'), parameters('additionalFhirServerConfigProperties'))]",
"computedSqlServerReference": "[concat('Microsoft.Sql/servers/', variables('serviceName'))]",
Expand Down Expand Up @@ -497,7 +516,7 @@
"properties": {
"supportsHttpsTrafficOnly": true
},
"condition": "[parameters('enableExport')]",
"condition": "[variables('enableIntegrationStore')]",
"dependsOn": [],
"sku": {
"name": "Standard_LRS"
Expand All @@ -509,7 +528,7 @@
"type": "Microsoft.Storage/storageAccounts/providers/roleAssignments",
"apiVersion": "2018-09-01-preview",
"name": "[concat(variables('storageAccountName'), '/Microsoft.Authorization/', guid(uniqueString(variables('storageAccountName'), parameters('fhirVersion'), variables('serviceName'))))]",
"condition": "[parameters('enableExport')]",
"condition": "[variables('enableIntegrationStore')]",
"dependsOn": [
"[variables('storageAccountName')]",
"[variables('serviceName')]"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Net;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;

namespace Microsoft.Health.Fhir.Api.Features.ActionResults
{
/// <summary>
/// Used to return the result of a bulk import operation.
/// </summary>
public class ImportResult : ResourceActionResult<ImportTaskResult>
{
public ImportResult(HttpStatusCode statusCode)
: base(null, statusCode)
{
}

public ImportResult(ImportTaskResult jobResult, HttpStatusCode statusCode)
: base(jobResult, statusCode)
{
EnsureArg.IsNotNull(jobResult, nameof(jobResult));
}

/// <summary>
/// Creates an ImportResult with HttpStatusCode Accepted.
/// </summary>
public static ImportResult Accepted()
{
return new ImportResult(HttpStatusCode.Accepted);
}

/// <summary>
/// Creates an ImportResult with HttpStatusCode Ok.
/// </summary>
/// <param name="taskResult">The job payload that must be returned as part of the ImportResult.</param>
public static ImportResult Ok(ImportTaskResult taskResult)
{
EnsureArg.IsNotNull(taskResult, nameof(taskResult));

return new ImportResult(taskResult, HttpStatusCode.OK);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using EnsureThat;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.TaskManagement;
using Newtonsoft.Json;

namespace Microsoft.Health.Fhir.Api.Features.BackgroundTaskService
{
/// <summary>
/// Factory to create different tasks.
/// </summary>
public class TaskFactory : ITaskFactory
{
private readonly IImportResourceLoader _importResourceLoader;
private readonly IResourceBulkImporter _resourceBulkImporter;
private readonly IImportErrorStoreFactory _importErrorStoreFactory;
private readonly IImportOrchestratorTaskDataStoreOperation _importOrchestratorTaskDataStoreOperation;
private readonly ISequenceIdGenerator<long> _sequenceIdGenerator;
private readonly IIntegrationDataStoreClient _integrationDataStoreClient;
private readonly ITaskManager _taskmanager;
private readonly IContextUpdaterFactory _contextUpdaterFactory;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILoggerFactory _loggerFactory;

public TaskFactory(
IImportResourceLoader importResourceLoader,
IResourceBulkImporter resourceBulkImporter,
IImportErrorStoreFactory importErrorStoreFactory,
IImportOrchestratorTaskDataStoreOperation importOrchestratorTaskDataStoreOperation,
IContextUpdaterFactory contextUpdaterFactory,
ITaskManager taskmanager,
ISequenceIdGenerator<long> sequenceIdGenerator,
IIntegrationDataStoreClient integrationDataStoreClient,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILoggerFactory loggerFactory)
{
EnsureArg.IsNotNull(importResourceLoader, nameof(importResourceLoader));
EnsureArg.IsNotNull(resourceBulkImporter, nameof(resourceBulkImporter));
EnsureArg.IsNotNull(importErrorStoreFactory, nameof(importErrorStoreFactory));
EnsureArg.IsNotNull(importOrchestratorTaskDataStoreOperation, nameof(importOrchestratorTaskDataStoreOperation));
EnsureArg.IsNotNull(contextUpdaterFactory, nameof(contextUpdaterFactory));
EnsureArg.IsNotNull(taskmanager, nameof(taskmanager));
EnsureArg.IsNotNull(sequenceIdGenerator, nameof(sequenceIdGenerator));
EnsureArg.IsNotNull(integrationDataStoreClient, nameof(integrationDataStoreClient));
EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
EnsureArg.IsNotNull(loggerFactory, nameof(loggerFactory));

_importResourceLoader = importResourceLoader;
_resourceBulkImporter = resourceBulkImporter;
_importErrorStoreFactory = importErrorStoreFactory;
_importOrchestratorTaskDataStoreOperation = importOrchestratorTaskDataStoreOperation;
_sequenceIdGenerator = sequenceIdGenerator;
_integrationDataStoreClient = integrationDataStoreClient;
_taskmanager = taskmanager;
_contextUpdaterFactory = contextUpdaterFactory;
_contextAccessor = contextAccessor;
_loggerFactory = loggerFactory;
}

public ITask Create(TaskInfo taskInfo)
{
EnsureArg.IsNotNull(taskInfo, nameof(taskInfo));

if (taskInfo.TaskTypeId == ImportProcessingTask.ImportProcessingTaskId)
{
IContextUpdater contextUpdater = _contextUpdaterFactory.CreateContextUpdater(taskInfo.TaskId, taskInfo.RunId);
ImportProcessingTaskInputData inputData = JsonConvert.DeserializeObject<ImportProcessingTaskInputData>(taskInfo.InputData);
ImportProcessingProgress importProgress = string.IsNullOrEmpty(taskInfo.Context) ? new ImportProcessingProgress() : JsonConvert.DeserializeObject<ImportProcessingProgress>(taskInfo.Context);
return new ImportProcessingTask(
inputData,
importProgress,
_importResourceLoader,
_resourceBulkImporter,
_importErrorStoreFactory,
contextUpdater,
_contextAccessor,
_loggerFactory);
}

if (taskInfo.TaskTypeId == ImportOrchestratorTask.ImportOrchestratorTaskId)
{
IContextUpdater contextUpdater = _contextUpdaterFactory.CreateContextUpdater(taskInfo.TaskId, taskInfo.RunId);
ImportOrchestratorTaskInputData inputData = JsonConvert.DeserializeObject<ImportOrchestratorTaskInputData>(taskInfo.InputData);
ImportOrchestratorTaskContext orchestratorTaskProgress = string.IsNullOrEmpty(taskInfo.Context) ? new ImportOrchestratorTaskContext() : JsonConvert.DeserializeObject<ImportOrchestratorTaskContext>(taskInfo.Context);

return new ImportOrchestratorTask(
inputData,
orchestratorTaskProgress,
_taskmanager,
_sequenceIdGenerator,
contextUpdater,
_contextAccessor,
_importOrchestratorTaskDataStoreOperation,
_integrationDataStoreClient,
_loggerFactory);
}

return null;
}
}
}
Loading

0 comments on commit c677029

Please sign in to comment.