diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 6e07afac1c5..4bec7d2013b 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -1047,6 +1047,12 @@ public class ConfigurationKeys { public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid"; public static final String AZKABAN_JOB_ID = "azkaban.job.id"; public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid"; + // Configuration Key for setting a unique job execution identifier in GaaS, the value is a UUID + public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid"; + + // Configuration Key for storing hash of gaas.job.execid, to be used as jobExecutionId(integer) for backwards compatibility + public static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.execid.hash"; + public static final String AZKABAN_URL = "azkaban.link.execution.url"; public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url"; public static final String AZKABAN_JOB_URL = "azkaban.link.job.url"; diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 5da49e90bcd..8fa93b77f8a 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -385,7 +385,11 @@ private boolean isCurrentTimeInRange() { */ private static List> addAdditionalMetadataTags(Properties jobProps) { List> metadataTags = Lists.newArrayList(); - String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, ""); + String jobExecutionId = jobProps.getProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, ""); + //As a fallback setting the value of jobExecutionId to Azkaban Flow exec Id if GAAS_JOB_EXEC_ID is not set + if (Strings.isNullOrEmpty(jobExecutionId)) { + jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID,""); + } // Display the proxy URL in the metadata tag if it exists String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL, jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, "")); @@ -407,7 +411,7 @@ private static List> addAdditionalMetadataTags(Properties jobPr metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId))); - //Use azkaban.flow.execid as the jobExecutionId + //Use gaas.job.execid.hash as the jobExecutionId metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId)); metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index ea07cf435b0..bbd728a04f7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -19,6 +19,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -116,6 +117,9 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ""); + final String gaasJobExecutionId = UUID.randomUUID().toString(); // Creating a unique Identifier for JobExecution + final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility + if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) { // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since // job names are assumed to be unique within a dag. @@ -152,6 +156,8 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl .withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption)) .withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId)) .withValue(FlowSpec.MODIFICATION_TIME_KEY, ConfigValueFactory.fromAnyRef(flowModTime)) + .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID, ConfigValueFactory.fromAnyRef(gaasJobExecutionId)) // Setting a unique Identifier for jobExecution + .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, ConfigValueFactory.fromAnyRef(gaasJobExecutionIdHash)) ); //Add tracking config to JobSpec. diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java index 7b128f120c9..1ef11af1814 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java @@ -45,6 +45,7 @@ import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -78,6 +79,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException Mockito.verifyNoMoreInteractions(dagManagementStateStore); } + @Test + public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List jobExecutionPlans = getJobExecutionPlans(); + List> dagNodeList = jobExecutionPlans.stream() + .map(Dag.DagNode::new) + .collect(Collectors.toList()); + Dag dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + // Assertion to test that GaaS job execution Id has been successfully injected + for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) { + final String gaasJobExecutionId = jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID); + final Long gaasJobExecutionIdHash = Long.parseLong(jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH)); + + Assert.assertNotNull(gaasJobExecutionId); + Assert.assertEquals(gaasJobExecutionId.length(), 36); + Assert.assertNotNull(gaasJobExecutionIdHash); + } + } + @Test public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException { Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java index 2c3b34fce04..2a2cf170494 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java @@ -59,7 +59,6 @@ public class Help { public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5; public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000; - public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid"; public static final String USER_TO_PROXY_KEY = "user.to.proxy"; public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy"; public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey"; @@ -104,9 +103,9 @@ public static String calcPerExecQualifierWithOptFlowExecId(FileSystemJobStateful public static String calcPerExecQualifier(Config workerConfig) { String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY) ? workerConfig.getString(USER_TO_PROXY_KEY) : ""; - String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY) - ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : UUID.randomUUID().toString(); - return userToProxy + "_" + azFlowExecId; + String gaasFlowExecId = workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID) + ? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : UUID.randomUUID().toString(); + return userToProxy + "_" + gaasFlowExecId; } public static FileSystem loadFileSystem(FileSystemApt a) throws IOException { @@ -279,4 +278,4 @@ public static void reportTroubleshooterIssues(AutomaticTroubleshooter troublesho troubleshooter.logIssueSummary(); troubleshooter.reportJobIssuesAsEvents(eventSubmitter); } -} +} \ No newline at end of file diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java index e629c372deb..14b75da2abb 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java @@ -38,6 +38,7 @@ public class AzkabanTags { .put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId") .put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId") .put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId") + .put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId") .put(ConfigurationKeys.AZKABAN_URL, "azkabanURL") .put(ConfigurationKeys.AZKABAN_FLOW_URL, "azkabanFlowURL") .put(ConfigurationKeys.AZKABAN_JOB_URL, "azkabanJobURL")