Skip to content

Commit b54b240

Browse files
authored
Merge pull request #591 from cloudsufi/cherry-pick/2a56874ce8f8c8c19db8c1d73af71a1d1ad34141/rb-1.12
[🍒][PLUGIN-1888] Skip field validation for compatiblity
2 parents a88d9f0 + 1c0c4b6 commit b54b240

File tree

5 files changed

+83
-12
lines changed

5 files changed

+83
-12
lines changed

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
529529
}
530530

531531
@VisibleForTesting
532-
static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
532+
void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
533533
if (configSchema == null) {
534534
collector.addFailure("Schema should not be null or empty.", null)
535535
.withConfigProperty(SCHEMA);
@@ -550,14 +550,20 @@ static void validateSchema(Schema actualSchema, Schema configSchema, FailureColl
550550
Schema expectedFieldSchema = field.getSchema().isNullable() ?
551551
field.getSchema().getNonNullable() : field.getSchema();
552552

553-
if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
554-
actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
555-
collector.addFailure(
556-
String.format("Schema field '%s' has type '%s but found '%s'.",
557-
field.getName(), expectedFieldSchema.getDisplayName(),
558-
actualFieldSchema.getDisplayName()), null)
559-
.withOutputSchemaField(field.getName());
560-
}
553+
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
554+
}
555+
}
556+
557+
protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema,
558+
Schema expectedFieldSchema) {
559+
if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
560+
actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
561+
collector.addFailure(
562+
String.format("Schema field '%s' is expected to have type '%s but found '%s'.", field.getName(),
563+
expectedFieldSchema.getDisplayName(), actualFieldSchema.getDisplayName()),
564+
String.format("Change the data type of field %s to %s.", field.getName(),
565+
actualFieldSchema.getDisplayName()))
566+
.withOutputSchemaField(field.getName());
561567
}
562568
}
563569

database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,17 @@ public class AbstractDBSourceTest {
4343
Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
4444
Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN)))
4545
);
46+
private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() {
47+
@Override
48+
public String getConnectionString() {
49+
return "";
50+
}
51+
};
4652

4753
@Test
4854
public void testValidateSourceSchemaCorrectSchema() {
4955
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
50-
AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector);
56+
TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector);
5157
Assert.assertEquals(0, collector.getValidationFailures().size());
5258
}
5359

@@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() {
6571
);
6672

6773
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
68-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
74+
TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
6975
assertPropertyValidationFailed(collector, "boolean_column");
7076
}
7177

@@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() {
8490
);
8591

8692
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
87-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
93+
TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
8894
assertPropertyValidationFailed(collector, "boolean_column");
8995
}
9096

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.mariadb;
18+
19+
import io.cdap.plugin.mysql.MysqlFieldsValidator;
20+
21+
/**
22+
* Field validator for maraidb
23+
*/
24+
public class MariadbFieldsValidator extends MysqlFieldsValidator {
25+
}

mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.cdap.plugin.db.SchemaReader;
2626
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
2727
import io.cdap.plugin.db.sink.AbstractDBSink;
28+
import io.cdap.plugin.db.sink.FieldsValidator;
29+
import io.cdap.plugin.mysql.MysqlFieldsValidator;
2830
import io.cdap.plugin.util.DBUtils;
2931

3032
import java.util.Map;
@@ -70,6 +72,11 @@ protected String getExternalDocumentationLink() {
7072
return DBUtils.MARIADB_SUPPORTED_DOC_URL;
7173
}
7274

75+
@Override
76+
protected FieldsValidator getFieldsValidator() {
77+
return new MariadbFieldsValidator();
78+
}
79+
7380
/**
7481
* MariaDB Sink Config.
7582
*/

mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java

+27
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.cdap.cdap.api.annotation.Description;
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
22+
import io.cdap.cdap.api.data.schema.Schema;
23+
import io.cdap.cdap.etl.api.FailureCollector;
2224
import io.cdap.cdap.etl.api.batch.BatchSource;
2325
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2426
import io.cdap.plugin.common.Asset;
@@ -172,5 +174,30 @@ public Map<String, String> getConnectionArguments() {
172174
arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false");
173175
return arguments;
174176
}
177+
178+
@Override
179+
protected void validateField(FailureCollector collector,
180+
Schema.Field field,
181+
Schema actualFieldSchema,
182+
Schema expectedFieldSchema) {
183+
// Backward compatibility changes to support MySQL YEAR to Date type conversion
184+
if (Schema.LogicalType.DATE.equals(expectedFieldSchema.getLogicalType())
185+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
186+
return;
187+
}
188+
189+
// Backward compatibility change to support MySQL MEDIUMINT UNSIGNED to Long type conversion
190+
if (Schema.Type.LONG.equals(expectedFieldSchema.getType())
191+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
192+
return;
193+
}
194+
195+
// Backward compatibility change to support MySQL TINYINT(1) to Bool type conversion
196+
if (Schema.Type.BOOLEAN.equals(expectedFieldSchema.getType())
197+
&& Schema.Type.INT.equals(actualFieldSchema.getType())) {
198+
return;
199+
}
200+
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
201+
}
175202
}
176203
}

0 commit comments

Comments
 (0)