Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[b/402028795] Oozie connector #793

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ class OverridesUtil {
"software.amazon.eventstream:*": [
projectUrl: "https://github.com/awslabs/aws-eventstream-java",
licenseUrl: "https://raw.githubusercontent.com/awslabs/aws-eventstream-java/refs/heads/master/LICENSE"
],
"com.google.code.javaparser:javaparser": [
projectUrl: "http://code.google.com/p/javaparser/",
licenseUrl: "https://www.gnu.org/licenses/lgpl-3.0.txt"
]
]
}
Expand Down
50 changes: 29 additions & 21 deletions dumper/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ dependencies {
implementation libs.httpclient5
implementation libs.aws.java.sdk.redshift
implementation libs.aws.java.sdk.cloudwatch
implementation('org.apache.oozie:oozie-client:5.2.1') {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have this and its transitive dependencies on airlock?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you give me the list of the transitive dependencies, I can check.

exclude group: 'javax.jms', module: 'jms'
// exclude group: 'org.slf4j', module: 'jcl-over-slf4j'
// exclude group: 'org.slf4j', module: 'slf4j-simple'
exclude group: 'junit', module: 'junit'
exclude group: 'guru.nidi', module: 'graphviz-java'
}

runtimeOnly libs.logback.classic
runtimeOnly libs.jcl.over.slf4j
Expand All @@ -89,6 +96,7 @@ dependencies {
testImplementation libs.mockito.core
testImplementation libs.mockito.inline
testImplementation libs.mockito.junit.jupiter
testImplementation libs.junit
testImplementation libs.wiremock


Expand Down Expand Up @@ -187,24 +195,24 @@ tasks.named('distZip') {
dependsOn 'installDist'
}

tasks.register('generateSourceMirror', Copy) {
from {
dependencies.createArtifactResolutionQuery()
.forComponents(
configurations.runtimeClasspath.incoming.resolutionResult
.allDependencies.collect { it.selected.id }
)
.withArtifacts(JvmLibrary, SourcesArtifact)
.execute()
.resolvedComponents
.collectMany {
it.artifactResults
.collect { it.file.path }
}
}
into layout.buildDirectory.dir('mirror/sources')
outputs.dir "mirror/sources"
}
//tasks.register('generateSourceMirror', Copy) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe comment why we are not deleting this code?

// from {
// dependencies.createArtifactResolutionQuery()
// .forComponents(
// configurations.runtimeClasspath.incoming.resolutionResult
// .allDependencies.collect { it.selected.id }
// )
// .withArtifacts(JvmLibrary, SourcesArtifact)
// .execute()
// .resolvedComponents
// .collectMany {
// it.artifactResults
// .collect { it.file.path }
// }
// }
// into layout.buildDirectory.dir('mirror/sources')
// outputs.dir "mirror/sources"
//}

tasks.register('copyGceLauncher', Copy) {
from 'src/main/sh/cloud_extractor/gce_launcher.sh'
Expand Down Expand Up @@ -233,9 +241,9 @@ distributions {
from(generateLicenseReport) {
into "docs/licenses"
}
from(generateSourceMirror) {
into "docs/sources"
}
// from(generateSourceMirror) {
// into "docs/sources"
// }
}
}
}
Expand Down
276 changes: 0 additions & 276 deletions dumper/app/gradle.lockfile

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import static org.apache.oozie.cli.OozieCLI.ENV_OOZIE_URL;
import static org.apache.oozie.cli.OozieCLI.OOZIE_RETRY_COUNT;
import static org.apache.oozie.cli.OozieCLI.WS_HEADER_PREFIX;

import java.util.Map;
import org.apache.oozie.cli.OozieCLI;
import org.apache.oozie.cli.OozieCLIException;
import org.apache.oozie.client.AuthOozieClient;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.XOozieClient;

/**
* Factory does initialization of Oozie client in a similar but simplified way compare to {@link
* OozieCLI#main(String[])}.
*/
public class OozieClientFactory {
public static XOozieClient createXOozieClient() throws OozieCLIException {
String oozieUrl = getOozieUrl();
// String authOption = getAuthOption(commandLine);
XOozieClient wc = new AuthOozieClient(oozieUrl, null);

addHeaders(wc);
setRetryCount(wc);
return wc;
}

protected static String getOozieUrl() {
String url = null; // commandLine.getOptionValue(OOZIE_OPTION);
if (url == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is not the if statement always true?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is always true

url = System.getenv(ENV_OOZIE_URL);
if (url == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be moved out from the if to reduce nesting

throw new IllegalArgumentException(
"Oozie URL is not available neither in command option or in the environment");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nor in the

}
}
return url;
}

protected static void addHeaders(OozieClient wc) {
// String username = commandLine.getOptionValue(USERNAME);
// String password = commandLine.getOptionValue(PASSWORD);
// if (username != null && password != null) {
// String encoded = Base64.getEncoder().encodeToString((username + ':' + password).getBytes(
// StandardCharsets.UTF_8));
// wc.setHeader("Authorization", "Basic " + encoded);
// }
Comment on lines +58 to +64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
String key = (String) entry.getKey();
if (key.startsWith(WS_HEADER_PREFIX)) {
String header = key.substring(WS_HEADER_PREFIX.length());
System.out.println("Header added to Oozie client: " + header);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't a logger be better?

wc.setHeader(header, (String) entry.getValue());
}
}
}

protected static void setRetryCount(OozieClient wc) {
String retryCount = System.getProperty(OOZIE_RETRY_COUNT);
if (retryCount != null && !retryCount.isEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings.isNullOrEmpty?

try {
int retry = Integer.parseInt(retryCount.trim());
wc.setRetryCount(retry);
} catch (Exception ex) {
System.err.println(
"Unable to parse the retry settings. May be not an integer [" + retryCount + "]");
ex.printStackTrace();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import com.google.auto.service.AutoService;
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
import com.google.edwmigration.dumper.application.dumper.connector.AbstractConnector;
import com.google.edwmigration.dumper.application.dumper.connector.Connector;
import com.google.edwmigration.dumper.application.dumper.connector.MetadataConnector;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
import com.google.edwmigration.dumper.application.dumper.task.Task;
import com.google.edwmigration.dumper.application.dumper.utils.ArchiveNameUtil;
import com.google.edwmigration.dumper.plugin.ext.jdk.annotation.Description;
import java.time.Clock;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.oozie.client.XOozieClient;

@AutoService(Connector.class)
@Description("Dumps Jobs history from Oozie.")
@RespectsInput(order = 100, arg = ConnectorArguments.OPT_URI, description = "Oozie URL.")
public class OozieConnector extends AbstractConnector implements MetadataConnector {
public static final String FORMAT_NAME = "oozie.dump.zip";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be private? it is not used anywhere else

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


public OozieConnector() {
super("oozie");
}

@Nonnull
@Override
public String getDefaultFileName(boolean isAssessment, Clock clock) {
return ArchiveNameUtil.getFileNameWithTimestamp(getName(), clock);
}

@Override
public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArguments arguments)
throws Exception {
out.add(new DumpMetadataTask(arguments, FORMAT_NAME));
out.add(new FormatTask(FORMAT_NAME));
out.add(new OozieJobsTask(93));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 93?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default quarter days

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to add a variable or comment for visibility

}

@Nonnull
@Override
public Handle open(@Nonnull ConnectorArguments arguments) throws Exception {
XOozieClient xOozieClient = OozieClientFactory.createXOozieClient();
return new OozieHandle(xOozieClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import java.io.IOException;
import org.apache.oozie.client.XOozieClient;

/** {@link Handle} that simply carries an {@link XOozieClient} for later retrieval. */
public class OozieHandle implements Handle {
  private final XOozieClient client;

  /**
   * @param oozieClient client exposed through {@link #getOozieClient()}
   */
  public OozieHandle(XOozieClient oozieClient) {
    client = oozieClient;
  }

  /** Returns the client passed to the constructor. */
  public XOozieClient getOozieClient() {
    return client;
  }

  /** Does nothing; this handle performs no cleanup of its own. */
  @Override
  public void close() throws IOException {
    // Intentionally empty.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.LocalDateTime.ofInstant;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.XOozieClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OozieJobsTask extends AbstractTask<Void> {
private static final Logger LOG = LoggerFactory.getLogger(OozieJobsTask.class);
private final ObjectMapper objectMapper = new ObjectMapper();

private final int maxDaysToFetch;

public OozieJobsTask(int maxDaysToFetch) {
super("oozie_jobs.csv");
Preconditions.checkArgument(
maxDaysToFetch >= 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth also have upper limit for maxDaysToFetch? So it will not get any large numbers, which do not make sense, like million?

String.format("Amount of days must be a positive number. Got %d.", maxDaysToFetch));

this.maxDaysToFetch = maxDaysToFetch;
}

// todo jobs params in filter
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
@CheckForNull
@Override
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
throws Exception {
final LocalDateTime now = LocalDateTime.now(); // todo fix for unit test
final CSVFormat csvFormat = newCsvFormatForClass(WorkflowJob.class);
final ImmutableList<String> csvHeader = ImmutableList.copyOf(csvFormat.getHeader());

try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
final int batchSize = 1_000;
int offset = 0;
LocalDateTime lastJobEndDate = now;

LOG.info("Start fetch Oozie jobs for last {}d starts from {}", maxDaysToFetch, now);

while (ChronoUnit.DAYS.between(now, lastJobEndDate) < maxDaysToFetch) {
List<WorkflowJob> jobsInfo = oozieClient.getJobsInfo(null, offset, batchSize);
Copy link
Collaborator

@kaxuna kaxuna Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does getJobsInfo() returns elements sorted by the endTime date?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe jobs, jobList or jobBatch would be better names?

for (WorkflowJob workflowJob : jobsInfo) {
Object[] record = toCSVRecord(workflowJob, csvHeader);
printer.printRecord(record);
}

if (jobsInfo.isEmpty()) {
break;
}

WorkflowJob lastJob = jobsInfo.get(jobsInfo.size() - 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the jobs inside jobsInfo sorted? So you can take the last element?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

if (lastJob.getEndTime() == null) {
break;
}
lastJobEndDate = ofInstant(lastJob.getEndTime().toInstant(), ZoneId.systemDefault());
offset += jobsInfo.size();
}

printer.println();
}
return null;
}

private Object[] toCSVRecord(WorkflowJob job, ImmutableList<String> header) throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be static?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Object[] record = new Object[header.size()];
for (int i = 0; i < header.size(); i++) {
record[i] = PropertyUtils.getProperty(job, header.get(i));
if (record[i] != null && record[i] instanceof Date) {
// avoid date formats complexity and use milliseconds
record[i] = ((Date) record[i]).getTime();
}
if (record[i] != null && record[i] instanceof List) {
// write Actions arrays as json string in csv
record[i] = objectMapper.writeValueAsString(record[i]);
}
}
return record;
}
}
Loading