Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.edwmigration.dumper.application.dumper.connector.snowflake.SnowflakeInput.SCHEMA_ONLY_SOURCE;
import static com.google.edwmigration.dumper.application.dumper.utils.ArchiveNameUtil.getEntryFileNameWithTimestamp;
import static org.apache.commons.lang3.StringUtils.isBlank;

import com.google.auto.service.AutoService;
import com.google.common.base.CaseFormat;
Expand Down Expand Up @@ -225,11 +226,7 @@ private String createQueryFromAccountUsage(ConnectorArguments arguments)
+ "FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY\n"
+ "WHERE end_time >= to_timestamp_ltz('%s')\n"
+ "AND end_time <= to_timestamp_ltz('%s')\n");
if (!StringUtils.isBlank(arguments.getQueryLogEarliestTimestamp()))
queryBuilder
.append("AND start_time >= ")
.append(arguments.getQueryLogEarliestTimestamp())
.append("\n");
queryBuilder.append(earliestTimestamp(arguments));
if (overrideWhere != null) queryBuilder.append(" AND ").append(overrideWhere);
return queryBuilder.toString().replace('\n', ' ');
}
Expand All @@ -242,8 +239,6 @@ private String createQueryFromInformationSchema(ConnectorArguments arguments)
String overrideQuery = getOverrideQuery(arguments);
if (overrideQuery != null) return overrideQuery;

String overrideWhere = getOverrideWhere(arguments);

@SuppressWarnings("OrphanedFormatString")
StringBuilder queryBuilder =
new StringBuilder(
Expand All @@ -268,20 +263,18 @@ private String createQueryFromInformationSchema(ConnectorArguments arguments)
// maximum range of 7 trailing days.
+ ",end_time_range_start=>to_timestamp_ltz('%s')\n"
+ ",end_time_range_end=>to_timestamp_ltz('%s')\n"
+ "))\n");
// It makes later formatting easier if we always have a 'WHERE'.
+ ")) WHERE 1=1\n");
// if the user specifies an earliest start time there will be extraneous empty dump files
// because we always iterate over the full 7 trailing days; maybe it's worth
// preventing that in the future. To do that, we should require getQueryLogEarliestTimestamp()
// to parse and return an ISO instant, not a database-server-specific format.
// TODO: Use ZonedIntervalIterableGenerator.forConnectorArguments()
boolean appendStartTime = !StringUtils.isBlank(arguments.getQueryLogEarliestTimestamp());
if (appendStartTime)
queryBuilder
.append("WHERE start_time >= ")
.append(arguments.getQueryLogEarliestTimestamp())
.append("\n");
if (overrideWhere != null)
queryBuilder.append(appendStartTime ? " AND " : "WHERE").append(overrideWhere);
queryBuilder.append(earliestTimestamp(arguments));

String overrideWhere = getOverrideWhere(arguments);
if (overrideWhere != null) {
queryBuilder.append(" AND " + overrideWhere);
}
return queryBuilder.toString().replace('\n', ' ');
}

Expand Down Expand Up @@ -343,15 +336,21 @@ private String createExtendedQueryFromAccountUsage(ConnectorArguments arguments)
+ "AND end_time <= to_timestamp_ltz('%s')\n"
+ "AND is_client_generated_statement = FALSE\n");

if (!StringUtils.isBlank(arguments.getQueryLogEarliestTimestamp()))
queryBuilder
.append("AND start_time >= ")
.append(arguments.getQueryLogEarliestTimestamp())
.append("\n");
queryBuilder.append(earliestTimestamp(arguments));
if (overrideWhere != null) queryBuilder.append(" AND ").append(overrideWhere);
return queryBuilder.toString().replace('\n', ' ');
}

@Nonnull
static String earliestTimestamp(@Nonnull ConnectorArguments arguments) {
  // Builds the optional "start_time" SQL filter from the user-supplied
  // query-log earliest-timestamp option; yields "" when the option is absent
  // so callers can append the result unconditionally.
  String timestamp = arguments.getQueryLogEarliestTimestamp();
  // NOTE(review): the value is interpolated verbatim into the query; assumes
  // the caller passes a database-accepted timestamp literal — confirm upstream.
  return isBlank(timestamp) ? "" : String.format("AND start_time >= %s\n", timestamp);
}

@CheckForNull
private String getOverrideQuery(@Nonnull ConnectorArguments arguments)
throws MetadataDumperUsageException {
Expand Down Expand Up @@ -430,17 +429,16 @@ private static MetadataDumperUsageException unsupportedOption(String option) {
return new MetadataDumperUsageException(message);
}

private String getOverrideableQuery(
@Nullable String overrideQuery, @Nonnull String defaultSql, @Nonnull String whereField) {
String sql = overrideQuery != null ? overrideQuery : defaultSql;
return sql
+ "\n"
+ "WHERE "
+ whereField
+ " >= to_timestamp_ltz('%s')\n"
+ "AND "
+ whereField
+ " <= to_timestamp_ltz('%s')";
static String overrideableQuery(
    @Nullable String override, @Nonnull String defaultSql, @Nonnull String whereField) {
  // Wraps the chosen SQL in a two-sided time-range filter on {@code whereField}.
  // The '%s' placeholders are filled in later via String.format with the
  // interval bounds.
  //
  // Both branches of the original if/else built the byte-identical template;
  // only the leading SQL differed, so select it once and format once (DRY).
  // Note: an empty (non-null) override is used as-is — existing tested behavior.
  String sql = override != null ? override : defaultSql;
  String start = whereField + " >= to_timestamp_ltz('%s')";
  String end = whereField + " <= to_timestamp_ltz('%s')";
  return String.format("%s\nWHERE %s\nAND %s", sql, start, end);
}

private String parseColumnsFromHeader(Class<? extends Enum<?>> headerClass) {
Expand All @@ -455,7 +453,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
return Arrays.asList(
new TaskDescription(
WarehouseEventsHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.WAREHOUSE_EVENTS_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -466,7 +464,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
WarehouseEventsHistoryFormat.Header.class),
new TaskDescription(
AutomaticClusteringHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.AUTOMATIC_CLUSTERING_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -477,7 +475,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
AutomaticClusteringHistoryFormat.Header.class),
new TaskDescription(
CopyHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.COPY_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -488,7 +486,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
CopyHistoryFormat.Header.class),
new TaskDescription(
DatabaseReplicationUsageHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties
.DATABASE_REPLICATION_USAGE_HISTORY_OVERRIDE_QUERY),
Expand All @@ -500,7 +498,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
DatabaseReplicationUsageHistoryFormat.Header.class),
new TaskDescription(
LoginHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.LOGIN_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -511,7 +509,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
LoginHistoryFormat.Header.class),
new TaskDescription(
MeteringDailyHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.METERING_DAILY_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -522,7 +520,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
MeteringDailyHistoryFormat.Header.class),
new TaskDescription(
PipeUsageHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.PIPE_USAGE_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -533,7 +531,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
PipeUsageHistoryFormat.Header.class),
new TaskDescription(
QueryAccelerationHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.QUERY_ACCELERATION_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -545,7 +543,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
TaskCategory.OPTIONAL),
new TaskDescription(
ReplicationGroupUsageHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.REPLICATION_GROUP_USAGE_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -556,7 +554,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
ReplicationGroupUsageHistoryFormat.Header.class),
new TaskDescription(
ServerlessTaskHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.SERVERLESS_TASK_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -567,7 +565,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
ServerlessTaskHistoryFormat.Header.class),
new TaskDescription(
TaskHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.TASK_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -578,7 +576,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
TaskHistoryFormat.Header.class),
new TaskDescription(
WarehouseLoadHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.WAREHOUSE_LOAD_HISTORY_OVERRIDE_QUERY),
String.format(
Expand All @@ -589,7 +587,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
WarehouseLoadHistoryFormat.Header.class),
new TaskDescription(
WarehouseMeteringHistoryFormat.ZIP_ENTRY_PREFIX,
getOverrideableQuery(
overrideableQuery(
arguments.getDefinition(
SnowflakeLogConnectorProperties.WAREHOUSE_METERING_HISTORY_OVERRIDE_QUERY),
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package com.google.edwmigration.dumper.application.dumper.connector.snowflake;

import com.google.common.base.Predicates;
import static com.google.common.base.Predicates.alwaysTrue;
import static com.google.edwmigration.dumper.application.dumper.connector.snowflake.SnowflakeLogsConnector.earliestTimestamp;
import static com.google.edwmigration.dumper.application.dumper.connector.snowflake.SnowflakeLogsConnector.overrideableQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.plugin.lib.dumper.spi.SnowflakeLogsDumpFormat;
import com.google.edwmigration.dumper.test.TestUtils;
import java.io.File;
import java.io.IOException;
Expand All @@ -30,24 +35,82 @@

/** @author shevek */
@RunWith(JUnit4.class)
public class SnowflakeLogsConnectorTest extends AbstractSnowflakeConnectorExecutionTest {

private final SnowflakeLogsConnector connector = new SnowflakeLogsConnector();
public class SnowflakeLogsConnectorTest {

@Test
public void testExecution() throws Exception {
  // Defect fixed: stale pre-merge diff lines ("if (!run(ARGS(connector, ...)))
  // return;" and the old ZipValidator construction) were interleaved into this
  // method and referenced a `connector` field that no longer exists. This is
  // the coherent post-merge version.
  //
  // Local subclass gives the test access to the protected run(...) helper
  // without making the whole outer test class extend the execution base.
  class ExecutionTest extends AbstractSnowflakeConnectorExecutionTest {
    void test(File output) throws Exception {
      if (run(ARGS(new SnowflakeLogsConnector(), output))) {
        new ZipValidator()
            .withFormat("snowflake.logs.zip")
            .withAllowedEntries(alwaysTrue()) // Permit any files.
            .run(output);
      }
    }
  }
  File outputFile = TestUtils.newOutputFile("compilerworks-snowflake-logs-au.zip");
  new ExecutionTest().test(outputFile);
}

@Test
public void earliestTimestamp_notProvided_emptyResult() throws IOException {
  // Without --query-log-earliest-timestamp the helper must contribute nothing.
  ConnectorArguments noTimestampArgs = new ConnectorArguments("--connector", "snowflake-logs");
  assertEquals("", earliestTimestamp(noTimestampArgs));
}

@Test
public void earliestTimestamp_provided_resultMatches() throws IOException {
  // With a timestamp supplied, the clause must filter start_time by that
  // value and stay newline-terminated so later formatting can append safely.
  ConnectorArguments argsWithTimestamp =
      new ConnectorArguments(
          "--connector",
          "snowflake-logs",
          "--" + ConnectorArguments.OPT_QUERY_LOG_EARLIEST_TIMESTAMP,
          "2024-03-21");

  String clause = earliestTimestamp(argsWithTimestamp);

  assertTrue(clause, clause.contains("2024-03-21"));
  assertTrue(clause, clause.contains("start_time"));
  assertTrue(clause, clause.endsWith("\n"));
}

@Test
public void overrideableQuery_overrideAbsent_defaultUsed() throws IOException {
  // A null override means the default SQL must appear in the result.
  String fallbackSql = "SELECT event_name, query_id FROM WAREHOUSE_EVENTS_HISTORY";
  String query = overrideableQuery(null, fallbackSql, "timestamp");
  assertTrue(query, query.contains("event_name"));
}

@Test
public void overrideableQuery_overrideEmpty_resultEmpty() throws IOException {
  // An empty (non-null) override is honored as-is, so the default SQL must
  // NOT leak into the result.
  String fallbackSql = "SELECT event_name, query_id FROM WAREHOUSE_EVENTS_HISTORY";
  String query = overrideableQuery("", fallbackSql, "timestamp");
  assertFalse(query, query.contains("event_name"));
}

@Test
public void overrideableQuery_overridePresent_defaultIgnored() throws IOException {
  // Defect fixed: stale pre-merge diff lines referencing undefined
  // `validator` / `outputFile` locals were interleaved into this method;
  // they belonged to the old testExecution body and are removed here.
  //
  // A non-null override replaces the default SQL entirely, so the
  // default-only column must be absent from the result.
  String defaultSql = "SELECT event_name, query_id FROM WAREHOUSE_EVENTS_HISTORY";
  String override = "SELECT query_id FROM WAREHOUSE_EVENTS_HISTORY";

  String result = overrideableQuery(override, defaultSql, "timestamp");

  assertFalse(result, result.contains("event_name"));
}

@Test
public void validate_unsupportedOption_throwsException() throws IOException {
SnowflakeLogsConnector connector = new SnowflakeLogsConnector();
ConnectorArguments arguments =
new ConnectorArguments(
"--connector",
Expand Down
Loading