Skip to content

Commit 6bb9479

Browse files
[b/407529689] More Oozie info (#839)
* add info, coord jobs oozie tasks * add oozie protocol url * remove unused http calls * add explicit sorting * get oozie url from env varibale * fix style * fix oozie sortby parameter * fix oozie offset
1 parent dd32a09 commit 6bb9479

File tree

11 files changed

+384
-53
lines changed

11 files changed

+384
-53
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/OozieJobsTask.java renamed to dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/AbstractOozieJobsTask.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
2626
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
2727
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
28+
import java.lang.reflect.ParameterizedType;
2829
import java.util.Date;
2930
import java.util.List;
3031
import java.util.concurrent.TimeUnit;
@@ -33,77 +34,86 @@
3334
import org.apache.commons.beanutils.PropertyUtils;
3435
import org.apache.commons.csv.CSVFormat;
3536
import org.apache.commons.csv.CSVPrinter;
36-
import org.apache.oozie.client.WorkflowJob;
37+
import org.apache.oozie.client.OozieClientException;
3738
import org.apache.oozie.client.XOozieClient;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
4041

41-
public class OozieJobsTask extends AbstractTask<Void> {
42-
private static final Logger logger = LoggerFactory.getLogger(OozieJobsTask.class);
42+
public abstract class AbstractOozieJobsTask<J> extends AbstractTask<Void> {
43+
private static final Logger logger = LoggerFactory.getLogger(AbstractOozieJobsTask.class);
4344
private static final ObjectMapper objectMapper = new ObjectMapper();
4445

4546
private final int maxDaysToFetch;
4647
private final long initialTimestamp;
48+
private final Class<J> oozieJobClass;
4749

48-
public OozieJobsTask(int maxDaysToFetch) {
49-
this(maxDaysToFetch, System.currentTimeMillis());
50-
}
51-
52-
OozieJobsTask(int maxDaysToFetch, long initialTimestamp) {
53-
super("oozie_jobs.csv");
50+
AbstractOozieJobsTask(String fileName, int maxDaysToFetch, long initialTimestamp) {
51+
super(fileName);
5452
Preconditions.checkArgument(
5553
maxDaysToFetch >= 1,
5654
String.format("Amount of days must be a positive number. Got %d.", maxDaysToFetch));
5755

5856
this.maxDaysToFetch = maxDaysToFetch;
5957
this.initialTimestamp = initialTimestamp;
58+
this.oozieJobClass =
59+
(Class<J>)
60+
((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
6061
}
6162

62-
// todo jobs params in filter
63-
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
64-
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
6563
@CheckForNull
6664
@Override
6765
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
6866
throws Exception {
69-
final CSVFormat csvFormat = newCsvFormatForClass(WorkflowJob.class);
67+
final CSVFormat csvFormat = newCsvFormatForClass(oozieJobClass);
7068
final ImmutableList<String> csvHeader = ImmutableList.copyOf(csvFormat.getHeader());
7169

7270
try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
7371
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
7472
final int batchSize = context.getArguments().getPaginationPageSize();
75-
int offset = 0;
73+
int offset = 1; // starts with 1, not 0.
7674
long lastJobEndTimestamp = initialTimestamp;
7775

7876
logger.info(
79-
"Start fetch Oozie jobs for last {}d starts from {}ms", maxDaysToFetch, initialTimestamp);
77+
"Start fetching Oozie jobs for type {} for the last {}d starting from {}ms",
78+
oozieJobClass.getSimpleName(),
79+
maxDaysToFetch,
80+
initialTimestamp);
8081

8182
while (initialTimestamp - lastJobEndTimestamp < TimeUnit.DAYS.toMillis(maxDaysToFetch)) {
82-
List<WorkflowJob> jobsInfo = oozieClient.getJobsInfo(null, offset, batchSize);
83-
for (WorkflowJob workflowJob : jobsInfo) {
84-
Object[] record = toCSVRecord(workflowJob, csvHeader);
83+
List<J> jobs = fetchJobs(oozieClient, null, null, offset, batchSize);
84+
for (J job : jobs) {
85+
Object[] record = toCSVRecord(job, csvHeader);
8586
printer.printRecord(record);
8687
}
8788

88-
if (jobsInfo.isEmpty()) {
89+
if (jobs.isEmpty()) {
8990
break;
9091
}
9192

92-
WorkflowJob lastJob = jobsInfo.get(jobsInfo.size() - 1);
93-
if (lastJob.getEndTime() == null) {
93+
J lastJob = jobs.get(jobs.size() - 1);
94+
Date endTime = getJobEndDateTime(lastJob);
95+
if (endTime == null) {
9496
break;
9597
}
96-
lastJobEndTimestamp = lastJob.getEndTime().getTime();
97-
offset += jobsInfo.size();
98+
lastJobEndTimestamp = endTime.getTime();
99+
offset += jobs.size();
98100
}
99101

100102
printer.println();
101103
}
102104
return null;
103105
}
104106

105-
private static Object[] toCSVRecord(WorkflowJob job, ImmutableList<String> header)
106-
throws Exception {
107+
// todo jobs params in filter
108+
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
109+
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
110+
abstract List<J> fetchJobs(
111+
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
112+
throws OozieClientException;
113+
114+
abstract Date getJobEndDateTime(J job);
115+
116+
private static Object[] toCSVRecord(Object job, ImmutableList<String> header) throws Exception {
107117
Object[] record = new Object[header.size()];
108118
for (int i = 0; i < header.size(); i++) {
109119
record[i] = PropertyUtils.getProperty(job, header.get(i));

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/OozieClientFactory.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ protected static String getAuthOption(String user, String password) throws Oozie
6060
}
6161
return AuthType.BASIC.name();
6262
}
63-
String authOpt = getEnvProperty(ENV_OOZIE_AUTH);
64-
logger.debug("Auth type for Oozie client: {} base on " + ENV_OOZIE_AUTH, authOpt);
63+
String authOpt = getEnvVariable(ENV_OOZIE_AUTH);
64+
logger.debug("Auth type for Oozie client: {} base on {} env var", authOpt, ENV_OOZIE_AUTH);
6565
if (AuthType.BASIC.name().equalsIgnoreCase(authOpt)) {
6666
throw new OozieCLIException("BASIC authentication requires -user and -password to set!");
6767
}
@@ -70,11 +70,12 @@ protected static String getAuthOption(String user, String password) throws Oozie
7070

7171
protected static String getOozieUrlFromEnv() {
7272
// oozie CLI expect this env variable, so we use it as a fallback
73-
String url = getEnvProperty(ENV_OOZIE_URL);
73+
String url = getEnvVariable(ENV_OOZIE_URL);
7474
if (url == null) {
7575
throw new IllegalArgumentException(
7676
"Oozie URL is not available neither in command option nor in the environment");
7777
}
78+
logger.debug("Oozie URL: {} base on {} env var", url, ENV_OOZIE_URL);
7879
return url;
7980
}
8081

@@ -108,6 +109,11 @@ protected static void setRetryCount(OozieClient oozieClient) {
108109
}
109110
}
110111

112+
/** It's just a wrapper for {@code System.getenv(property)} Extracted to mock in unit-tests. */
113+
protected static String getEnvVariable(String name) {
114+
return System.getenv(name);
115+
}
116+
111117
/**
112118
* It's just a wrapper for {@code System.getProperty(property)} Extracted to mock in unit-tests.
113119
*/

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/hadoop/oozie/OozieConnector.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArg
6565
throws Exception {
6666
out.add(new DumpMetadataTask(arguments, FORMAT_NAME));
6767
out.add(new FormatTask(FORMAT_NAME));
68-
out.add(new OozieJobsTask(MAX_QUARTER_DAY));
68+
out.add(new OozieInfoTask());
69+
out.add(new OozieServersTask());
70+
out.add(new OozieWorkflowJobsTask(MAX_QUARTER_DAY));
71+
out.add(new OozieCoordinatorJobsTask(MAX_QUARTER_DAY));
6972
}
7073

7174
@Nonnull
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import java.util.Date;
20+
import java.util.List;
21+
import org.apache.oozie.client.CoordinatorJob;
22+
import org.apache.oozie.client.OozieClientException;
23+
import org.apache.oozie.client.XOozieClient;
24+
25+
public class OozieCoordinatorJobsTask extends AbstractOozieJobsTask<CoordinatorJob> {
26+
27+
public OozieCoordinatorJobsTask(int maxDaysToFetch) {
28+
this(maxDaysToFetch, System.currentTimeMillis());
29+
}
30+
31+
OozieCoordinatorJobsTask(int maxDaysToFetch, long initialTimestamp) {
32+
super("oozie_coord_jobs.csv", maxDaysToFetch, initialTimestamp);
33+
}
34+
35+
@Override
36+
List<CoordinatorJob> fetchJobs(
37+
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
38+
throws OozieClientException {
39+
return oozieClient.getCoordJobsInfo("sortby=endTime;", start, len);
40+
}
41+
42+
@Override
43+
Date getJobEndDateTime(CoordinatorJob job) {
44+
return job.getEndTime();
45+
}
46+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
21+
import com.google.common.io.ByteSink;
22+
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
23+
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
24+
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
25+
import javax.annotation.CheckForNull;
26+
import javax.annotation.Nonnull;
27+
import org.apache.commons.csv.CSVFormat;
28+
import org.apache.commons.csv.CSVPrinter;
29+
import org.apache.oozie.client.XOozieClient;
30+
31+
public class OozieInfoTask extends AbstractTask<Void> {
32+
33+
public OozieInfoTask() {
34+
super("oozie_info.csv");
35+
}
36+
37+
@CheckForNull
38+
@Override
39+
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
40+
throws Exception {
41+
final CSVFormat csvFormat = FORMAT.withHeader(CSVHeader.class);
42+
try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
43+
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
44+
printer.printRecord(
45+
oozieClient.getOozieUrl(),
46+
oozieClient.getProtocolUrl(),
47+
oozieClient.getServerBuildVersion());
48+
}
49+
return null;
50+
}
51+
52+
enum CSVHeader {
53+
URL,
54+
ProtocolUrl,
55+
ServerBuildVersion
56+
}
57+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
21+
import com.google.common.io.ByteSink;
22+
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
23+
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
24+
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
25+
import java.util.Map;
26+
import java.util.Map.Entry;
27+
import javax.annotation.CheckForNull;
28+
import javax.annotation.Nonnull;
29+
import org.apache.commons.csv.CSVFormat;
30+
import org.apache.commons.csv.CSVPrinter;
31+
import org.apache.oozie.client.XOozieClient;
32+
33+
public class OozieServersTask extends AbstractTask<Void> {
34+
35+
public OozieServersTask() {
36+
super("oozie_servers.csv");
37+
}
38+
39+
@CheckForNull
40+
@Override
41+
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
42+
throws Exception {
43+
final CSVFormat csvFormat = FORMAT.withHeader(CSVHeader.class);
44+
try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
45+
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
46+
Map<String, String> servers = oozieClient.getAvailableOozieServers();
47+
for (Entry<String, String> entry : servers.entrySet()) {
48+
printer.printRecord(entry.getKey(), entry.getValue());
49+
}
50+
}
51+
return null;
52+
}
53+
54+
enum CSVHeader {
55+
Server,
56+
URL
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;
18+
19+
import java.util.Date;
20+
import java.util.List;
21+
import org.apache.oozie.client.OozieClientException;
22+
import org.apache.oozie.client.WorkflowJob;
23+
import org.apache.oozie.client.XOozieClient;
24+
25+
public class OozieWorkflowJobsTask extends AbstractOozieJobsTask<WorkflowJob> {
26+
27+
public OozieWorkflowJobsTask(int maxDaysToFetch) {
28+
this(maxDaysToFetch, System.currentTimeMillis());
29+
}
30+
31+
OozieWorkflowJobsTask(int maxDaysToFetch, long initialTimestamp) {
32+
super("oozie_workflow_jobs.csv", maxDaysToFetch, initialTimestamp);
33+
}
34+
35+
@Override
36+
List<WorkflowJob> fetchJobs(
37+
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
38+
throws OozieClientException {
39+
return oozieClient.getJobsInfo("sortby=endTime;", start, len);
40+
}
41+
42+
@Override
43+
Date getJobEndDateTime(WorkflowJob job) {
44+
return job.getEndTime();
45+
}
46+
}

0 commit comments

Comments
 (0)