35
35
import lombok .extern .slf4j .Slf4j ;
36
36
37
37
import java .util .ArrayList ;
38
- import java .util .Arrays ;
39
38
import java .util .List ;
40
39
import java .util .stream .Collectors ;
41
40
42
41
@ Slf4j
43
42
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
44
43
public static final String PLUGIN_NAME = "Filter" ;
45
44
private int [] inputValueIndex ;
46
- private final String [] fields ;
45
+ private final List < String > fields ;
47
46
48
47
public FilterFieldTransform (
49
48
@ NonNull ReadonlyConfig config , @ NonNull CatalogTable catalogTable ) {
50
49
super (catalogTable );
51
50
SeaTunnelRowType seaTunnelRowType = catalogTable .getTableSchema ().toPhysicalRowDataType ();
52
- fields = config .get (FilterFieldTransformConfig .KEY_FIELDS ). toArray ( new String [ 0 ]) ;
51
+ fields = config .get (FilterFieldTransformConfig .KEY_FIELDS );
53
52
List <String > canNotFoundFields =
54
- Arrays .stream (fields )
55
- .filter (
56
- field -> {
57
- try {
58
- seaTunnelRowType .indexOf (field );
59
- return false ;
60
- } catch (Exception e ) {
61
- return true ;
62
- }
63
- })
53
+ fields .stream ()
54
+ .filter (field -> seaTunnelRowType .indexOf (field , false ) == -1 )
64
55
.collect (Collectors .toList ());
65
56
66
57
if (!CollectionUtils .isEmpty (canNotFoundFields )) {
@@ -77,34 +68,32 @@ public String getPluginName() {
77
68
@ Override
78
69
protected SeaTunnelRow transformRow (SeaTunnelRow inputRow ) {
79
70
// todo reuse array container if not remove fields
80
- Object [] values = new Object [fields .length ];
81
- for (int i = 0 ; i < fields .length ; i ++) {
71
+ Object [] values = new Object [fields .size () ];
72
+ for (int i = 0 ; i < fields .size () ; i ++) {
82
73
values [i ] = inputRow .getField (inputValueIndex [i ]);
83
74
}
84
- return new SeaTunnelRow (values );
75
+ SeaTunnelRow outputRow = new SeaTunnelRow (values );
76
+ outputRow .setRowKind (inputRow .getRowKind ());
77
+ outputRow .setTableId (inputRow .getTableId ());
78
+ return outputRow ;
85
79
}
86
80
87
81
@ Override
88
82
protected TableSchema transformTableSchema () {
89
- List <String > filterFields = Arrays .asList (fields );
90
83
List <Column > outputColumns = new ArrayList <>();
91
84
92
85
SeaTunnelRowType seaTunnelRowType =
93
86
inputCatalogTable .getTableSchema ().toPhysicalRowDataType ();
94
87
95
- inputValueIndex = new int [filterFields .size ()];
88
+ inputValueIndex = new int [fields .size ()];
96
89
ArrayList <String > outputFieldNames = new ArrayList <>();
97
- for (int i = 0 ; i < filterFields .size (); i ++) {
98
- String field = filterFields .get (i );
90
+ List <Column > inputColumns = inputCatalogTable .getTableSchema ().getColumns ();
91
+ for (int i = 0 ; i < fields .size (); i ++) {
92
+ String field = fields .get (i );
99
93
int inputFieldIndex = seaTunnelRowType .indexOf (field );
100
- if (inputFieldIndex == -1 ) {
101
- throw TransformCommonError .cannotFindInputFieldError (getPluginName (), field );
102
- }
103
94
inputValueIndex [i ] = inputFieldIndex ;
104
- outputColumns .add (
105
- inputCatalogTable .getTableSchema ().getColumns ().get (inputFieldIndex ).copy ());
106
- outputFieldNames .add (
107
- inputCatalogTable .getTableSchema ().getColumns ().get (inputFieldIndex ).getName ());
95
+ outputColumns .add (inputColumns .get (inputFieldIndex ).copy ());
96
+ outputFieldNames .add (inputColumns .get (inputFieldIndex ).getName ());
108
97
}
109
98
110
99
List <ConstraintKey > outputConstraintKeys =
@@ -123,10 +112,9 @@ protected TableSchema transformTableSchema() {
123
112
.collect (Collectors .toList ());
124
113
125
114
PrimaryKey copiedPrimaryKey = null ;
126
- if (inputCatalogTable .getTableSchema ().getPrimaryKey () != null
127
- && outputFieldNames .containsAll (
128
- inputCatalogTable .getTableSchema ().getPrimaryKey ().getColumnNames ())) {
129
- copiedPrimaryKey = inputCatalogTable .getTableSchema ().getPrimaryKey ().copy ();
115
+ PrimaryKey primaryKey = inputCatalogTable .getTableSchema ().getPrimaryKey ();
116
+ if (primaryKey != null && outputFieldNames .containsAll (primaryKey .getColumnNames ())) {
117
+ copiedPrimaryKey = primaryKey .copy ();
130
118
}
131
119
132
120
return TableSchema .builder ()
0 commit comments