Complete action listener on failed synchronous workflow provisioning #1098

Merged: 4 commits, Apr 7, 2025
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
### Bug Fixes
- Change REST status codes for RBAC and provisioning ([#1083](https://github.com/opensearch-project/flow-framework/pull/1083))
+ - Complete action listener on failed synchronous workflow provisioning ([#1098](https://github.com/opensearch-project/opensearch-remote-metadata-sdk/pull/1098))

### Infrastructure
### Documentation
src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java
@@ -352,11 +352,11 @@

@Override
public void onFailure(Exception e) {
- WorkflowTimeoutUtility.handleFailure(workflowId, e, isResponseSent, listener);
+ WorkflowTimeoutUtility.handleFailure(workflowId, e, listener);
}
}, true);
} catch (Exception ex) {
- WorkflowTimeoutUtility.handleFailure(workflowId, ex, isResponseSent, listener);
+ WorkflowTimeoutUtility.handleFailure(workflowId, ex, listener);
}
}, client.threadPool().executor(PROVISION_WORKFLOW_THREAD_POOL));

@@ -469,9 +469,25 @@
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.FAILED);
- }, exceptionState -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exceptionState); })
+ if (isSyncExecution) {
+ listener.onFailure(new FlowFrameworkException(errorMessage, status));
+ } else {
+ TenantAwareHelper.releaseProvision(tenantId);
+ }
+ }, exceptionState -> {
+ logger.error("Failed to update workflow state for workflow {}", workflowId, exceptionState);
+ if (isSyncExecution) {
+ listener.onFailure(
+ new FlowFrameworkException(
+ errorMessage + ". Failed to update workflow state after execution failure.",
+ RestStatus.INTERNAL_SERVER_ERROR
+ )
+ );
+ } else {
+ TenantAwareHelper.releaseProvision(tenantId);
+ }
+ })
);
}
}

}
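
Review note: in the hunk above, both callbacks of the state update now finish the request. A synchronous caller gets `listener.onFailure`, and an asynchronous one releases its provision slot. The branching can be sketched in plain Java as below. This is only an illustration under stated assumptions: `Listener`, `RecordingListener`, `FailedStateUpdateSketch`, and the `released` list are hypothetical stand-ins for `ActionListener`, a test double, the transport action, and `TenantAwareHelper.releaseProvision`. A `RuntimeException` stands in for `FlowFrameworkException`, and REST status handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the failure side of ActionListener.
interface Listener {
    void onFailure(Exception e);
}

// Hypothetical listener that records every failure it receives.
final class RecordingListener implements Listener {
    final List<Exception> failures = new ArrayList<>();

    @Override
    public void onFailure(Exception e) {
        failures.add(e);
    }
}

final class FailedStateUpdateSketch {
    // Stand-in for TenantAwareHelper.releaseProvision: records released tenant ids.
    static final List<String> released = new ArrayList<>();

    // Called once the attempt to persist the FAILED state has finished, whether or not it succeeded.
    static void afterStateUpdate(
        boolean isSyncExecution,
        boolean updateSucceeded,
        String tenantId,
        String errorMessage,
        Listener listener
    ) {
        if (isSyncExecution) {
            // Synchronous callers are still waiting, so the listener must be completed on both paths.
            String message = updateSucceeded
                ? errorMessage
                : errorMessage + ". Failed to update workflow state after execution failure.";
            listener.onFailure(new RuntimeException(message));
        } else {
            // Asynchronous callers already have their response; only the provision slot is released.
            released.add(tenantId);
        }
    }
}
```

The point of the sketch is that no branch returns without either completing the listener or releasing the slot, which is the gap the old single-line `exceptionState` handler left open.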
src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java
@@ -78,7 +78,6 @@
private final ThreadPool threadPool;
private final Client client;
private final SdkClient sdkClient;
- private final WorkflowStepFactory workflowStepFactory;
private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
@@ -126,7 +125,6 @@
this.threadPool = threadPool;
this.client = client;
this.sdkClient = sdkClient;
- this.workflowStepFactory = workflowStepFactory;
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
@@ -357,11 +355,11 @@

@Override
public void onFailure(Exception e) {
- WorkflowTimeoutUtility.handleFailure(workflowId, e, isResponseSent, listener);
+ WorkflowTimeoutUtility.handleFailure(workflowId, e, listener);
}
}, true);
} catch (Exception ex) {
- WorkflowTimeoutUtility.handleFailure(workflowId, ex, isResponseSent, listener);
+ WorkflowTimeoutUtility.handleFailure(workflowId, ex, listener);
}
}, threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL));
WorkflowTimeoutUtility.scheduleTimeoutHandler(
@@ -494,7 +492,24 @@
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.FAILED);
- }, exceptionState -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exceptionState); })
+ if (isSyncExecution) {
+ listener.onFailure(new FlowFrameworkException(errorMessage, status));
+ } else {
+ TenantAwareHelper.releaseProvision(template.getTenantId());
+ }
+ }, exceptionState -> {
+ logger.error("Failed to update workflow state for workflow {}", workflowId, exceptionState);
+ if (isSyncExecution) {
+ listener.onFailure(
+ new FlowFrameworkException(
+ errorMessage + ". Failed to update workflow state after execution failure.",
+ RestStatus.INTERNAL_SERVER_ERROR
+ )
+ );
+ } else {
+ TenantAwareHelper.releaseProvision(template.getTenantId());
+ }
+ })
);
}
}
src/main/java/org/opensearch/flowframework/util/WorkflowTimeoutUtility.java
@@ -14,6 +14,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
+ import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateRequest;
import org.opensearch.flowframework.transport.WorkflowResponse;
@@ -164,25 +165,13 @@
*
* @param workflowId The unique identifier of the workflow.
* @param e The exception that occurred during workflow execution.
- * @param isResponseSent An atomic boolean to ensure the response is sent only once.
* @param listener The listener to notify of the workflow failure.
*/
- public static void handleFailure(
- String workflowId,
- Exception e,
- AtomicBoolean isResponseSent,
- ActionListener<WorkflowResponse> listener
- ) {
- // Check if the failure has already been reported, and report it only if it hasn't been reported yet.
- if (isResponseSent.compareAndSet(false, true)) {
- FlowFrameworkException exception = new FlowFrameworkException(
- "Failed to execute workflow " + workflowId,
- ExceptionsHelper.status(e)
- );
- listener.onFailure(exception);
- } else {
- logger.info("Ignoring onFailure for workflowId: {} as timeout already occurred", workflowId);
- }
+ public static void handleFailure(String workflowId, Exception e, ActionListener<WorkflowResponse> listener) {
+ FlowFrameworkException exception = e instanceof FlowFrameworkException
+ ? (FlowFrameworkException) e
+ : new FlowFrameworkException("Failed to execute workflow " + workflowId, ExceptionsHelper.status(e));
+ listener.onFailure(exception);
}

/**
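
Review note: the rewritten `handleFailure` passes an existing `FlowFrameworkException` through unchanged and wraps anything else. A minimal sketch of that pass-through-or-wrap rule follows. Everything here is a hypothetical stand-in: `WorkflowException` plays the role of `FlowFrameworkException`, and a fixed status of 500 replaces the real `ExceptionsHelper.status(e)` lookup, which this sketch does not model.

```java
// Hypothetical stand-in for FlowFrameworkException: a message plus an HTTP-style status code.
final class WorkflowException extends RuntimeException {
    final int status;

    WorkflowException(String message, int status) {
        super(message);
        this.status = status;
    }
}

final class HandleFailureSketch {
    // Assumption: 500 replaces the status that the real code derives from the exception.
    static final int FALLBACK_STATUS = 500;

    // Returns the original exception when it already carries a status; otherwise wraps it.
    static WorkflowException toWorkflowException(String workflowId, Exception e) {
        return e instanceof WorkflowException
            ? (WorkflowException) e
            : new WorkflowException("Failed to execute workflow " + workflowId, FALLBACK_STATUS);
    }
}
```

Passing the original instance through keeps its message and status intact, so a caller sees the specific failure rather than the generic "Failed to execute workflow" text.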
@@ -207,8 +196,16 @@
new GetWorkflowStateRequest(workflowId, false, tenantId),
ActionListener.wrap(
response -> listener.onResponse(new WorkflowResponse(workflowId, response.getWorkflowState())),
- exception -> listener.onFailure(
- new FlowFrameworkException("Failed to get workflow state after timeout", ExceptionsHelper.status(exception))
+ // we don't want to fail the listener as provisioning is still ongoing
+ exception -> listener.onResponse(
+ new WorkflowResponse(
+ workflowId,
+ WorkflowState.builder()
+ .workflowId(workflowId)
+ .error("Workflow timed out, failed to fetch current state")
+ .state("UNKNOWN")
+ .build()
+ )
)
)
);
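
Review note: after a timeout, a failed state lookup now produces a normal response carrying an `UNKNOWN` state instead of failing the listener, since provisioning may still be running. The fallback can be sketched as a pure function, as below. `StateSnapshot` and `TimeoutFallbackSketch` are hypothetical names; the real code builds a `WorkflowState` and wraps it in a `WorkflowResponse`.

```java
// Hypothetical, simplified stand-in for the workflow state returned to the caller.
final class StateSnapshot {
    final String workflowId;
    final String state;
    final String error;

    StateSnapshot(String workflowId, String state, String error) {
        this.workflowId = workflowId;
        this.state = state;
        this.error = error;
    }
}

final class TimeoutFallbackSketch {
    // Builds the response sent after a timeout; fetchError is null when the state lookup succeeded.
    static StateSnapshot afterTimeout(String workflowId, String fetchedState, Exception fetchError) {
        if (fetchError == null) {
            return new StateSnapshot(workflowId, fetchedState, null);
        }
        // The lookup failed, but the workflow itself has not: report an unknown state, not a failure.
        return new StateSnapshot(workflowId, "UNKNOWN", "Workflow timed out, failed to fetch current state");
    }
}
```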
Original file line number Diff line number Diff line change
@@ -13,10 +13,12 @@
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
@@ -27,13 +29,16 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.index.get.GetResult;
import org.opensearch.plugins.PluginsService;
@@ -257,4 +262,58 @@ public void testFailedToRetrieveTemplateFromGlobalContext() {
assertEquals("Failed to get template 1", exceptionCaptor.getValue().getMessage());
}

public void testProvisionWorkflowExecutionException() {

String workflowId = "1";
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, Collections.emptyMap(), TimeValue.timeValueSeconds(5));

// Bypass client.get and stub success case
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
this.template.toXContent(builder, null);
BytesReference templateBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(template);

// Bypass isWorkflowNotStarted and force true response
doAnswer(invocation -> {
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(2);
progressConsumer.accept(Optional.of(ProvisioningProgress.NOT_STARTED));
return null;
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
ActionListener<UpdateResponse> actionListener = invocation.getArgument(3);
actionListener.onResponse(mock(UpdateResponse.class));
return null;
}).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), nullable(String.class), anyMap(), any());

doAnswer(invocation -> {
ActionListener<IndexResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true));
return null;
}).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any(), anyBoolean());

// Create a failed future for the workflow execution
PlainActionFuture<WorkflowData> failedFuture = PlainActionFuture.newFuture();
failedFuture.onFailure(new RuntimeException("Simulated failure during workflow execution"));
ProcessNode failedProcessNode = mock(ProcessNode.class);
when(failedProcessNode.execute()).thenReturn(failedFuture);
when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(Collections.singletonList(failedProcessNode));

provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);

ArgumentCaptor<FlowFrameworkException> responseCaptor = ArgumentCaptor.forClass(FlowFrameworkException.class);
verify(listener, times(1)).onFailure(responseCaptor.capture());
assertTrue(responseCaptor.getValue().getMessage().startsWith("Simulated failure during workflow execution"));
}
}
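
Review note: the new test drives the failure path by handing the action a future that has already failed, then checks that the listener's `onFailure` fired exactly once with the simulated message. The same idea can be sketched with the JDK alone, as below. This is an analogy, not the plugin's test: `CompletableFuture` replaces `PlainActionFuture`, a counter replaces the Mockito `verify(listener, times(1))` call, and `FailedFutureSketch` and `Outcome` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FailedFutureSketch {
    // Hypothetical result holder: how often the failure path ran and with which message.
    static final class Outcome {
        final int failureCount;
        final String message;

        Outcome(int failureCount, String message) {
            this.failureCount = failureCount;
            this.message = message;
        }
    }

    // Waits on a step whose future has already failed and records how the failure surfaces.
    static Outcome runWithFailedStep() {
        CompletableFuture<String> failedStep = new CompletableFuture<>();
        failedStep.completeExceptionally(new RuntimeException("Simulated failure during workflow execution"));

        int failureCount = 0;
        String message = null;
        try {
            failedStep.join();
        } catch (CompletionException e) {
            // join() wraps the original exception; the cause carries the simulated message.
            failureCount++;
            message = e.getCause().getMessage();
        }
        return new Outcome(failureCount, message);
    }
}
```

Completing the future exceptionally before it is handed over keeps the test deterministic, because no step has to run or time out for the failure path to be exercised.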
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
package org.opensearch.flowframework.transport;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
@@ -27,7 +28,9 @@
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.remote.metadata.client.SdkClient;
@@ -335,4 +338,65 @@ public void testFailedWorkflowStateRetrieval() throws Exception {
assertEquals("Failed to get workflow state for workflow 1", exceptionCaptor.getValue().getMessage());
}

public void testReprovisionWorkflowExecutionException() throws Exception {
String workflowId = "1";

Template mockTemplate = mock(Template.class);
Workflow mockWorkflow = mock(Workflow.class);
Map<String, Workflow> mockWorkflows = new HashMap<>();
mockWorkflows.put(PROVISION_WORKFLOW, mockWorkflow);

// Stub validations
when(mockTemplate.workflows()).thenReturn(mockWorkflows);
when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of());
doNothing().when(workflowProcessSorter).validate(any(), any());
when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate);

// Stub state and resources created
doAnswer(invocation -> {
ActionListener<GetWorkflowStateResponse> listener = invocation.getArgument(2);
WorkflowState state = mock(WorkflowState.class);
ResourceCreated resourceCreated = new ResourceCreated("stepName", workflowId, "resourceType", "resourceId");
when(state.getState()).thenReturn(State.COMPLETED.toString());
when(state.resourcesCreated()).thenReturn(List.of(resourceCreated));
when(state.getError()).thenReturn(null);
listener.onResponse(new GetWorkflowStateResponse(state, true));
return null;
}).when(client).execute(any(), any(GetWorkflowStateRequest.class), any());

// Create a failed future for the workflow execution
PlainActionFuture<WorkflowData> failedFuture = PlainActionFuture.newFuture();
failedFuture.onFailure(new RuntimeException("Simulated failure during workflow execution"));
ProcessNode failedProcessNode = mock(ProcessNode.class);
when(failedProcessNode.execute()).thenReturn(failedFuture);
WorkflowStep mockStep = mock(WorkflowStep.class);
when(mockStep.getName()).thenReturn("FakeStep");
when(failedProcessNode.workflowStep()).thenReturn(mockStep);

// Stub reprovision sequence creation with the failed process node
when(workflowProcessSorter.createReprovisionSequence(any(), any(), any(), any(), any())).thenReturn(List.of(failedProcessNode));

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
ActionListener<UpdateResponse> actionListener = invocation.getArgument(3);
actionListener.onResponse(mock(UpdateResponse.class));
return null;
}).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), nullable(String.class), anyMap(), any());

@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
ReprovisionWorkflowRequest request = new ReprovisionWorkflowRequest(
workflowId,
mockTemplate,
mockTemplate,
TimeValue.timeValueSeconds(5)
);

reprovisionWorkflowTransportAction.doExecute(mock(Task.class), request, listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertTrue(exceptionCaptor.getValue().getMessage().startsWith("Simulated failure during workflow execution"));
}

}