1818
1919import com .google .auto .service .AutoService ;
2020import com .google .common .annotations .VisibleForTesting ;
21+ import com .google .common .base .Preconditions ;
22+ import com .google .common .base .Predicates ;
23+ import com .google .common .io .ByteSink ;
2124import com .google .edwmigration .dumper .application .dumper .ConnectorArguments ;
2225import com .google .edwmigration .dumper .application .dumper .MetadataDumperUsageException ;
2326import com .google .edwmigration .dumper .application .dumper .annotations .RespectsArgumentQueryLogDays ;
2730import com .google .edwmigration .dumper .application .dumper .connector .LogsConnector ;
2831import com .google .edwmigration .dumper .application .dumper .connector .ZonedInterval ;
2932import com .google .edwmigration .dumper .application .dumper .connector .ZonedIntervalIterable ;
33+ import com .google .edwmigration .dumper .application .dumper .handle .JdbcHandle ;
34+ import com .google .edwmigration .dumper .application .dumper .task .AbstractJdbcTask ;
3035import com .google .edwmigration .dumper .application .dumper .task .DumpMetadataTask ;
3136import com .google .edwmigration .dumper .application .dumper .task .FormatTask ;
3237import com .google .edwmigration .dumper .application .dumper .task .Task ;
38+ import com .google .edwmigration .dumper .application .dumper .task .TaskRunContext ;
3339import com .google .edwmigration .dumper .plugin .ext .jdk .annotation .Description ;
3440import com .google .edwmigration .dumper .plugin .lib .dumper .spi .TeradataLogsDumpFormat ;
41+ import java .sql .Connection ;
42+ import java .sql .SQLException ;
3543import java .time .format .DateTimeFormatter ;
3644import java .util .ArrayList ;
45+ import java .util .Collections ;
3746import java .util .List ;
47+ import java .util .function .Function ;
3848import java .util .function .Predicate ;
3949import javax .annotation .Nonnull ;
4050import org .apache .commons .lang3 .StringUtils ;
4151import org .slf4j .Logger ;
4252import org .slf4j .LoggerFactory ;
53+ import org .springframework .dao .DataAccessException ;
54+ import org .springframework .jdbc .core .JdbcTemplate ;
55+ import org .springframework .jdbc .core .ResultSetExtractor ;
4356
4457/** */
4558@ AutoService ({Connector .class , LogsConnector .class })
4659@ Description ("Dumps logs from Teradata version <=14." )
4760@ RespectsArgumentQueryLogDays
4861@ RespectsArgumentQueryLogStart
4962@ RespectsArgumentQueryLogEnd
50- public class Teradata14LogsConnector extends TeradataLogsConnector {
63+ public class Teradata14LogsConnector extends AbstractTeradataConnector
64+ implements LogsConnector , TeradataLogsDumpFormat {
5165
5266 private static final Logger LOG = LoggerFactory .getLogger (Teradata14LogsConnector .class );
5367
@@ -70,7 +84,95 @@ public Teradata14LogsConnector() {
7084 super ("teradata14-logs" );
7185 }
7286
73- private static class LSqlQueryFactory extends TeradataLogsJdbcTask {
87+ private abstract static class Teradata14LogsJdbcTask extends AbstractJdbcTask <Void > {
88+
89+ protected static String EXPRESSION_VALIDITY_QUERY = "SELECT TOP 1 %s FROM %s" ;
90+
91+ protected final SharedState state ;
92+ protected final String logTable ;
93+ protected final String queryTable ;
94+ protected final List <String > conditions ;
95+ protected final ZonedInterval interval ;
96+ protected final List <String > orderBy ;
97+
98+ public Teradata14LogsJdbcTask (
99+ @ Nonnull String targetPath ,
100+ SharedState state ,
101+ String logTable ,
102+ String queryTable ,
103+ List <String > conditions ,
104+ ZonedInterval interval ) {
105+ this (targetPath , state , logTable , queryTable , conditions , interval , Collections .emptyList ());
106+ }
107+
108+ protected Teradata14LogsJdbcTask (
109+ @ Nonnull String targetPath ,
110+ SharedState state ,
111+ String logTable ,
112+ String queryTable ,
113+ List <String > conditions ,
114+ ZonedInterval interval ,
115+ List <String > orderBy ) {
116+ super (targetPath );
117+ this .state = Preconditions .checkNotNull (state , "SharedState was null." );
118+ this .logTable = logTable ;
119+ this .queryTable = queryTable ;
120+ this .conditions = conditions ;
121+ this .interval = interval ;
122+ this .orderBy = orderBy ;
123+ }
124+
125+ @ Override
126+ protected Void doInConnection (
127+ TaskRunContext context , JdbcHandle jdbcHandle , ByteSink sink , Connection connection )
128+ throws SQLException {
129+ String sql = getSql (jdbcHandle );
130+ ResultSetExtractor <Void > rse = newCsvResultSetExtractor (sink , -1 );
131+ return doSelect (connection , rse , sql );
132+ }
133+
134+ @ Nonnull
135+ private String getSql (@ Nonnull JdbcHandle handle ) {
136+ Function <String , Boolean > validator =
137+ expression -> isValid (handle .getJdbcTemplate (), expression );
138+ Predicate <String > predicate =
139+ expression -> state .expressionValidity .computeIfAbsent (expression , validator );
140+ String sql = getSql (predicate );
141+ // LOG.debug("SQL is " + sql);
142+ return sql ;
143+ }
144+
145+ @ Nonnull
146+ protected abstract String getSql (@ Nonnull Predicate <? super String > predicate );
147+
148+ /**
149+ * Runs a test query to check whether a given projection expression is legal on this Teradata
150+ * instance.
151+ */
152+ @ Nonnull
153+ private Boolean isValid (@ Nonnull JdbcTemplate template , @ Nonnull String expression ) {
154+ String table = isQueryTable (expression ) ? queryTable + " ST" : logTable + " L" ;
155+ String sql = String .format (EXPRESSION_VALIDITY_QUERY , expression , table );
156+ LOG .info ("Checking legality of projection expression '{}' using query: {}" , expression , sql );
157+ try {
158+ template .query (sql , rs -> {});
159+ return Boolean .TRUE ;
160+ } catch (DataAccessException e ) {
161+ LOG .info (
162+ "Attribute '{}' is absent, will use NULL in projection: {}" ,
163+ expression ,
164+ e .getMessage ());
165+ return Boolean .FALSE ;
166+ }
167+ }
168+
169+ @ Override
170+ public String toString () {
171+ return getSql (Predicates .alwaysTrue ());
172+ }
173+ }
174+
175+ private static class LSqlQueryFactory extends Teradata14LogsJdbcTask {
74176
75177 public LSqlQueryFactory (
76178 String targetPath ,
@@ -84,7 +186,7 @@ public LSqlQueryFactory(
84186
85187 @ Override
86188 @ Nonnull
87- String getSql (@ Nonnull Predicate <? super String > predicate ) {
189+ protected String getSql (@ Nonnull Predicate <? super String > predicate ) {
88190 StringBuilder buf = new StringBuilder ("SELECT " );
89191
90192 String separator = "" ;
@@ -115,7 +217,7 @@ String getSql(@Nonnull Predicate<? super String> predicate) {
115217 }
116218 }
117219
118- private static class LogQueryFactory extends TeradataLogsJdbcTask {
220+ private static class LogQueryFactory extends Teradata14LogsJdbcTask {
119221
120222 public LogQueryFactory (
121223 String targetPath ,
@@ -129,7 +231,7 @@ public LogQueryFactory(
129231
130232 @ Override
131233 @ Nonnull
132- String getSql (@ Nonnull Predicate <? super String > predicate ) {
234+ protected String getSql (@ Nonnull Predicate <? super String > predicate ) {
133235 StringBuilder buf = new StringBuilder ("SELECT " );
134236
135237 String separator = "" ;
0 commit comments