47
47
import java .util .stream .Collectors ;
48
48
49
49
/**
50
- * A Wrangler step for converting data type of column
51
- * Accepted types are: int, short, long, double, float, string, boolean and bytes
52
- * When decimal type is selected, can also specify the scale, precision and rounding mode
50
+ * A Wrangler step for converting data type of column Accepted types are: int, short, long, double,
51
+ * float, string, boolean and bytes When decimal type is selected, can also specify the scale,
52
+ * precision and rounding mode
53
53
*/
54
54
@ Plugin (type = "directives" )
55
55
@ Name (SetType .NAME )
56
56
@ Categories (categories = {"column" })
57
57
@ Description ("Converting data type of a column. Optional arguments scale, precision and "
58
58
+ "rounding-mode are used only when type is decimal." )
59
59
public final class SetType implements Directive , Lineage {
60
+
60
61
public static final String NAME = "set-type" ;
61
62
62
63
private String col ;
@@ -72,7 +73,8 @@ public UsageDefinition define() {
72
73
builder .define ("type" , TokenType .IDENTIFIER );
73
74
builder .define ("scale" , TokenType .NUMERIC , Optional .TRUE );
74
75
builder .define ("rounding-mode" , TokenType .TEXT , Optional .TRUE );
75
- builder .define ("precision" , TokenType .PROPERTIES , "prop:{precision=<precision>}" , Optional .TRUE );
76
+ builder .define ("precision" , TokenType .PROPERTIES , "prop:{precision=<precision>}" ,
77
+ Optional .TRUE );
76
78
return builder .build ();
77
79
}
78
80
@@ -88,15 +90,18 @@ public void initialize(Arguments args) throws DirectiveParseException {
88
90
}
89
91
scale = args .contains ("scale" ) ? ((Numeric ) args .value ("scale" )).value ().intValue () : null ;
90
92
if (scale == null && precision == null && args .contains ("rounding-mode" )) {
91
- throw new DirectiveParseException ("'rounding-mode' can only be specified when a 'scale' or 'precision' is set" );
93
+ throw new DirectiveParseException (
94
+ "'rounding-mode' can only be specified when a 'scale' or 'precision' is set" );
92
95
}
93
96
try {
94
97
roundingMode = args .contains ("rounding-mode" ) ?
95
- RoundingMode .valueOf (((Text ) args .value ("rounding-mode" )).value ()) :
96
- (scale == null && precision == null ? RoundingMode .UNNECESSARY : RoundingMode .HALF_EVEN );
98
+ RoundingMode .valueOf (((Text ) args .value ("rounding-mode" )).value ()) :
99
+ (scale == null && precision == null ? RoundingMode .UNNECESSARY
100
+ : RoundingMode .HALF_EVEN );
97
101
} catch (IllegalArgumentException e ) {
98
102
throw new DirectiveParseException (String .format (
99
- "Specified rounding-mode '%s' is not a valid Java rounding mode" , args .value ("rounding-mode" ).value ()), e );
103
+ "Specified rounding-mode '%s' is not a valid Java rounding mode" ,
104
+ args .value ("rounding-mode" ).value ()), e );
100
105
}
101
106
}
102
107
}
@@ -107,7 +112,8 @@ public void destroy() {
107
112
}
108
113
109
114
@ Override
110
- public List <Row > execute (List <Row > rows , ExecutorContext context ) throws DirectiveExecutionException {
115
+ public List <Row > execute (List <Row > rows , ExecutorContext context )
116
+ throws DirectiveExecutionException {
111
117
for (Row row : rows ) {
112
118
ColumnConverter .convertType (NAME , row , col , type , scale , precision , roundingMode );
113
119
}
@@ -117,76 +123,83 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
117
123
@ Override
118
124
public Mutation lineage () {
119
125
return Mutation .builder ()
120
- .readable ("Changed the column '%s' to type '%s'" , col , type )
121
- .relation (col , col )
122
- .build ();
126
+ .readable ("Changed the column '%s' to type '%s'" , col , type )
127
+ .relation (col , col )
128
+ .build ();
123
129
}
124
130
125
131
@ Override
126
132
public Schema getOutputSchema (SchemaResolutionContext context ) {
127
133
Schema inputSchema = context .getInputSchema ();
128
134
return Schema .recordOf (
129
- "outputSchema" ,
130
- inputSchema .getFields ().stream ()
131
- .map (
132
- field -> {
133
- try {
134
- if (field .getName ().equals (col )) {
135
- Integer outputScale = scale ;
136
- Integer outputPrecision = precision ;
137
- if (type .equalsIgnoreCase ("decimal" ) && field .getSchema ().isNullable ()) {
138
- Schema fieldSchema = field .getSchema ().getNonNullable ();
139
- Pair <Integer , Integer > scaleAndPrecision = getPrecisionAndScale (fieldSchema );
140
- Integer inputSchemaScale = scaleAndPrecision .getSecond ();
141
- Integer inputSchemaPrecision = scaleAndPrecision .getFirst ();
142
-
143
- if (scale == null && precision == null ) {
144
- outputScale = inputSchemaScale ;
145
- outputPrecision = inputSchemaPrecision ;
146
- } else if (scale == null && inputSchemaScale != null ) {
147
- if (precision - inputSchemaScale < 1 ) {
148
- throw new DirectiveParseException (String .format (
149
- "Cannot set scale as '%s' and precision as '%s' when "
150
- + "given precision - scale is less than 1 " , inputSchemaScale ,
151
- precision ));
152
- }
153
- outputScale = inputSchemaScale ;
154
- outputPrecision = precision ;
155
-
156
- } else if (precision == null && inputSchemaPrecision != null ) {
157
- if (inputSchemaPrecision - scale < 1 ) {
158
- throw new DirectiveParseException (String .format (
159
- "Cannot set scale as '%s' and precision as '%s' when "
160
- + "given precision - scale is less than 1 " , scale ,
161
- inputSchemaPrecision ));
135
+ "outputSchema" ,
136
+ inputSchema .getFields ().stream ()
137
+ .map (
138
+ field -> {
139
+ try {
140
+ if (field .getName ().equals (col )) {
141
+ Integer outputScale = scale ;
142
+ Integer outputPrecision = precision ;
143
+ if (type .equalsIgnoreCase ("decimal" ) && field .getSchema ().isNullable ()) {
144
+ Schema fieldSchema = field .getSchema ().getNonNullable ();
145
+ Pair <Integer , Integer > scaleAndPrecision = getValidatedPrecisionAndScale (
146
+ fieldSchema , precision , scale );
147
+ outputScale = scaleAndPrecision .getSecond ();
148
+ outputPrecision = scaleAndPrecision .getFirst ();
149
+ }
150
+ return Schema .Field .of (col , ColumnConverter .getSchemaForType (type ,
151
+ outputScale , outputPrecision ));
162
152
}
163
- outputScale = scale ;
164
- outputPrecision = inputSchemaPrecision ;
153
+ return field ;
154
+ } catch (DirectiveParseException e ) {
155
+ throw new RuntimeException (e );
165
156
}
166
157
}
167
- return Schema .Field .of (col , ColumnConverter .getSchemaForType (type ,
168
- outputScale , outputPrecision ));
169
- }
170
- return field ;
171
- } catch (DirectiveParseException e ) {
172
- throw new RuntimeException (e );
173
- }
174
- }
175
- )
176
- .collect (Collectors .toList ())
158
+ )
159
+ .collect (Collectors .toList ())
177
160
);
178
161
}
179
162
180
163
/**
181
164
* extracts precision and scale from schema string
182
165
*/
183
- public static Pair <Integer , Integer > getPrecisionAndScale (Schema fieldSchema ) {
184
- Integer precision = null ;
185
- Integer scale = null ;
186
- if (fieldSchema .getLogicalType () == LogicalType .DECIMAL ) {
187
- precision = fieldSchema .getPrecision ();
188
- scale = fieldSchema .getScale ();
166
+ public static Pair <Integer , Integer > getValidatedPrecisionAndScale (Schema fieldSchema ,
167
+ Integer precision , Integer scale )
168
+ throws DirectiveParseException { //check precision and scale
169
+ Integer outputPrecision = precision ;
170
+ Integer outputScale = scale ;
171
+ Integer inputSchemaPrecision = null ;
172
+ Integer inputSchemaScale = null ;
173
+
174
+ if (fieldSchema .getLogicalType () == LogicalType .DECIMAL ) {
175
+ inputSchemaPrecision = fieldSchema .getPrecision ();
176
+ inputSchemaScale = fieldSchema .getScale ();
177
+ }
178
+
179
+ if (scale == null && precision == null ) {
180
+ outputScale = inputSchemaScale ;
181
+ outputPrecision = inputSchemaPrecision ;
182
+ } else if (scale == null && inputSchemaScale != null ) {
183
+ if (precision - inputSchemaScale < 1 ) {
184
+ throw new DirectiveParseException (String .format (
185
+ "Cannot set scale as '%s' and precision as '%s' when "
186
+ + "given precision - scale is less than 1 " , inputSchemaScale ,
187
+ precision ));
189
188
}
190
- return new Pair <Integer , Integer >(precision , scale );
189
+ outputScale = inputSchemaScale ;
190
+ outputPrecision = precision ;
191
+
192
+ } else if (precision == null && inputSchemaPrecision != null ) {
193
+ if (inputSchemaPrecision - scale < 1 ) {
194
+ throw new DirectiveParseException (String .format (
195
+ "Cannot set scale as '%s' and precision as '%s' when "
196
+ + "given precision - scale is less than 1 " , scale ,
197
+ inputSchemaPrecision ));
198
+ }
199
+ outputScale = scale ;
200
+ outputPrecision = inputSchemaPrecision ;
191
201
}
202
+
203
+ return new Pair <Integer , Integer >(outputPrecision , outputScale );
204
+ }
192
205
}
0 commit comments