diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java new file mode 100644 index 00000000000..de1e78f0240 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start + * and flow completion deadlines + * + */ +public interface DagManagement { + + void launchFlow(); + void resumeFlow(); + void killFlow(); + void enforceFlowCompletionDeadline(); + void enforceJobStartDeadline(); +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java new file mode 100644 index 00000000000..c8b4b7623c1 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; + + +/** + * Responsible to performing the actual work for a given {@link DagTask}. + * It processes the {@link DagTask} by first initializing its state, performing actions + * like updating {@link DagStateStore} and finally submiting an event to the executor. + * @param current state of the dag node + * @param result after processing the dag node + */ +public abstract class DagProc { + abstract protected S initialize() throws MaybeRetryableException; + abstract protected R act(S state) throws MaybeRetryableException; + abstract protected void sendNotification(R result) throws MaybeRetryableException; + + final void process() { + throw new UnsupportedOperationException(" Process unsupported"); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java new file mode 100644 index 00000000000..30ab0c457b5 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * Factory for creating {@link DagProc} based on the visitor type {@link DagTask}. + */ +public interface DagProcFactory extends DagTaskVisitor { + DagProc meet(LaunchDagTask ldt); + DagProc meet(KillDagTask kdt); + DagProc meet(ResumeDagTask rdt); + DagProc createFor(DagTask t); +} + diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java new file mode 100644 index 00000000000..33ada0d00ea --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * Defines an individual task or job in a Dag. + * It carries the state information required by {@link DagProc} to for its processing. + * Upon completion of the {@link DagProc#process()} it will mark the lease + * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete + * @param + */ +abstract class DagTask { + + abstract void initialize(); + abstract void conclude(); + abstract T host(DagTaskVisitor visitor); +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java new file mode 100644 index 00000000000..3dcc6e8b5a2 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.util.Iterator; + + +/** + * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}. + * It provides an implementation for {@link DagManagement} defines the rules for a flow and job. + * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager} + */ +public class DagTaskStream implements Iterator, DagManagement { + @Override + public boolean hasNext() { + return false; + } + + @Override + public DagTask next() { + return null; + } + + @Override + public void launchFlow() { + + } + + @Override + public void resumeFlow() { + + } + + @Override + public void killFlow() { + + } + + @Override + public void enforceFlowCompletionDeadline() { + + } + + @Override + public void enforceJobStartDeadline() { + + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java new file mode 100644 index 00000000000..1aa34ca831a --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * Interface defining {@link DagTask} based on the type of visitor. + * @param + */ +public interface DagTaskVisitor { + T meet(LaunchDagTask launchDagTask); + T meet(KillDagTask killDagTask); + T meet(ResumeDagTask resumeDagTask); +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java new file mode 100644 index 00000000000..aacf4a0ab84 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; + + +/** + * An implmentation of {@link DagProc} for killing {@link DagTask}. + */ +public final class KillDagProc extends DagProc{ + @Override + protected Object initialize() throws MaybeRetryableException { + return null; + } + + @Override + protected Object act(Object state) throws MaybeRetryableException { + return null; + } + + @Override + protected void sendNotification(Object result) throws MaybeRetryableException { + + } +} + diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagTask.java new file mode 100644 index 00000000000..013acf6906a --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; +/** + * A {@link DagTask} responsible to handle kill tasks. + */ +public class KillDagTask extends DagTask { + @Override + void initialize() { + + } + + @Override + void conclude() { + + } + + @Override + Object host(DagTaskVisitor visitor) { + return null; + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagProc.java new file mode 100644 index 00000000000..31109162173 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagProc.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; + + +/** + * An implmentation of {@link DagProc} for launching {@link DagTask}. + */ +public final class LaunchDagProc extends DagProc{ + @Override + protected Object initialize() throws MaybeRetryableException { + return null; + } + + @Override + protected Object act(Object state) throws MaybeRetryableException { + return null; + } + + @Override + protected void sendNotification(Object result) throws MaybeRetryableException { + + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagTask.java new file mode 100644 index 00000000000..77c59b206df --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; +/** + * A {@link DagTask} responsible to handle launch tasks. + */ +public class LaunchDagTask extends DagTask { + @Override + void initialize() { + + } + + @Override + void conclude() { + + } + + @Override + Object host(DagTaskVisitor visitor) { + return null; + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManagerBoilerPlate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManagerBoilerPlate.java new file mode 100644 index 00000000000..48458c75fba --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManagerBoilerPlate.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +public class NewDagManagerBoilerPlate { + + private DagTaskStream dagTaskStream; + private DagProc dagProc; + + public DagTask getNextTask() { + return dagTaskStream.next(); + } + + public static class DagManagerThread implements Runnable { + + @Override + public void run() { + submitNextDagTask(); + } + } + + public static void submitNextDagTask() { + throw new UnsupportedOperationException("Not yet supported"); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagProc.java new file mode 100644 index 00000000000..a7c17193063 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagProc.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; + + +/** + * An implmentation of {@link DagProc} for resuming {@link DagTask}. + */ +public final class ResumeDagProc extends DagProc{ + @Override + protected Object initialize() throws MaybeRetryableException { + return null; + } + + @Override + protected Object act(Object state) throws MaybeRetryableException { + return null; + } + + @Override + protected void sendNotification(Object result) throws MaybeRetryableException { + + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagTask.java new file mode 100644 index 00000000000..fd041bc8525 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ResumeDagTask.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +/** + * A {@link DagTask} responsible to handle resume tasks. + */ +public class ResumeDagTask extends DagTask { + @Override + void initialize() { + + } + + @Override + void conclude() { + + } + + @Override + Object host(DagTaskVisitor visitor) { + return null; + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/MaybeRetryableException.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/MaybeRetryableException.java new file mode 100644 index 00000000000..50c145b76ce --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/MaybeRetryableException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.exception; + +/** + * Exception defined for handling the retries while processing {@link org.apache.gobblin.service.modules.orchestration.DagProc} + */ +public abstract class MaybeRetryableException extends Throwable { +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/NonRetryableException.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/NonRetryableException.java new file mode 100644 index 00000000000..f9c22d483f9 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/NonRetryableException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.exception; + +/** + * Extension of {@link MaybeRetryableException} marking it as non-retryable. + */ +public final class NonRetryableException extends MaybeRetryableException { +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/RetryableException.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/RetryableException.java new file mode 100644 index 00000000000..61b20b13cab --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/RetryableException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.exception; + +/** + * Extension of {@link MaybeRetryableException} marking it as retryable. + */ +public final class RetryableException extends MaybeRetryableException { +}