Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
package com.netflix.maestro.server.controllers;

import com.netflix.maestro.flow.engine.FlowExecutor;
import com.netflix.maestro.flow.models.FlowDef;
import com.netflix.maestro.flow.runtime.FlowOperation;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.Valid;
Expand Down Expand Up @@ -41,11 +41,11 @@
consumes = MediaType.APPLICATION_JSON_VALUE)
public class FlowEngineController {

private final FlowExecutor flowExecutor;
private final FlowOperation flowOperation;

@Autowired
public FlowEngineController(FlowExecutor flowExecutor) {
this.flowExecutor = flowExecutor;
public FlowEngineController(FlowOperation flowOperation) {
this.flowOperation = flowOperation;
}

public record StartFlowRequest(String flowId, FlowDef flowDef, Map<String, Object> flowInput) {}
Expand All @@ -58,7 +58,7 @@ public String startFlow(
@PathVariable("groupId") long groupId,
@Valid @NotNull @PathVariable("flowReference") String flowReference,
@Valid @NotNull @RequestBody StartFlowRequest request) {
return flowExecutor.startFlow(
return flowOperation.startFlow(
groupId, request.flowId(), flowReference, request.flowDef(), request.flowInput());
}

Expand All @@ -71,7 +71,7 @@ public Boolean wakeUp(
@Valid @NotNull @PathVariable("flowReference") String flowReference,
@Valid @NotNull @PathVariable("taskReference") String taskReference,
@PathVariable("code") int code) {
return flowExecutor.wakeUp(groupId, flowReference, taskReference, code);
return flowOperation.wakeUp(groupId, flowReference, taskReference, code);
}

@PostMapping(
Expand All @@ -82,6 +82,6 @@ public Boolean wakeUp(
@PathVariable("groupId") long groupId,
@PathVariable("code") int code,
@Valid @NotNull @RequestBody Set<String> refs) {
return refs.stream().allMatch(ref -> flowExecutor.wakeUp(groupId, ref, null, code));
return flowOperation.wakeUp(groupId, refs, code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
*/
package com.netflix.maestro.server.controllers;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.netflix.maestro.MaestroBaseTest;
import com.netflix.maestro.flow.engine.FlowExecutor;
import com.netflix.maestro.flow.runtime.FlowOperation;
import com.netflix.maestro.server.controllers.FlowEngineController.StartFlowRequest;
import java.util.Map;
import java.util.Set;
Expand All @@ -33,15 +30,15 @@

public class FlowEngineControllerTest extends MaestroBaseTest {

@Mock private FlowExecutor mockFlowExecutor;
@Mock private FlowOperation mockFlowOperation;
private final long groupId = 123L;
private final int code = 200;

private FlowEngineController flowEngineController;

@Before
public void before() {
this.flowEngineController = new FlowEngineController(mockFlowExecutor);
this.flowEngineController = new FlowEngineController(mockFlowOperation);
}

@Test
Expand All @@ -51,13 +48,13 @@ public void testStartFlow() {
StartFlowRequest request = new StartFlowRequest(flowId, null, Map.of());
String expectedResult = "flow-execution-id";

when(mockFlowExecutor.startFlow(
when(mockFlowOperation.startFlow(
eq(groupId), eq(flowId), eq(flowReference), isNull(), eq(Map.of())))
.thenReturn(expectedResult);

String result = flowEngineController.startFlow(groupId, flowReference, request);

verify(mockFlowExecutor, times(1))
verify(mockFlowOperation, times(1))
.startFlow(eq(groupId), eq(flowId), eq(flowReference), isNull(), eq(Map.of()));
Assert.assertEquals(expectedResult, result);
}
Expand All @@ -66,27 +63,24 @@ public void testStartFlow() {
public void testWakeUpSingleTask() {
String flowReference = "test-flow-ref";
String taskReference = "test-task-ref";
when(mockFlowExecutor.wakeUp(eq(groupId), eq(flowReference), eq(taskReference), eq(code)))
when(mockFlowOperation.wakeUp(eq(groupId), eq(flowReference), eq(taskReference), eq(code)))
.thenReturn(true);

Boolean result = flowEngineController.wakeUp(groupId, flowReference, taskReference, code);

verify(mockFlowExecutor, times(1))
verify(mockFlowOperation, times(1))
.wakeUp(eq(groupId), eq(flowReference), eq(taskReference), eq(code));
assert result.equals(true);
}

@Test
public void testWakeUpMultipleFlows() {
Set<String> refs = Set.of("flow-ref-1", "flow-ref-2", "flow-ref-3");
when(mockFlowExecutor.wakeUp(anyLong(), anyString(), eq(null), anyInt())).thenReturn(true);
when(mockFlowOperation.wakeUp(eq(groupId), eq(refs), eq(code))).thenReturn(true);

Boolean result = flowEngineController.wakeUp(groupId, code, refs);

verify(mockFlowExecutor, times(3)).wakeUp(anyLong(), anyString(), eq(null), anyInt());
verify(mockFlowExecutor, times(1)).wakeUp(eq(groupId), eq("flow-ref-1"), eq(null), eq(code));
verify(mockFlowExecutor, times(1)).wakeUp(eq(groupId), eq("flow-ref-2"), eq(null), eq(code));
verify(mockFlowExecutor, times(1)).wakeUp(eq(groupId), eq("flow-ref-3"), eq(null), eq(code));
verify(mockFlowOperation, times(1)).wakeUp(eq(groupId), eq(refs), eq(code));
assert result.equals(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.netflix.maestro.MaestroBaseTest;
import com.netflix.maestro.flow.dao.MaestroFlowDao;
import com.netflix.maestro.flow.engine.FlowExecutor;
import com.netflix.maestro.flow.models.FlowGroup;
import com.netflix.maestro.flow.properties.FlowEngineProperties;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.springframework.web.client.RestTemplate;

Expand Down Expand Up @@ -78,4 +83,92 @@ public void testWakeUpMultipleFlowsWithActionCode() {
assertFalse(result);
verify(flowExecutor, times(3)).wakeUp(eq(groupId), anyString(), isNull(), eq(actionCode));
}

@Test
public void testWakeUpSingleTaskRoutesToRemotePod() {
String remoteAddress = "http://remote-pod:8080";
FlowGroup remoteGroup = new FlowGroup(groupId, 1L, remoteAddress, System.currentTimeMillis());
when(flowDao.getGroup(groupId)).thenReturn(remoteGroup);
when(restTemplate.postForObject(
anyString(), isNull(), eq(Boolean.class), any(), any(), any(), any()))
.thenReturn(Boolean.TRUE);

boolean result = flowOperation.wakeUp(groupId, flowReference, taskReference, actionCode);

assertTrue(result);
verify(flowExecutor, never()).wakeUp(anyLong(), anyString(), any(), anyInt());
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate)
.postForObject(
urlCaptor.capture(), isNull(), eq(Boolean.class), any(), any(), any(), any());
assertTrue(urlCaptor.getValue().startsWith(remoteAddress));
assertTrue(urlCaptor.getValue().contains("/tasks/"));
assertTrue(urlCaptor.getValue().contains("/notify/"));
}

@Test
public void testWakeUpMultipleFlowsRoutesToRemotePod() {
String remoteAddress = "http://remote-pod:8080";
FlowGroup remoteGroup = new FlowGroup(groupId, 1L, remoteAddress, System.currentTimeMillis());
Set<String> flowReferences = Set.of("flow1", "flow2");
when(flowDao.getGroup(groupId)).thenReturn(remoteGroup);
when(restTemplate.postForObject(
anyString(), eq(flowReferences), eq(Boolean.class), any(), any()))
.thenReturn(Boolean.TRUE);

boolean result = flowOperation.wakeUp(groupId, flowReferences, actionCode);

assertTrue(result);
verify(flowExecutor, never()).wakeUp(anyLong(), anyString(), any(), anyInt());
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate)
.postForObject(urlCaptor.capture(), eq(flowReferences), eq(Boolean.class), any(), any());
String capturedUrl = urlCaptor.getValue();
assertTrue(capturedUrl.startsWith(remoteAddress));
assertTrue(capturedUrl.contains("/flows/notify/"));
}

@Test
public void testWakeUpSingleTaskReturnsFalseWhenRemoteFails() {
String remoteAddress = "http://remote-pod:8080";
FlowGroup remoteGroup = new FlowGroup(groupId, 1L, remoteAddress, System.currentTimeMillis());
when(flowDao.getGroup(groupId)).thenReturn(remoteGroup);
when(restTemplate.postForObject(
anyString(), isNull(), eq(Boolean.class), any(), any(), any(), any()))
.thenReturn(null);

boolean result = flowOperation.wakeUp(groupId, flowReference, taskReference, actionCode);

assertFalse(result);
verify(flowExecutor, never()).wakeUp(anyLong(), anyString(), any(), anyInt());
}

@Test
public void testWakeUpMultipleFlowsReturnsFalseWhenRemoteFails() {
String remoteAddress = "http://remote-pod:8080";
FlowGroup remoteGroup = new FlowGroup(groupId, 1L, remoteAddress, System.currentTimeMillis());
Set<String> flowReferences = Set.of("flow1", "flow2");
when(flowDao.getGroup(groupId)).thenReturn(remoteGroup);
when(restTemplate.postForObject(
anyString(), eq(flowReferences), eq(Boolean.class), any(), any()))
.thenReturn(null);

boolean result = flowOperation.wakeUp(groupId, flowReferences, actionCode);

assertFalse(result);
verify(flowExecutor, never()).wakeUp(anyLong(), anyString(), any(), anyInt());
}

@Test
public void testWakeUpLocalWhenGroupAddressMatchesLocal() {
FlowGroup localGroup = new FlowGroup(groupId, 1L, "localhost:8080", System.currentTimeMillis());
when(flowDao.getGroup(groupId)).thenReturn(localGroup);

boolean result = flowOperation.wakeUp(groupId, flowReference, taskReference, actionCode);

assertTrue(result);
verify(flowExecutor, times(1)).wakeUp(groupId, flowReference, taskReference, actionCode);
verify(restTemplate, never())
.postForObject(anyString(), any(), eq(Boolean.class), (Object[]) any());
}
}