diff --git a/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/netezza/NetezzaMetadataConnector.java b/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/netezza/NetezzaMetadataConnector.java index 9537a5d0d..be572e2d0 100644 --- a/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/netezza/NetezzaMetadataConnector.java +++ b/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/netezza/NetezzaMetadataConnector.java @@ -17,7 +17,6 @@ package com.google.edwmigration.dumper.application.dumper.connector.netezza; import com.google.auto.service.AutoService; -import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.io.ByteSink; import com.google.edwmigration.dumper.application.dumper.ConnectorArguments; @@ -49,7 +48,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -133,24 +131,6 @@ protected List doInConnection( return doSelect(connection, rse, sql); } - @Override - public List run(TaskRunContext context) throws Exception { - List databaseNames = super.run(context); - for (String databaseName : - MoreObjects.firstNonNull(databaseNames, Collections.emptyList())) { - // TODO: - // Construct a filter if --dbs was given on the command line. - // For each db in the databaseNames [which passes the filter] - // Add a set of tasks hauled up from the down below, and run them. - // TODO: - // Extract the databaseName from an appropriate SQL query in the ResultSetExtractor above. - // TODO: - // Use this task to list databases somewhere in the down-below addTasksTo. - // context.runChildTask(...); - } - return databaseNames; - } - @Override protected String describeSourceData() { return createSourceDataDescriptionForQuery(sql); diff --git a/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTask.java b/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTask.java index a177381d4..eb2aaf9b9 100644 --- a/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTask.java +++ b/dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTask.java @@ -28,12 +28,15 @@ import com.google.edwmigration.dumper.application.dumper.io.OutputHandle.WriteMode; import com.google.errorprone.annotations.ForOverride; import java.beans.PropertyDescriptor; +import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.QuoteMode; @@ -114,19 +117,71 @@ protected abstract T doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonn throws Exception; @Override - public T run(TaskRunContext context) throws Exception { + public final T run(@Nonnull TaskRunContext context) throws Exception { + return getWrapper(context).runTask(this, context).orElse(null); + } + + SinkWrapper getWrapper(@Nonnull TaskRunContext context) throws IOException { if (options.targetInitialization() == TargetInitialization.DO_NOT_CREATE) { - return doRun(context, DummyByteSink.INSTANCE, context.getHandle()); + return SinkWrapper.decoy(); + } + OutputHandle handle = context.newOutputFileHandle(getTargetPath()); + if (options.writeMode().equals(OutputHandle.WriteMode.APPEND_EXISTING)) { + return SinkWrapper.append(handle, options); + } else if (handle.exists()) { + return SinkWrapper.skip(handle); + } else { + return SinkWrapper.temporary(handle, options); + } + } + + static class SinkWrapper { + @Nullable final ByteSink sink; + @Nullable final OutputHandle handle; + final boolean shouldCommit; + + @Nonnull + static SinkWrapper decoy() { + return new SinkWrapper(DummyByteSink.INSTANCE, null, false); + } + + @Nonnull + static SinkWrapper append(@Nonnull OutputHandle handle, @Nonnull TaskOptions options) + throws IOException { + return new SinkWrapper(handle.asByteSink(options.writeMode()), handle, false); + } + + @Nonnull + static SinkWrapper skip(@Nonnull OutputHandle handle) { + return new SinkWrapper(null, handle, false); + } + + @Nonnull + static SinkWrapper temporary(@Nonnull OutputHandle handle, @Nonnull TaskOptions options) + throws IOException { + return new SinkWrapper(handle.asTemporaryByteSink(options.writeMode()), handle, true); + } + + SinkWrapper(@Nullable ByteSink sink, @Nullable OutputHandle handle, boolean shouldCommit) { + this.sink = sink; + this.handle = handle; + this.shouldCommit = shouldCommit; } - OutputHandle sink = context.newOutputFileHandle(getTargetPath()); - if (sink.exists()) { - logger.info("Skipping {}: {} already exists.", getName(), sink); - return null; + @Nonnull + Optional runTask(AbstractTask task, TaskRunContext context) throws Exception { + ByteSink localSink = sink; + if (localSink != null) { + U result = task.doRun(context, localSink, context.getHandle()); + if (handle != null && shouldCommit) { + handle.commit(); + } + return Optional.ofNullable(result); + } else { + logger.info("Skipping {}. Reason: {} already exists.", task.getName(), handle); + return Optional.empty(); + } } - T result = doRun(context, sink.asTemporaryByteSink(options.writeMode()), context.getHandle()); - sink.commit(); - return result; } protected static CSVFormat newCsvFormatForClass(Class clazz) { @@ -176,9 +231,9 @@ public enum TargetInitialization { DO_NOT_CREATE } - private static class DummyByteSink extends ByteSink { + static class DummyByteSink extends ByteSink { - private static final DummyByteSink INSTANCE = new DummyByteSink(); + static final DummyByteSink INSTANCE = new DummyByteSink(); @Override public OutputStream openStream() { diff --git a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTaskTest.java b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTaskTest.java index 653a66f7c..4c56f3714 100644 --- a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTaskTest.java +++ b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/task/AbstractTaskTest.java @@ -16,9 +16,98 @@ */ package com.google.edwmigration.dumper.application.dumper.task; +import static com.google.edwmigration.dumper.application.dumper.io.OutputHandle.WriteMode.APPEND_EXISTING; +import static com.google.edwmigration.dumper.application.dumper.io.OutputHandle.WriteMode.CREATE_TRUNCATE; +import static com.google.edwmigration.dumper.application.dumper.task.AbstractTask.TargetInitialization.CREATE; +import static com.google.edwmigration.dumper.application.dumper.task.AbstractTask.TargetInitialization.DO_NOT_CREATE; +import static com.google.edwmigration.dumper.application.dumper.task.TaskCategory.REQUIRED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.edwmigration.dumper.application.dumper.io.OutputHandle; +import com.google.edwmigration.dumper.application.dumper.io.OutputHandle.WriteMode; +import com.google.edwmigration.dumper.application.dumper.task.AbstractTask.DummyByteSink; +import com.google.edwmigration.dumper.application.dumper.task.AbstractTask.SinkWrapper; +import com.google.edwmigration.dumper.application.dumper.task.AbstractTask.TargetInitialization; +import com.google.edwmigration.dumper.application.dumper.task.AbstractTask.TaskOptions; +import java.io.IOException; +import org.junit.Test; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; /** @author shevek */ -@RunWith(JUnit4.class) -public abstract class AbstractTaskTest {} +@RunWith(Theories.class) +public abstract class AbstractTaskTest { + + @Test + public void getWrapper_createEnabledAndModeIsAppend_returnsAppendWrapper() throws IOException { + TaskRunContext mockContext = mock(TaskRunContext.class); + OutputHandle handle = mock(OutputHandle.class); + when(mockContext.newOutputFileHandle(anyString())).thenReturn(handle); + when(handle.exists()).thenReturn(false); + AbstractTask task = testTask(CREATE, APPEND_EXISTING); + + SinkWrapper wrapper = task.getWrapper(mockContext); + + verify(handle).asByteSink(any()); + assertSame(handle, wrapper.handle); + assertFalse(wrapper.shouldCommit); + } + + @Test + public void getWrapper_createEnabledAndModeIsCreate_returnsAppendWrapper() throws IOException { + TaskRunContext mockContext = mock(TaskRunContext.class); + OutputHandle handle = mock(OutputHandle.class); + when(mockContext.newOutputFileHandle(anyString())).thenReturn(handle); + when(handle.exists()).thenReturn(false); + AbstractTask task = testTask(CREATE, CREATE_TRUNCATE); + + SinkWrapper wrapper = task.getWrapper(mockContext); + + verify(handle).asTemporaryByteSink(any()); + assertSame(handle, wrapper.handle); + assertTrue(wrapper.shouldCommit); + } + + @Theory + public void getWrapper_createEnabledAndHandleExists_returnsSkipWrapper(WriteMode mode) + throws IOException { + TaskRunContext mockContext = mock(TaskRunContext.class); + OutputHandle handle = mock(OutputHandle.class); + when(mockContext.newOutputFileHandle(anyString())).thenReturn(handle); + when(handle.exists()).thenReturn(true); + AbstractTask task = testTask(CREATE, mode); + + SinkWrapper wrapper = task.getWrapper(mockContext); + + assertNull(wrapper.sink); + assertSame(handle, wrapper.handle); + assertFalse(wrapper.shouldCommit); + } + + @Theory + public void getWrapper_createNotEnabled_returnsDecoyWrapper(WriteMode mode) throws IOException { + AbstractTask task = testTask(DO_NOT_CREATE, mode); + + SinkWrapper wrapper = task.getWrapper(mock(TaskRunContext.class)); + + assertEquals(DummyByteSink.INSTANCE, wrapper); + assertNull(wrapper.handle); + assertFalse(wrapper.shouldCommit); + } + + static AbstractTask testTask(TargetInitialization strategy, OutputHandle.WriteMode mode) { + TaskOptions options = + TaskOptions.builder().setTargetInitialization(strategy).setWriteMode(mode).build(); + return new JdbcSelectTask("test-file", "SELECT 1", REQUIRED, options); + } +}