17
17
18
18
package org .apache .seatunnel .connectors .seatunnel .google .sheets .source ;
19
19
20
- import org .apache .seatunnel .shade .com .typesafe .config .Config ;
21
-
22
- import org .apache .seatunnel .api .common .PrepareFailException ;
23
- import org .apache .seatunnel .api .common .SeaTunnelAPIErrorCode ;
24
- import org .apache .seatunnel .api .options .ConnectorCommonOptions ;
25
20
import org .apache .seatunnel .api .serialization .DeserializationSchema ;
26
21
import org .apache .seatunnel .api .source .Boundedness ;
27
- import org .apache .seatunnel .api .source .SeaTunnelSource ;
28
22
import org .apache .seatunnel .api .table .catalog .CatalogTable ;
29
- import org .apache .seatunnel .api .table .catalog .CatalogTableUtil ;
30
- import org .apache .seatunnel .api .table .type .SeaTunnelDataType ;
31
23
import org .apache .seatunnel .api .table .type .SeaTunnelRow ;
32
- import org .apache .seatunnel .api .table .type .SeaTunnelRowType ;
33
- import org .apache .seatunnel .common .config .CheckConfigUtil ;
34
- import org .apache .seatunnel .common .config .CheckResult ;
35
- import org .apache .seatunnel .common .constants .PluginType ;
36
24
import org .apache .seatunnel .connectors .seatunnel .common .source .AbstractSingleSplitReader ;
37
25
import org .apache .seatunnel .connectors .seatunnel .common .source .AbstractSingleSplitSource ;
38
26
import org .apache .seatunnel .connectors .seatunnel .common .source .SingleSplitReaderContext ;
39
- import org .apache .seatunnel .connectors .seatunnel .google .sheets .config .SheetsConfig ;
40
27
import org .apache .seatunnel .connectors .seatunnel .google .sheets .config .SheetsParameters ;
41
- import org .apache .seatunnel .connectors .seatunnel .google .sheets .exception .GoogleSheetsConnectorException ;
42
28
import org .apache .seatunnel .format .json .JsonDeserializationSchema ;
43
29
44
- import com .google .auto .service .AutoService ;
30
+ import java .util .Collections ;
31
+ import java .util .List ;
45
32
46
- @ AutoService (SeaTunnelSource .class )
47
33
public class SheetsSource extends AbstractSingleSplitSource <SeaTunnelRow > {
48
34
49
- private SeaTunnelRowType seaTunnelRowType ;
50
- private CatalogTable catalogTable ;
35
+ private final CatalogTable catalogTable ;
51
36
52
- private SheetsParameters sheetsParameters ;
37
+ private final SheetsParameters sheetsParameters ;
53
38
54
- private DeserializationSchema <SeaTunnelRow > deserializationSchema ;
39
+ private final DeserializationSchema <SeaTunnelRow > deserializationSchema ;
55
40
56
- @ Override
57
- public String getPluginName () {
58
- return "GoogleSheets" ;
41
+ public SheetsSource (CatalogTable catalogTable , SheetsParameters sheetsParameters ) {
42
+ this .catalogTable = catalogTable ;
43
+ this .sheetsParameters = sheetsParameters ;
44
+ this .deserializationSchema = new JsonDeserializationSchema (catalogTable , false , false );
59
45
}
60
46
61
47
@ Override
62
- public void prepare (Config pluginConfig ) throws PrepareFailException {
63
- CheckResult checkResult =
64
- CheckConfigUtil .checkAllExists (
65
- pluginConfig ,
66
- SheetsConfig .SERVICE_ACCOUNT_KEY .key (),
67
- SheetsConfig .SHEET_ID .key (),
68
- SheetsConfig .SHEET_NAME .key (),
69
- SheetsConfig .RANGE .key (),
70
- ConnectorCommonOptions .SCHEMA .key ());
71
- if (!checkResult .isSuccess ()) {
72
- throw new GoogleSheetsConnectorException (
73
- SeaTunnelAPIErrorCode .CONFIG_VALIDATION_FAILED ,
74
- String .format (
75
- "PluginName: %s, PluginType: %s, Message: %s" ,
76
- getPluginName (), PluginType .SOURCE , checkResult .getMsg ()));
77
- }
78
- this .sheetsParameters = new SheetsParameters ().buildWithConfig (pluginConfig );
79
- if (pluginConfig .hasPath (ConnectorCommonOptions .SCHEMA .key ())) {
80
- this .catalogTable = CatalogTableUtil .buildWithConfig (pluginConfig );
81
- } else {
82
- this .catalogTable = CatalogTableUtil .buildSimpleTextTable ();
83
- }
84
-
85
- this .seaTunnelRowType = catalogTable .getSeaTunnelRowType ();
86
- this .deserializationSchema = new JsonDeserializationSchema (catalogTable , false , false );
48
+ public String getPluginName () {
49
+ return "GoogleSheets" ;
87
50
}
88
51
89
52
@ Override
@@ -92,14 +55,17 @@ public Boundedness getBoundedness() {
92
55
}
93
56
94
57
@ Override
95
- public SeaTunnelDataType < SeaTunnelRow > getProducedType () {
96
- return seaTunnelRowType ;
58
+ public List < CatalogTable > getProducedCatalogTables () {
59
+ return Collections . singletonList ( catalogTable ) ;
97
60
}
98
61
99
62
@ Override
100
63
public AbstractSingleSplitReader <SeaTunnelRow > createReader (
101
64
SingleSplitReaderContext readerContext ) throws Exception {
102
65
return new SheetsSourceReader (
103
- sheetsParameters , readerContext , deserializationSchema , this .seaTunnelRowType );
66
+ sheetsParameters ,
67
+ readerContext ,
68
+ deserializationSchema ,
69
+ catalogTable .getSeaTunnelRowType ());
104
70
}
105
71
}
0 commit comments