Skip to content

Commit 53ac6fa

Browse files
authored
[Part 15]: Resource Cluster Abstraction and an implementation based on Akka actor (#166)
* [Part 15]: Resource Cluster Abstraction and an implementation based on Akka actor Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
1 parent 425caa6 commit 53ac6fa

File tree

23 files changed

+1464
-21
lines changed

23 files changed

+1464
-21
lines changed

mantis-common/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17+
apply plugin: 'java-test-fixtures'
18+
1719
ext {
1820
jacksonVersion = '2.10.+'
1921
jctoolsVersion = '1.+'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2022 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.common.util;
18+
19+
import java.time.Clock;
20+
import java.time.Instant;
21+
import java.time.ZoneId;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
import lombok.RequiredArgsConstructor;
24+
25+
/**
 * A {@link Clock} that forwards every query to whichever clock is currently stored in a shared
 * {@link AtomicReference}. Replacing the referenced clock changes what this clock reports.
 *
 * <p>Clocks returned by {@link #withZone(ZoneId)} keep reading from the same reference, so they
 * follow later replacements as well; only the reported zone differs.
 */
public class DelegateClock extends Clock {
    private final AtomicReference<Clock> delegate;
    // Zone selected through withZone(); null means "report the delegate's own zone".
    private final ZoneId zoneOverride;

    /**
     * Creates a clock backed by the given reference.
     *
     * @param delegate reference holding the clock that calls are forwarded to
     */
    public DelegateClock(AtomicReference<Clock> delegate) {
        this(delegate, null);
    }

    private DelegateClock(AtomicReference<Clock> delegate, ZoneId zoneOverride) {
        this.delegate = delegate;
        this.zoneOverride = zoneOverride;
    }

    /**
     * @return the overriding zone if one was chosen via {@link #withZone(ZoneId)}, otherwise the
     *     zone of the clock currently held by the reference
     */
    @Override
    public ZoneId getZone() {
        return zoneOverride != null ? zoneOverride : delegate.get().getZone();
    }

    /**
     * Returns a clock that reports {@code zone} but still delegates to the shared reference.
     * Calling {@code withZone} on the current delegate instead would freeze the result to the
     * clock referenced at call time and miss any later replacement.
     *
     * @param zone the zone the returned clock should report, not null
     * @return a delegating clock using {@code zone}
     * @throws NullPointerException if {@code zone} is null
     */
    @Override
    public Clock withZone(ZoneId zone) {
        if (zone == null) {
            throw new NullPointerException("zone");
        }
        return new DelegateClock(delegate, zone);
    }

    /**
     * @return the current instant of the clock currently held by the reference
     */
    @Override
    public Instant instant() {
        return delegate.get().instant();
    }
}

mantis-control-plane/mantis-control-plane-core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
apply plugin: 'java-test-fixtures'
1617

1718
ext {
1819

mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/WorkerId.java

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
2020
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnore;
2121
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
22+
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
2223
import java.io.Serializable;
2324
import java.util.Optional;
2425
import org.slf4j.Logger;
@@ -98,6 +99,11 @@ public static Optional<WorkerId> fromId(final String id) {
9899
return Optional.empty();
99100
}
100101

102+
@VisibleForTesting
103+
public static WorkerId fromIdUnsafe(String id) {
104+
return fromId(id).get();
105+
}
106+
101107
public String getJobCluster() {
102108
return jobCluster;
103109
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2022 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.server.master.resourcecluster;
18+
19+
import io.mantisrx.common.Ack;
20+
import io.mantisrx.runtime.MachineDefinition;
21+
import io.mantisrx.server.core.domain.WorkerId;
22+
import io.mantisrx.server.worker.TaskExecutorGateway;
23+
import java.time.Instant;
24+
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import javax.annotation.Nullable;
27+
import lombok.Value;
28+
29+
/**
30+
* Abstraction to deal with all interactions with the resource cluster such as
31+
* 1). listing the set of task executors registered
32+
* 2). listing the set of task executors available
33+
* 3). listing the set of task executors busy
34+
* 4). get the current state of a task executor
35+
* 5). get the current state of the system
36+
* 6). assign a task executor for a given worker
37+
*/
38+
public interface ResourceCluster extends ResourceClusterGateway {
39+
/**
40+
* API that gets invoked when the resource cluster migrates from one machine to another and needs to be initialized.
41+
*
42+
* @param taskExecutorID taskExecutorID that was originally running the worker
43+
* @param workerId workerID of the task that being run on the task executor
44+
* @return Ack when the initialization is done
45+
*/
46+
CompletableFuture<Ack> initializeTaskExecutor(TaskExecutorID taskExecutorID, WorkerId workerId);
47+
48+
CompletableFuture<List<TaskExecutorID>> getRegisteredTaskExecutors();
49+
50+
CompletableFuture<List<TaskExecutorID>> getAvailableTaskExecutors();
51+
52+
CompletableFuture<List<TaskExecutorID>> getBusyTaskExecutors();
53+
54+
CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors();
55+
56+
CompletableFuture<ResourceOverview> resourceOverview();
57+
58+
/**
59+
* Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there
60+
* are no task executors.
61+
*
62+
* @param machineDefinition machine definition that's requested for the worker
63+
* @param workerId worker id of the task that's going to run on the node.
64+
* @return task executor assigned for the particular task.
65+
*/
66+
CompletableFuture<TaskExecutorID> getTaskExecutorFor(MachineDefinition machineDefinition, WorkerId workerId);
67+
68+
CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(TaskExecutorID taskExecutorID);
69+
70+
CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName);
71+
72+
CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(TaskExecutorID taskExecutorID);
73+
74+
CompletableFuture<TaskExecutorStatus> getTaskExecutorState(TaskExecutorID taskExecutorID);
75+
76+
class NoResourceAvailableException extends Exception {
77+
78+
public NoResourceAvailableException(String message) {
79+
super(message);
80+
}
81+
}
82+
83+
@Value
84+
class ResourceOverview {
85+
long numRegisteredTaskExecutors;
86+
long numAvailableTaskExecutors;
87+
long numOccupiedTaskExecutors;
88+
long numAssignedTaskExecutors;
89+
}
90+
91+
@Value
92+
class TaskExecutorStatus {
93+
TaskExecutorRegistration registration;
94+
boolean isRegistered;
95+
boolean isRunningTask;
96+
boolean isAssignedTask;
97+
@Nullable
98+
WorkerId workerId;
99+
Instant lastHeartbeat;
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.server.master.resourcecluster;
18+
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.ConcurrentMap;
21+
import javax.annotation.Nullable;
22+
23+
/**
24+
* This is a helper abstraction to find cluster ids for task executors.
25+
* For the foreseeable future, we expect only one implementation of this particular class.
26+
*/
27+
public interface ResourceClusterTaskExecutorMapper {
28+
@Nullable
29+
ClusterID getClusterFor(TaskExecutorID taskExecutorID);
30+
31+
void onTaskExecutorDiscovered(ClusterID clusterID, TaskExecutorID taskExecutorID);
32+
33+
static ResourceClusterTaskExecutorMapper inMemory() {
34+
return new ResourceClusterTaskExecutorMapper() {
35+
private final ConcurrentMap<TaskExecutorID, ClusterID> map =
36+
new ConcurrentHashMap<>();
37+
38+
@Override
39+
public ClusterID getClusterFor(TaskExecutorID taskExecutorID) {
40+
return map.get(taskExecutorID);
41+
}
42+
43+
@Override
44+
public void onTaskExecutorDiscovered(ClusterID clusterID, TaskExecutorID taskExecutorID) {
45+
map.putIfAbsent(taskExecutorID, clusterID);
46+
}
47+
};
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2022 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.server.master.resourcecluster;
18+
19+
import java.util.Set;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
/**
23+
* ResourceClusters is a factory class for getting and managing individual resource clusters.
24+
*/
25+
public interface ResourceClusters {
26+
ResourceCluster getClusterFor(ClusterID clusterID);
27+
28+
CompletableFuture<Set<ClusterID>> listActiveClusters();
29+
}

mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/TestingRpcService.java renamed to mantis-control-plane/mantis-control-plane-core/src/testFixtures/java/io/mantisrx/server/core/TestingRpcService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.mantisrx.server.worker;
16+
package io.mantisrx.server.core;
1717

1818
import static org.apache.flink.util.Preconditions.checkNotNull;
1919

mantis-control-plane/mantis-control-plane-server/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ dependencies {
4848
testCompile libraries.junit4
4949
testCompile libraries.mockitoAll
5050
testCompile "org.testng:testng:$testngVersion"
51+
testImplementation testFixtures(project(":mantis-common"))
52+
testImplementation testFixtures(project(":mantis-control-plane:mantis-control-plane-core"))
5153
}
5254

5355
mainClassName = "io.mantisrx.server.master.MasterMain"

0 commit comments

Comments
 (0)