Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion api/Controllers/TriggerAnalysisController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,21 @@ await analysisMappingService.GetAnalysisTypeFromInspectionDescriptionAndTag(
);
shouldRunFencilla = true;
}
var shouldUploadToSTID = false;
if (plantData.RawDataBlobStorageLocation.BlobName.EndsWith(".jpeg"))
{
shouldUploadToSTID = true;
_logger.LogInformation(
"Raw data is a JPEG image, will upload to STID for InspectionId: {InspectionId}",
request.InspectionId
);
}

await argoWorkflowService.TriggerAnalysis(
plantData,
shouldRunConstantLevelOiler,
shouldRunFencilla
shouldRunFencilla,
shouldUploadToSTID
);

return Ok("Analysis workflow triggered successfully.");
Expand Down
12 changes: 11 additions & 1 deletion api/MQTT/MqttEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ await AnalysisMappingService.GetAnalysisTypeFromInspectionDescriptionAndTag(
);
shouldRunFencilla = true;
}
var shouldUploadToSTID = false;
if (plantData.RawDataBlobStorageLocation.BlobName.EndsWith(".jpeg"))
{
shouldUploadToSTID = true;
_logger.LogInformation(
"Raw data is a JPEG image, will upload to STID for InspectionId: {InspectionId}",
isarInspectionResultMessage.InspectionId
);
}
await ArgoWorkflowService.TriggerAnalysis(
plantData,
shouldRunConstantLevelOiler,
shouldRunFencilla
shouldRunFencilla,
shouldUploadToSTID
);
}
catch (ArgumentException)
Expand Down
14 changes: 9 additions & 5 deletions api/Services/AnonymizerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public class TriggerArgoWorkflowAnalysisRequest(
BlobStorageLocation anonymizedBlobStorageLocation,
BlobStorageLocation visualizedBlobStorageLocation,
bool shouldRunConstantLevelOiler,
bool shouldRunFencilla
bool shouldRunFencilla,
bool shouldUploadToSTID
)
{
public string InspectionId { get; } = inspectionId;
Expand All @@ -21,15 +22,16 @@ bool shouldRunFencilla
visualizedBlobStorageLocation;
public bool ShouldRunConstantLevelOiler { get; } = shouldRunConstantLevelOiler;
public bool ShouldRunFencilla { get; } = shouldRunFencilla;
public bool ShouldUploadToSTID { get; } = shouldUploadToSTID;
}

public interface IArgoWorkflowService
{
public Task TriggerAnalysis(
PlantData data,
bool shouldRunConstantLevelOiler,
bool shouldRunFencilla
);
bool shouldRunFencilla,
bool shouldUploadToSTID);
}

public class ArgoWorkflowService(IConfiguration configuration, ILogger<ArgoWorkflowService> logger)
Expand All @@ -47,7 +49,8 @@ public class ArgoWorkflowService(IConfiguration configuration, ILogger<ArgoWorkf
public async Task TriggerAnalysis(
PlantData data,
bool shouldRunConstantLevelOiler,
bool shouldRunFencilla
bool shouldRunFencilla,
bool shouldUploadToSTID
)
{
var postRequestData = new TriggerArgoWorkflowAnalysisRequest(
Expand All @@ -56,7 +59,8 @@ bool shouldRunFencilla
data.AnonymizedBlobStorageLocation,
data.VisualizedBlobStorageLocation,
shouldRunConstantLevelOiler,
shouldRunFencilla
shouldRunFencilla,
shouldUploadToSTID
);

var json = JsonSerializer.Serialize(postRequestData, useCamelCaseOption);
Expand Down
49 changes: 47 additions & 2 deletions mocks/argo_workflow_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def trigger_analysis():
visualized_blob_storage_location = data.get("visualizedBlobStorageLocation")
should_run_constant_level_oiler = data.get("shouldRunConstantLevelOiler")
should_run_fencilla = data.get("shouldRunFencilla")
should_upload_to_stid = data.get("shouldUploadToSTID")

# Validate input
if (
Expand All @@ -47,6 +48,7 @@ def trigger_analysis():
or not visualized_blob_storage_location
or should_run_constant_level_oiler is None
or should_run_fencilla is None
or should_upload_to_stid is None
):
print("Missing required fields")
return jsonify({"error": "Missing required fields"}), 400
Expand All @@ -56,7 +58,12 @@ def trigger_analysis():
# Start the workflow notifications in a separate thread
threading.Thread(
target=start_workflow,
args=(inspection_id, should_run_constant_level_oiler, should_run_fencilla),
args=(
inspection_id,
should_run_constant_level_oiler,
should_run_fencilla,
should_upload_to_stid,
),
).start()

return jsonify({"message": "Trigger request received"}), 200
Expand All @@ -65,7 +72,12 @@ def trigger_analysis():
return jsonify({"error": "An error occurred"}), 500


def start_workflow(inspection_id, should_run_constant_level_oiler, should_run_fencilla):
def start_workflow(
inspection_id,
should_run_constant_level_oiler,
should_run_fencilla,
should_upload_to_stid,
):
try:
workflow_name = f"workflow-{random.randint(1000, 9999)}"
print(
Expand Down Expand Up @@ -93,6 +105,11 @@ def start_workflow(inspection_id, should_run_constant_level_oiler, should_run_fe
confidence = 0.95
notify_fencilla_done(inspection_id, is_break, confidence)

time.sleep(5)
if should_upload_to_stid:
# Notify STID upload done after 10 seconds
notify_stid_upload_done(inspection_id)

# Notify workflow exited after another 10 seconds
time.sleep(5)
notify_workflow_exited(inspection_id)
Expand Down Expand Up @@ -172,6 +189,34 @@ def notify_fencilla_done(inspection_id: str, is_break: bool, confidence: float):
print(f"Error in notify_fencilla_done: {e}")


def notify_stid_upload_done(inspection_id: str):
try:
url = "https://localhost:8100/Workflows/notify-stid-upload-done"
payload = {"inspectionId": inspection_id}
print(f"Sending PUT to {url} with data: {payload}")
response = requests.put(url, json=payload, verify=False)

if response.status_code == 200:
print("STID upload done notification sent successfully.")
else:
print(
f"Failed to notify STID upload done. Response {response.status_code}: {response.text}"
)
except Exception as e:
print(f"Error in notify_stid_upload_done: {e}")
print(f"Sending PUT to {url} with data: {payload}")
response = requests.put(url, json=payload, verify=False)

if response.status_code == 200:
print("STID upload done notification sent successfully.")
else:
print(
f"Failed to notify STID upload done. Response {response.status_code}: {response.text}"
)
except Exception as e:
print(f"Error in notify_stid_upload_done: {e}")


def notify_workflow_exited(inspection_id):
try:
url = "https://localhost:8100/Workflows/notify-workflow-exited"
Expand Down
Loading