|
17 | 17 |
|
18 | 18 | package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink;
|
19 | 19 |
|
20 |
| -import org.apache.seatunnel.shade.com.typesafe.config.Config; |
21 |
| - |
22 |
| -import org.apache.seatunnel.api.common.PrepareFailException; |
23 |
| -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; |
24 |
| -import org.apache.seatunnel.api.sink.SeaTunnelSink; |
25 | 20 | import org.apache.seatunnel.api.sink.SinkWriter;
|
26 | 21 | import org.apache.seatunnel.api.table.catalog.CatalogTable;
|
27 | 22 | import org.apache.seatunnel.api.table.type.SeaTunnelRow;
|
28 |
| -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; |
29 |
| -import org.apache.seatunnel.common.config.CheckConfigUtil; |
30 |
| -import org.apache.seatunnel.common.config.CheckResult; |
31 |
| -import org.apache.seatunnel.common.constants.PluginType; |
32 | 23 | import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
|
33 | 24 | import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
|
34 | 25 | import org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters;
|
35 |
| -import org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.FirestoreConnectorException; |
36 |
| - |
37 |
| -import com.google.auto.service.AutoService; |
38 | 26 |
|
39 | 27 | import java.io.IOException;
|
40 | 28 | import java.util.Optional;
|
41 | 29 |
|
42 |
| -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION; |
43 |
| -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID; |
44 |
| - |
45 |
| -@AutoService(SeaTunnelSink.class) |
46 | 30 | public class FirestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
|
47 | 31 |
|
48 |
| - private SeaTunnelRowType rowType; |
| 32 | + private final CatalogTable catalogTable; |
49 | 33 |
|
50 |
| - private FirestoreParameters firestoreParameters; |
| 34 | + private final FirestoreParameters firestoreParameters; |
51 | 35 |
|
52 |
| - @Override |
53 |
| - public String getPluginName() { |
54 |
| - return "GoogleFirestore"; |
| 36 | + public FirestoreSink(CatalogTable catalogTable, FirestoreParameters firestoreParameters) { |
| 37 | + this.catalogTable = catalogTable; |
| 38 | + this.firestoreParameters = firestoreParameters; |
55 | 39 | }
|
56 | 40 |
|
57 | 41 | @Override
|
58 |
| - public void prepare(Config pluginConfig) throws PrepareFailException { |
59 |
| - CheckResult result = |
60 |
| - CheckConfigUtil.checkAllExists(pluginConfig, PROJECT_ID.key(), COLLECTION.key()); |
61 |
| - if (!result.isSuccess()) { |
62 |
| - throw new FirestoreConnectorException( |
63 |
| - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, |
64 |
| - String.format( |
65 |
| - "PluginName: %s, PluginType: %s, Message: %s", |
66 |
| - getPluginName(), PluginType.SINK, result.getMsg())); |
67 |
| - } |
68 |
| - this.firestoreParameters = new FirestoreParameters().buildWithConfig(pluginConfig); |
69 |
| - } |
70 |
| - |
71 |
| - @Override |
72 |
| - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { |
73 |
| - this.rowType = seaTunnelRowType; |
| 42 | + public String getPluginName() { |
| 43 | + return "GoogleFirestore"; |
74 | 44 | }
|
75 | 45 |
|
76 | 46 | @Override
|
77 | 47 | public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
|
78 | 48 | throws IOException {
|
79 |
| - return new FirestoreSinkWriter(rowType, firestoreParameters); |
| 49 | + return new FirestoreSinkWriter(catalogTable.getSeaTunnelRowType(), firestoreParameters); |
80 | 50 | }
|
81 | 51 |
|
82 | 52 | @Override
|
83 | 53 | public Optional<CatalogTable> getWriteCatalogTable() {
|
84 |
| - return super.getWriteCatalogTable(); |
| 54 | + return Optional.ofNullable(catalogTable); |
85 | 55 | }
|
86 | 56 | }
|
0 commit comments