Skip to content

Commit fe96594

Browse files
authored
Merge pull request #592 from cloudsufi/cherry-pick/2a56874ce8f8c8c19db8c1d73af71a1d1ad34141/rb-1.11
[🍒][PLUGIN-1888] Skip field validation for compatiblity
2 parents 1827fd7 + 2171550 commit fe96594

File tree

5 files changed

+82
-13
lines changed

5 files changed

+82
-13
lines changed

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
484484
}
485485

486486
@VisibleForTesting
487-
static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
487+
void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
488488
if (configSchema == null) {
489489
collector.addFailure("Schema should not be null or empty.", null)
490490
.withConfigProperty(SCHEMA);
@@ -505,14 +505,20 @@ static void validateSchema(Schema actualSchema, Schema configSchema, FailureColl
505505
Schema expectedFieldSchema = field.getSchema().isNullable() ?
506506
field.getSchema().getNonNullable() : field.getSchema();
507507

508-
if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
509-
actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
510-
collector.addFailure(
511-
String.format("Schema field '%s' has type '%s but found '%s'.",
512-
field.getName(), expectedFieldSchema.getDisplayName(),
513-
actualFieldSchema.getDisplayName()), null)
514-
.withOutputSchemaField(field.getName());
515-
}
508+
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
509+
}
510+
}
511+
512+
protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema,
513+
Schema expectedFieldSchema) {
514+
if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
515+
actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
516+
collector.addFailure(
517+
String.format("Schema field '%s' is expected to have type '%s but found '%s'.", field.getName(),
518+
expectedFieldSchema.getDisplayName(), actualFieldSchema.getDisplayName()),
519+
String.format("Change the data type of field %s to %s.", field.getName(),
520+
actualFieldSchema.getDisplayName()))
521+
.withOutputSchemaField(field.getName());
516522
}
517523
}
518524

database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,17 @@ public class AbstractDBSourceTest {
4343
Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
4444
Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN)))
4545
);
46+
private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() {
47+
@Override
48+
public String getConnectionString() {
49+
return "";
50+
}
51+
};
4652

4753
@Test
4854
public void testValidateSourceSchemaCorrectSchema() {
4955
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
50-
AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector);
56+
TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector);
5157
Assert.assertEquals(0, collector.getValidationFailures().size());
5258
}
5359

@@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() {
6571
);
6672

6773
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
68-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
74+
TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
6975
assertPropertyValidationFailed(collector, "boolean_column");
7076
}
7177

@@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() {
8490
);
8591

8692
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
87-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
93+
TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
8894
assertPropertyValidationFailed(collector, "boolean_column");
8995
}
9096

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.mariadb;
18+
19+
import io.cdap.plugin.mysql.MysqlFieldsValidator;
20+
21+
/**
22+
* Field validator for maraidb
23+
*/
24+
public class MariadbFieldsValidator extends MysqlFieldsValidator {
25+
}

mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import io.cdap.plugin.db.SchemaReader;
2626
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
2727
import io.cdap.plugin.db.sink.AbstractDBSink;
28+
import io.cdap.plugin.db.sink.FieldsValidator;
2829

29-
import io.cdap.plugin.mysql.MysqlDBRecord;
3030
import java.util.Map;
3131
import javax.annotation.Nullable;
3232

@@ -60,6 +60,11 @@ protected SchemaReader getSchemaReader() {
6060
}
6161

6262

63+
@Override
64+
protected FieldsValidator getFieldsValidator() {
65+
return new MariadbFieldsValidator();
66+
}
67+
6368
/**
6469
* MariaDB Sink Config.
6570
*/

mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.cdap.cdap.api.annotation.Description;
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
22+
import io.cdap.cdap.api.data.schema.Schema;
23+
import io.cdap.cdap.etl.api.FailureCollector;
2224
import io.cdap.cdap.etl.api.batch.BatchSource;
2325
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2426
import io.cdap.plugin.common.Asset;
@@ -162,5 +164,30 @@ public Map<String, String> getConnectionArguments() {
162164
arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false");
163165
return arguments;
164166
}
167+
168+
@Override
169+
protected void validateField(FailureCollector collector,
170+
Schema.Field field,
171+
Schema actualFieldSchema,
172+
Schema expectedFieldSchema) {
173+
// Backward compatibility changes to support MySQL YEAR to Date type conversion
174+
if (Schema.LogicalType.DATE.equals(expectedFieldSchema.getLogicalType())
175+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
176+
return;
177+
}
178+
179+
// Backward compatibility change to support MySQL MEDIUMINT UNSIGNED to Long type conversion
180+
if (Schema.Type.LONG.equals(expectedFieldSchema.getType())
181+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
182+
return;
183+
}
184+
185+
// Backward compatibility change to support MySQL TINYINT(1) to Bool type conversion
186+
if (Schema.Type.BOOLEAN.equals(expectedFieldSchema.getType())
187+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
188+
return;
189+
}
190+
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
191+
}
165192
}
166193
}

0 commit comments

Comments
 (0)