Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public abstract class AbstractOozieJobsTask<J> extends AbstractTask<Void> {
private static final Logger logger = LoggerFactory.getLogger(AbstractOozieJobsTask.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private static final int INITIAL_OOZIE_JOBS_OFFSET = 1; // starts with 1, not 0.

private final int maxDaysToFetch;
private final long initialTimestamp;
private final Class<J> oozieJobClass;
Expand Down Expand Up @@ -70,7 +72,7 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Ha
try (CSVPrinter printer = csvFormat.print(sink.asCharSink(UTF_8).openBufferedStream())) {
XOozieClient oozieClient = ((OozieHandle) handle).getOozieClient();
final int batchSize = context.getArguments().getPaginationPageSize();
int offset = 1; // starts with 1, not 0.
int offset = INITIAL_OOZIE_JOBS_OFFSET;
long lastJobEndTimestamp = initialTimestamp;

logger.info(
Expand Down Expand Up @@ -107,6 +109,14 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Ha
// todo jobs params in filter
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
// FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
/**
 * Fetches one page of jobs from the Oozie server. Implementations are expected to call the Oozie
 * server and fetch jobs in the range [{@code startDate} - {@code endDate}].
 *
 * @param oozieClient Oozie client initialised to a particular Oozie server
 * @param startDate start of the requested date range
 * @param endDate end of the requested date range
 * @param start jobs offset passed to {@code oozieClient}
 * @param len number of jobs to return
 * @return the jobs fetched for the requested page
 * @throws OozieClientException if the Oozie client call fails
 */
abstract List<J> fetchJobs(
XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
throws OozieClientException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import java.util.Date;
import java.util.List;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.XOozieClient;

/**
 * Oozie jobs task specialised for {@link BundleJob}s; its output is written to {@code
 * oozie_bundle_jobs.csv}.
 */
public class OozieBundleJobsTask extends AbstractOozieJobsTask<BundleJob> {
  /** Name of the file this task reports as its target. */
  private static final String TARGET_FILE_NAME = "oozie_bundle_jobs.csv";

  /** Filter string handed to the Oozie client; asks for results sorted by end time. */
  private static final String SORT_BY_END_TIME_FILTER = "sortby=endTime;";

  /**
   * Creates the task.
   *
   * @param maxDaysToFetch forwarded unchanged to the base task; the current wall-clock time is
   *     forwarded alongside it (presumably as the initial timestamp - confirm against the base
   *     class)
   */
  public OozieBundleJobsTask(int maxDaysToFetch) {
    super(TARGET_FILE_NAME, maxDaysToFetch, System.currentTimeMillis());
  }

  /**
   * Requests a page of bundle jobs from the given client, sorted by end time.
   *
   * <p>{@code startDate} and {@code endDate} are not used by this implementation; only the offset
   * and the page length are passed to the client.
   */
  @Override
  List<BundleJob> fetchJobs(
      XOozieClient oozieClient, Date startDate, Date endDate, int start, int len)
      throws OozieClientException {
    List<BundleJob> bundleJobs =
        oozieClient.getBundleJobsInfo(SORT_BY_END_TIME_FILTER, start, len);
    return bundleJobs;
  }

  /** Returns the end time reported by the bundle job itself. */
  @Override
  Date getJobEndDateTime(BundleJob job) {
    Date endTime = job.getEndTime();
    return endTime;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArg
out.add(new OozieServersTask());
out.add(new OozieWorkflowJobsTask(MAX_QUARTER_DAY));
out.add(new OozieCoordinatorJobsTask(MAX_QUARTER_DAY));
out.add(new OozieBundleJobsTask(MAX_QUARTER_DAY));
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.hadoop.oozie;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.XOozieClient;
import org.junit.Test;

/** Unit tests for {@link OozieBundleJobsTask}. */
public class OozieBundleJobsTaskTest {
  private static final int MAX_DAYS_TO_FETCH = 5;

  private final OozieBundleJobsTask task = new OozieBundleJobsTask(MAX_DAYS_TO_FETCH);

  /** The task must report the bundle jobs CSV as its target path. */
  @Test
  public void fileName() {
    final String expectedFileName = "oozie_bundle_jobs.csv";

    assertEquals(expectedFileName, task.getTargetPath());
  }

  /** Fetching must issue exactly one bundle-jobs query with the given offset and length. */
  @Test
  public void fetchJobs_success() throws Exception {
    // Arrange
    final int offset = 2;
    final int pageLength = 14;
    XOozieClient client = mock(XOozieClient.class);

    // Act
    task.fetchJobs(client, null, null, offset, pageLength);

    // Assert
    verify(client).getBundleJobsInfo("sortby=endTime;", offset, pageLength);
    verifyNoMoreInteractions(client);
  }

  /** Resolving the end date must read only the job's end time. */
  @Test
  public void getJobEndTime_success() throws Exception {
    // Arrange
    BundleJob bundleJob = mock(BundleJob.class);

    // Act
    task.getJobEndDateTime(bundleJob);

    // Assert
    verify(bundleJob).getEndTime();
    verifyNoMoreInteractions(bundleJob);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void addTasksTo_checkFilesCategory() throws Exception {
"compilerworks-format.txt", TaskCategory.REQUIRED,
"oozie_info.csv", TaskCategory.REQUIRED,
"oozie_coord_jobs.csv", TaskCategory.REQUIRED,
"oozie_bundle_jobs.csv", TaskCategory.REQUIRED,
"oozie_servers.csv", TaskCategory.REQUIRED,
"oozie_workflow_jobs.csv", TaskCategory.REQUIRED);
List<Task<?>> tasks = new ArrayList<>();
Expand Down