Skip to content

[GOBBLIN-2200]Moving Away From Azkaban Execution Id #4108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
public static final String AZKABAN_JOB_ID = "azkaban.job.id";
public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
// Configuration Key for setting a unique job execution identifier in GaaS, the value is a UUID
public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";

// Configuration Key for to store has of gaas.job.execid, to be used for jobExecutionId for backwards compatibility
public static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.execid.hash";

public static final String AZKABAN_URL = "azkaban.link.execution.url";
public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url";
public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ private boolean isCurrentTimeInRange() {
*/
private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobProps) {
List<Tag<?>> metadataTags = Lists.newArrayList();
String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
String jobExecutionId = jobProps.getProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, "");
//As a fallback setting the value of jobExecutionId to Azkaban Flow exec Id if GAAS_JOB_EXEC_ID is not set
if (Strings.isNullOrEmpty(jobExecutionId)) {
jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID,"");
}
// Display the proxy URL in the metadata tag if it exists
String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL, jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -116,6 +117,10 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl

String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
final UUID gaasJobExecutionUUID = UUID.randomUUID();
final String gaasJobExecutionId = gaasJobExecutionUUID.toString(); // Creating a unique Identifier for JobExecution
final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility

if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) {
// Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
Expand Down Expand Up @@ -152,6 +157,8 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
.withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption))
.withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId))
.withValue(FlowSpec.MODIFICATION_TIME_KEY, ConfigValueFactory.fromAnyRef(flowModTime))
.withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID, ConfigValueFactory.fromAnyRef(gaasJobExecutionId)) // Setting a unique Identifier for jobExecution
.withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, ConfigValueFactory.fromAnyRef(gaasJobExecutionIdHash))
);

//Add tracking config to JobSpec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -77,6 +78,26 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
}
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}
@Test
public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
.map(Dag.DagNode<JobExecutionPlan>::new)
.collect(Collectors.toList());
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
// Assertion to test that GaaS job execution Id has been successfully injected
for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
final String gaasJobExecutionId = jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID);
final Long gaasJobExecutionIdHash = Long.parseLong(jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH));

Assert.assertNotNull(gaasJobExecutionId);
Assert.assertEquals(gaasJobExecutionId.length(), 36);
Assert.assertNotNull(gaasJobExecutionIdHash);
}
}

@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
public class Help {
public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
Expand Down Expand Up @@ -104,9 +103,9 @@ public static String calcPerExecQualifierWithOptFlowExecId(FileSystemJobStateful
public static String calcPerExecQualifier(Config workerConfig) {
String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
? workerConfig.getString(USER_TO_PROXY_KEY) : "";
String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : UUID.randomUUID().toString();
return userToProxy + "_" + azFlowExecId;
String gaasFlowExecId = workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : UUID.randomUUID().toString();
return userToProxy + "_" + gaasFlowExecId;
}

public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
Expand Down Expand Up @@ -279,4 +278,4 @@ public static void reportTroubleshooterIssues(AutomaticTroubleshooter troublesho
troubleshooter.logIssueSummary();
troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class AzkabanTags {
.put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
.put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
.put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
.put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId")
.put(ConfigurationKeys.AZKABAN_URL, "azkabanURL")
.put(ConfigurationKeys.AZKABAN_FLOW_URL, "azkabanFlowURL")
.put(ConfigurationKeys.AZKABAN_JOB_URL, "azkabanJobURL")
Expand Down