Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[b/402028795] Oozie connector #793

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

vladislav-sidorovich
Copy link
Collaborator

Run Oozie connector:

kinit
export OOZIE_URL=https://....cloudera.site:11443/oozie
./dwh-migration-dumper --connector oozie

The code will be cleaned, extra parameters like url and dates added.

The main flow and tests a ready and can be reviewed.

@@ -72,6 +72,13 @@ dependencies {
implementation libs.httpclient5
implementation libs.aws.java.sdk.redshift
implementation libs.aws.java.sdk.cloudwatch
implementation('org.apache.oozie:oozie-client:5.2.1') {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have this and its transient dependencies on airlock?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you give me the list of the transients I can check

into layout.buildDirectory.dir('mirror/sources')
outputs.dir "mirror/sources"
}
//tasks.register('generateSourceMirror', Copy) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe comment why we are not deleting this code?


protected static String getOozieUrl() {
String url = null; // commandLine.getOptionValue(OOZIE_OPTION);
if (url == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is not the if statement always true?

Comment on lines +58 to +64
// String username = commandLine.getOptionValue(USERNAME);
// String password = commandLine.getOptionValue(PASSWORD);
// if (username != null && password != null) {
// String encoded = Base64.getEncoder().encodeToString((username + ':' + password).getBytes(
// StandardCharsets.UTF_8));
// wc.setHeader("Authorization", "Basic " + encoded);
// }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

LOG.info("Start fetch Oozie jobs for last {}d starts from {}", maxDaysToFetch, now);

while (ChronoUnit.DAYS.between(now, lastJobEndDate) < maxDaysToFetch) {
List<WorkflowJob> jobsInfo = oozieClient.getJobsInfo(null, offset, batchSize);
Copy link
Collaborator

@kaxuna kaxuna Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does getJobsInfo() returns elements sorted by the endTime date?

url = System.getenv(ENV_OOZIE_URL);
if (url == null) {
throw new IllegalArgumentException(
"Oozie URL is not available neither in command option or in the environment");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nor in the


protected static String getOozieUrl() {
String url = null; // commandLine.getOptionValue(OOZIE_OPTION);
if (url == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is always true

@Description("Dumps Jobs history from Oozie.")
@RespectsInput(order = 100, arg = ConnectorArguments.OPT_URI, description = "Oozie URL.")
public class OozieConnector extends AbstractConnector implements MetadataConnector {
public static final String FORMAT_NAME = "oozie.dump.zip";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be private? it is not used anywhere else

throws Exception {
out.add(new DumpMetadataTask(arguments, FORMAT_NAME));
out.add(new FormatTask(FORMAT_NAME));
out.add(new OozieJobsTask(93));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 93?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default quarter days

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to add a variable or comment for visibility

break;
}

WorkflowJob lastJob = jobsInfo.get(jobsInfo.size() - 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the jobs inside jobsInfo sorted? So you can take the last element?

public OozieJobsTask(int maxDaysToFetch) {
super("jobs.csv");
Preconditions.checkArgument(
maxDaysToFetch >= 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth also have upper limit for maxDaysToFetch? So it will not get any large numbers, which do not make sense, like million?

Copy link
Collaborator

@kajgol kajgol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this is under hadoop? The other hadoop connectors (ranger, hdfs) are top-level.

String url = null; // commandLine.getOptionValue(OOZIE_OPTION);
if (url == null) {
url = System.getenv(ENV_OOZIE_URL);
if (url == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be moved out from the if to reduce nesting

String key = (String) entry.getKey();
if (key.startsWith(WS_HEADER_PREFIX)) {
String header = key.substring(WS_HEADER_PREFIX.length());
System.out.println("Header added to Oozie client: " + header);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't a logger be better?


protected static void setRetryCount(OozieClient wc) {
String retryCount = System.getProperty(OOZIE_RETRY_COUNT);
if (retryCount != null && !retryCount.isEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings.isNullOrEmpty?

LOG.info("Start fetch Oozie jobs for last {}d starts from {}", maxDaysToFetch, now);

while (ChronoUnit.DAYS.between(now, lastJobEndDate) < maxDaysToFetch) {
List<WorkflowJob> jobsInfo = oozieClient.getJobsInfo(null, offset, batchSize);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe jobs, jobList or jobBatch would be better names?

return null;
}

private Object[] toCSVRecord(WorkflowJob job, ImmutableList<String> header) throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be static?


assertEquals("Amount of days must be a positive number. Got -3.", exception.getMessage());

new OozieJobsTask(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

success. No, Errors.

}

@Test
public void create_negativeDays_throwsException() throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nonpositive instead of negative since you're testing with 0?

import org.apache.oozie.client.AuthOozieClient;
import org.apache.oozie.client.OozieClientException;

public class TestNoAuthOozieClient extends AuthOozieClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be public? In fact could it be a private static nested class in the test class?

@vladislav-sidorovich
Copy link
Collaborator Author

Any reason why this is under hadoop? The other hadoop connectors (ranger, hdfs) are top-level.

Other connectors also should be moved under hadoop, but not in the scope of this PR.

Comment on lines +109 to +111
private static ResponseDefinitionBuilder okJsonWithBodyFile(String fileName) {
return ok().withBodyFile(fileName).withHeader(CONTENT_TYPE, "application/json");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to move this method as util method in global scope.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants