Skip to content

Commit 443b707

Browse files
[b/407529689] Feature/ooze bundle jobs (#844)
* [b/407765680] Better UX for dumper connector tasks failures due to lack of Kerberos auth * add bundle jobs to oozie connector * add java doc * change java doc --------- Co-authored-by: Stanislav Jordanov <stanjo@google.com>
1 parent 711bc73 commit 443b707

File tree

5 files changed

+113
-1
lines changed

5 files changed

+113
-1
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/AbstractOozieJobsTask.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public abstract class AbstractOozieJobsTask<J> extends AbstractTask<Void> {
4343
private static final Logger logger = LoggerFactory.getLogger(AbstractOozieJobsTask.class);
4444
private static final ObjectMapper objectMapper = new ObjectMapper();
4545

46+
private static final int INITIAL_OOZIE_JOBS_OFFSET = 1; // starts with 1, not 0.
47+
4648
private final int maxDaysToFetch;
4749
private final long initialTimestamp;
4850
private final Class<J> oozieJobClass;
@@ -70,7 +72,7 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Ha
7072
try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
7173
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
7274
final int batchSize = context.getArguments().getPaginationPageSize();
73-
int offset = 1; // starts with 1, not 0.
75+
int offset = INITIAL_OOZIE_JOBS_OFFSET;
7476
long lastJobEndTimestamp = initialTimestamp;
7577

7678
logger.info(
@@ -107,6 +109,14 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Ha
107109
// todo jobs params in filter
108110
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
109111
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
112+
/**
113+
* Method is expected to do a call to Oozie server to fetch jobs in a ranger [{@code startDate} -
114+
* {@code endDate}].
115+
*
116+
* @param oozieClient - Oozie client initialised to particular Oozie server
117+
* @param start jobs offset pass to {@code oozieClient}
118+
* @param len number of jobs to return
119+
*/
110120
abstract List<J> fetchJobs(
111121
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
112122
throws OozieClientException;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import java.util.Date;
20+
import java.util.List;
21+
import org.apache.oozie.client.BundleJob;
22+
import org.apache.oozie.client.OozieClientException;
23+
import org.apache.oozie.client.XOozieClient;
24+
25+
public class OozieBundleJobsTask extends AbstractOozieJobsTask<BundleJob> {
26+
public OozieBundleJobsTask(int maxDaysToFetch) {
27+
super("oozie_bundle_jobs.csv", maxDaysToFetch, System.currentTimeMillis());
28+
}
29+
30+
@Override
31+
List<BundleJob> fetchJobs(
32+
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
33+
throws OozieClientException {
34+
return oozieClient.getBundleJobsInfo("sortby=endTime;", start, len);
35+
}
36+
37+
@Override
38+
Date getJobEndDateTime(BundleJob job) {
39+
return job.getEndTime();
40+
}
41+
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/OozieConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArg
6969
out.add(new OozieServersTask());
7070
out.add(new OozieWorkflowJobsTask(MAX_QUARTER_DAY));
7171
out.add(new OozieCoordinatorJobsTask(MAX_QUARTER_DAY));
72+
out.add(new OozieBundleJobsTask(MAX_QUARTER_DAY));
7273
}
7374

7475
@Nonnull
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.verifyNoMoreInteractions;
23+
24+
import org.apache.oozie.client.BundleJob;
25+
import org.apache.oozie.client.XOozieClient;
26+
import org.junit.Test;
27+
28+
public class OozieBundleJobsTaskTest {
29+
private final OozieBundleJobsTask task = new OozieBundleJobsTask(5);
30+
31+
@Test
32+
public void fileName() {
33+
assertEquals("oozie_bundle_jobs.csv", task.getTargetPath());
34+
}
35+
36+
@Test
37+
public void fetchJobs_success() throws Exception {
38+
XOozieClient oozieClient = mock(XOozieClient.class);
39+
40+
// Act
41+
task.fetchJobs(oozieClient, null, null, 2, 14);
42+
43+
// Verify
44+
verify(oozieClient).getBundleJobsInfo("sortby=endTime;", 2, 14);
45+
verifyNoMoreInteractions(oozieClient);
46+
}
47+
48+
@Test
49+
public void getJobEndTime_success() throws Exception {
50+
BundleJob job = mock(BundleJob.class);
51+
52+
// Act
53+
task.getJobEndDateTime(job);
54+
55+
// Verify
56+
verify(job).getEndTime();
57+
verifyNoMoreInteractions(job);
58+
}
59+
}

dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/OozieConnectorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void addTasksTo_checkFilesCategory() throws Exception {
4747
"compilerworks-format.txt", TaskCategory.REQUIRED,
4848
"oozie_info.csv", TaskCategory.REQUIRED,
4949
"oozie_coord_jobs.csv", TaskCategory.REQUIRED,
50+
"oozie_bundle_jobs.csv", TaskCategory.REQUIRED,
5051
"oozie_servers.csv", TaskCategory.REQUIRED,
5152
"oozie_workflow_jobs.csv", TaskCategory.REQUIRED);
5253
List<Task<?>> tasks = new ArrayList<>();

0 commit comments

Comments
 (0)