1818
1919import static com .google .edwmigration .dumper .application .dumper .connector .snowflake .SnowflakeInput .SCHEMA_ONLY_SOURCE ;
2020import static com .google .edwmigration .dumper .application .dumper .utils .ArchiveNameUtil .getEntryFileNameWithTimestamp ;
21+ import static org .apache .commons .lang3 .StringUtils .isBlank ;
2122
2223import com .google .auto .service .AutoService ;
2324import com .google .common .base .CaseFormat ;
@@ -225,11 +226,7 @@ private String createQueryFromAccountUsage(ConnectorArguments arguments)
225226 + "FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY\n "
226227 + "WHERE end_time >= to_timestamp_ltz('%s')\n "
227228 + "AND end_time <= to_timestamp_ltz('%s')\n " );
228- if (!StringUtils .isBlank (arguments .getQueryLogEarliestTimestamp ()))
229- queryBuilder
230- .append ("AND start_time >= " )
231- .append (arguments .getQueryLogEarliestTimestamp ())
232- .append ("\n " );
229+ queryBuilder .append (earliestTimestamp (arguments ));
233230 if (overrideWhere != null ) queryBuilder .append (" AND " ).append (overrideWhere );
234231 return queryBuilder .toString ().replace ('\n' , ' ' );
235232 }
@@ -242,8 +239,6 @@ private String createQueryFromInformationSchema(ConnectorArguments arguments)
242239 String overrideQuery = getOverrideQuery (arguments );
243240 if (overrideQuery != null ) return overrideQuery ;
244241
245- String overrideWhere = getOverrideWhere (arguments );
246-
247242 @ SuppressWarnings ("OrphanedFormatString" )
248243 StringBuilder queryBuilder =
249244 new StringBuilder (
@@ -268,20 +263,18 @@ private String createQueryFromInformationSchema(ConnectorArguments arguments)
268263 // maximum range of 7 trailing days.
269264 + ",end_time_range_start=>to_timestamp_ltz('%s')\n "
270265 + ",end_time_range_end=>to_timestamp_ltz('%s')\n "
271- + "))\n " );
266+ // It makes leter formatting easier if we always have a 'WHERE'.
267+ + ")) WHERE 1=1\n " );
272268 // if the user specifies an earliest start time there will be extraneous empty dump files
273269 // because we always iterate over the full 7 trailing days; maybe it's worth
274270 // preventing that in the future. To do that, we should require getQueryLogEarliestTimestamp()
275271 // to parse and return an ISO instant, not a database-server-specific format.
276- // TODO: Use ZonedIntervalIterableGenerator.forConnectorArguments()
277- boolean appendStartTime = !StringUtils .isBlank (arguments .getQueryLogEarliestTimestamp ());
278- if (appendStartTime )
279- queryBuilder
280- .append ("WHERE start_time >= " )
281- .append (arguments .getQueryLogEarliestTimestamp ())
282- .append ("\n " );
283- if (overrideWhere != null )
284- queryBuilder .append (appendStartTime ? " AND " : "WHERE" ).append (overrideWhere );
272+ queryBuilder .append (earliestTimestamp (arguments ));
273+
274+ String overrideWhere = getOverrideWhere (arguments );
275+ if (overrideWhere != null ) {
276+ queryBuilder .append (" AND " + overrideWhere );
277+ }
285278 return queryBuilder .toString ().replace ('\n' , ' ' );
286279 }
287280
@@ -343,15 +336,21 @@ private String createExtendedQueryFromAccountUsage(ConnectorArguments arguments)
343336 + "AND end_time <= to_timestamp_ltz('%s')\n "
344337 + "AND is_client_generated_statement = FALSE\n " );
345338
346- if (!StringUtils .isBlank (arguments .getQueryLogEarliestTimestamp ()))
347- queryBuilder
348- .append ("AND start_time >= " )
349- .append (arguments .getQueryLogEarliestTimestamp ())
350- .append ("\n " );
339+ queryBuilder .append (earliestTimestamp (arguments ));
351340 if (overrideWhere != null ) queryBuilder .append (" AND " ).append (overrideWhere );
352341 return queryBuilder .toString ().replace ('\n' , ' ' );
353342 }
354343
344+ @ Nonnull
345+ static String earliestTimestamp (@ Nonnull ConnectorArguments arguments ) {
346+ String timestamp = arguments .getQueryLogEarliestTimestamp ();
347+ if (isBlank (timestamp )) {
348+ return "" ;
349+ } else {
350+ return String .format ("AND start_time >= %s\n " , timestamp );
351+ }
352+ }
353+
355354 @ CheckForNull
356355 private String getOverrideQuery (@ Nonnull ConnectorArguments arguments )
357356 throws MetadataDumperUsageException {
@@ -430,17 +429,16 @@ private static MetadataDumperUsageException unsupportedOption(String option) {
430429 return new MetadataDumperUsageException (message );
431430 }
432431
433- private String getOverrideableQuery (
434- @ Nullable String overrideQuery , @ Nonnull String defaultSql , @ Nonnull String whereField ) {
435- String sql = overrideQuery != null ? overrideQuery : defaultSql ;
436- return sql
437- + "\n "
438- + "WHERE "
439- + whereField
440- + " >= to_timestamp_ltz('%s')\n "
441- + "AND "
442- + whereField
443- + " <= to_timestamp_ltz('%s')" ;
432+ static String overrideableQuery (
433+ @ Nullable String override , @ Nonnull String defaultSql , @ Nonnull String whereField ) {
434+ String start = whereField + " >= to_timestamp_ltz('%s')" ;
435+ String end = whereField + " <= to_timestamp_ltz('%s')" ;
436+
437+ if (override != null ) {
438+ return String .format ("%s\n WHERE %s\n AND %s" , override , start , end );
439+ } else {
440+ return String .format ("%s\n WHERE %s\n AND %s" , defaultSql , start , end );
441+ }
444442 }
445443
446444 private String parseColumnsFromHeader (Class <? extends Enum <?>> headerClass ) {
@@ -455,7 +453,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
455453 return Arrays .asList (
456454 new TaskDescription (
457455 WarehouseEventsHistoryFormat .ZIP_ENTRY_PREFIX ,
458- getOverrideableQuery (
456+ overrideableQuery (
459457 arguments .getDefinition (
460458 SnowflakeLogConnectorProperties .WAREHOUSE_EVENTS_HISTORY_OVERRIDE_QUERY ),
461459 String .format (
@@ -466,7 +464,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
466464 WarehouseEventsHistoryFormat .Header .class ),
467465 new TaskDescription (
468466 AutomaticClusteringHistoryFormat .ZIP_ENTRY_PREFIX ,
469- getOverrideableQuery (
467+ overrideableQuery (
470468 arguments .getDefinition (
471469 SnowflakeLogConnectorProperties .AUTOMATIC_CLUSTERING_HISTORY_OVERRIDE_QUERY ),
472470 String .format (
@@ -477,7 +475,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
477475 AutomaticClusteringHistoryFormat .Header .class ),
478476 new TaskDescription (
479477 CopyHistoryFormat .ZIP_ENTRY_PREFIX ,
480- getOverrideableQuery (
478+ overrideableQuery (
481479 arguments .getDefinition (
482480 SnowflakeLogConnectorProperties .COPY_HISTORY_OVERRIDE_QUERY ),
483481 String .format (
@@ -488,7 +486,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
488486 CopyHistoryFormat .Header .class ),
489487 new TaskDescription (
490488 DatabaseReplicationUsageHistoryFormat .ZIP_ENTRY_PREFIX ,
491- getOverrideableQuery (
489+ overrideableQuery (
492490 arguments .getDefinition (
493491 SnowflakeLogConnectorProperties
494492 .DATABASE_REPLICATION_USAGE_HISTORY_OVERRIDE_QUERY ),
@@ -500,7 +498,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
500498 DatabaseReplicationUsageHistoryFormat .Header .class ),
501499 new TaskDescription (
502500 LoginHistoryFormat .ZIP_ENTRY_PREFIX ,
503- getOverrideableQuery (
501+ overrideableQuery (
504502 arguments .getDefinition (
505503 SnowflakeLogConnectorProperties .LOGIN_HISTORY_OVERRIDE_QUERY ),
506504 String .format (
@@ -511,7 +509,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
511509 LoginHistoryFormat .Header .class ),
512510 new TaskDescription (
513511 MeteringDailyHistoryFormat .ZIP_ENTRY_PREFIX ,
514- getOverrideableQuery (
512+ overrideableQuery (
515513 arguments .getDefinition (
516514 SnowflakeLogConnectorProperties .METERING_DAILY_HISTORY_OVERRIDE_QUERY ),
517515 String .format (
@@ -522,7 +520,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
522520 MeteringDailyHistoryFormat .Header .class ),
523521 new TaskDescription (
524522 PipeUsageHistoryFormat .ZIP_ENTRY_PREFIX ,
525- getOverrideableQuery (
523+ overrideableQuery (
526524 arguments .getDefinition (
527525 SnowflakeLogConnectorProperties .PIPE_USAGE_HISTORY_OVERRIDE_QUERY ),
528526 String .format (
@@ -533,7 +531,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
533531 PipeUsageHistoryFormat .Header .class ),
534532 new TaskDescription (
535533 QueryAccelerationHistoryFormat .ZIP_ENTRY_PREFIX ,
536- getOverrideableQuery (
534+ overrideableQuery (
537535 arguments .getDefinition (
538536 SnowflakeLogConnectorProperties .QUERY_ACCELERATION_HISTORY_OVERRIDE_QUERY ),
539537 String .format (
@@ -545,7 +543,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
545543 TaskCategory .OPTIONAL ),
546544 new TaskDescription (
547545 ReplicationGroupUsageHistoryFormat .ZIP_ENTRY_PREFIX ,
548- getOverrideableQuery (
546+ overrideableQuery (
549547 arguments .getDefinition (
550548 SnowflakeLogConnectorProperties .REPLICATION_GROUP_USAGE_HISTORY_OVERRIDE_QUERY ),
551549 String .format (
@@ -556,7 +554,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
556554 ReplicationGroupUsageHistoryFormat .Header .class ),
557555 new TaskDescription (
558556 ServerlessTaskHistoryFormat .ZIP_ENTRY_PREFIX ,
559- getOverrideableQuery (
557+ overrideableQuery (
560558 arguments .getDefinition (
561559 SnowflakeLogConnectorProperties .SERVERLESS_TASK_HISTORY_OVERRIDE_QUERY ),
562560 String .format (
@@ -567,7 +565,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
567565 ServerlessTaskHistoryFormat .Header .class ),
568566 new TaskDescription (
569567 TaskHistoryFormat .ZIP_ENTRY_PREFIX ,
570- getOverrideableQuery (
568+ overrideableQuery (
571569 arguments .getDefinition (
572570 SnowflakeLogConnectorProperties .TASK_HISTORY_OVERRIDE_QUERY ),
573571 String .format (
@@ -578,7 +576,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
578576 TaskHistoryFormat .Header .class ),
579577 new TaskDescription (
580578 WarehouseLoadHistoryFormat .ZIP_ENTRY_PREFIX ,
581- getOverrideableQuery (
579+ overrideableQuery (
582580 arguments .getDefinition (
583581 SnowflakeLogConnectorProperties .WAREHOUSE_LOAD_HISTORY_OVERRIDE_QUERY ),
584582 String .format (
@@ -589,7 +587,7 @@ private List<TaskDescription> createTimeSeriesTasks(ConnectorArguments arguments
589587 WarehouseLoadHistoryFormat .Header .class ),
590588 new TaskDescription (
591589 WarehouseMeteringHistoryFormat .ZIP_ENTRY_PREFIX ,
592- getOverrideableQuery (
590+ overrideableQuery (
593591 arguments .getDefinition (
594592 SnowflakeLogConnectorProperties .WAREHOUSE_METERING_HISTORY_OVERRIDE_QUERY ),
595593 String .format (
0 commit comments