@@ -17,6 +17,9 @@

 package org.apache.seatunnel.connectors.seatunnel.hive.source.config;

+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -28,11 +31,10 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
@@ -55,6 +57,11 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;

 @Getter
 public class HiveSourceConfig implements Serializable {
@@ -71,13 +78,13 @@ public class HiveSourceConfig implements Serializable {
     @SneakyThrows
     public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
         readonlyConfig
-                .getOptional(BaseSourceConfig.READ_PARTITIONS)
+                .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
                 .ifPresent(this::validatePartitions);
         this.table = HiveTableUtils.getTableInfo(readonlyConfig);
         this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, table);
         this.fileFormat = HiveTableUtils.parseFileFormat(table);
-        this.readStrategy = parseReadStrategy(readonlyConfig, fileFormat, hiveHadoopConfig);
-        this.filePaths = parseFilePaths(table, hiveHadoopConfig, readStrategy);
+        this.readStrategy = parseReadStrategy(table, readonlyConfig, fileFormat, hiveHadoopConfig);
+        this.filePaths = parseFilePaths(table, readStrategy);
         this.catalogTable =
                 parseCatalogTable(
                         readonlyConfig,
@@ -108,11 +115,45 @@ private void validatePartitions(List<String> partitionsList) {
     }

     private ReadStrategy parseReadStrategy(
+            Table table,
             ReadonlyConfig readonlyConfig,
             FileFormat fileFormat,
             HiveHadoopConfig hiveHadoopConfig) {
+
         ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
-        readStrategy.setPluginConfig(readonlyConfig.toConfig());
+        Config config = readonlyConfig.toConfig();
+
+        switch (fileFormat) {
+            case TEXT:
+                // if the file format is text, we set the delim.
+                Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters();
+                config =
+                        config.withValue(
+                                        FIELD_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("field.delim")))
+                                .withValue(
+                                        ROW_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+                                .withValue(
+                                        FILE_FORMAT_TYPE.key(),
+                                        ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
+                break;
+            case ORC:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
+                break;
+            case PARQUET:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
+                break;
+            default:
+        }
+        readStrategy.setPluginConfig(config);
         readStrategy.init(hiveHadoopConfig);
         return readStrategy;
     }
@@ -125,22 +166,24 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
                         readonlyConfig.get(HiveSourceOptions.METASTORE_URI),
                         readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
         readonlyConfig
-                .getOptional(HdfsSourceConfig.HDFS_SITE_PATH)
+                .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
                 .ifPresent(hiveHadoopConfig::setHdfsSitePath);
         readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_PRINCIPAL)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
                 .ifPresent(hiveHadoopConfig::setKerberosPrincipal);
         readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
                 .ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.REMOTE_USER)
+                .ifPresent(hiveHadoopConfig::setRemoteUser);
         return hiveHadoopConfig;
     }

-    private List<String> parseFilePaths(
-            Table table, HiveHadoopConfig hiveHadoopConfig, ReadStrategy readStrategy) {
+    private List<String> parseFilePaths(Table table, ReadStrategy readStrategy) {
         String hdfsPath = parseHdfsPath(table);
         try {
-            return readStrategy.getFileNamesByPath(hiveHadoopConfig, hdfsPath);
+            return readStrategy.getFileNamesByPath(hdfsPath);
         } catch (Exception e) {
             String errorMsg = String.format("Get file list from this path [%s] failed", hdfsPath);
             throw new FileConnectorException(
@@ -214,7 +257,7 @@ private CatalogTable parseCatalogTableFromRemotePath(
         CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);
         try {
             SeaTunnelRowType seaTunnelRowTypeInfo =
-                    readStrategy.getSeaTunnelRowTypeInfo(hiveHadoopConfig, filePaths.get(0));
+                    readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
             return CatalogTableUtil.newCatalogTable(catalogTable, seaTunnelRowTypeInfo);
         } catch (FileConnectorException e) {
             String errorMsg =
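A note on the parseReadStrategy change above: the shaded Config type is Lightbend's typesafe-config, which is immutable. Every withValue call returns a new Config, which is why the patch rebinds config instead of mutating it, and why the TEXT branch can safely layer the table's SerDe delimiters on top of the user-supplied options. Below is a minimal standalone sketch of that overlay pattern, assuming the unshaded com.typesafe.config artifact (SeaTunnel relocates it under org.apache.seatunnel.shade); the class name and the literal option keys are illustrative stand-ins for the BaseSinkConfig constants, not the connector's actual code.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import java.util.HashMap;
import java.util.Map;

public class DelimiterOverlaySketch {
    public static void main(String[] args) {
        // Stand-in for table.getSd().getSerdeInfo().getParameters() on a Hive TEXT
        // table; "\u0001" (Ctrl-A) is Hive's default field delimiter.
        Map<String, String> serdeParameters = new HashMap<>();
        serdeParameters.put("field.delim", "\u0001");
        serdeParameters.put("line.delim", "\n");

        // Each withValue returns a fresh immutable Config, so the chained result
        // must be reassigned -- the same reason the patch rebinds `config`.
        Config config = ConfigFactory.empty();
        config =
                config.withValue(
                                "field_delimiter", // stand-in for FIELD_DELIMITER.key()
                                ConfigValueFactory.fromAnyRef(serdeParameters.get("field.delim")))
                        .withValue(
                                "row_delimiter", // stand-in for ROW_DELIMITER.key()
                                ConfigValueFactory.fromAnyRef(serdeParameters.get("line.delim")))
                        .withValue(
                                "file_format_type", // stand-in for FILE_FORMAT_TYPE.key()
                                ConfigValueFactory.fromAnyRef("TEXT"));

        // Render the merged config as HOCON to show the overlaid values.
        System.out.println(config.root().render());
    }
}

Taking the delimiters from the table's own SerDe parameters means a Hive TEXT table is read with the delimiters it was actually written with, rather than requiring the user to repeat them in the source options.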