20
20
import org .apache .seatunnel .api .sink .MultiTableResourceManager ;
21
21
import org .apache .seatunnel .api .sink .SinkWriter ;
22
22
import org .apache .seatunnel .api .sink .SupportMultiTableSinkWriter ;
23
+ import org .apache .seatunnel .api .table .event .SchemaChangeEvent ;
23
24
import org .apache .seatunnel .api .table .type .SeaTunnelRow ;
24
25
25
26
import lombok .extern .slf4j .Slf4j ;
34
35
import java .util .concurrent .BlockingQueue ;
35
36
import java .util .concurrent .ExecutorService ;
36
37
import java .util .concurrent .Executors ;
38
+ import java .util .concurrent .Future ;
37
39
import java .util .concurrent .LinkedBlockingQueue ;
38
40
import java .util .concurrent .TimeUnit ;
39
41
import java .util .concurrent .atomic .AtomicInteger ;
@@ -58,7 +60,9 @@ public MultiTableSinkWriter(
58
60
AtomicInteger cnt = new AtomicInteger (0 );
59
61
executorService =
60
62
Executors .newFixedThreadPool (
61
- queueSize ,
63
+ // we use it in `MultiTableWriterRunnable` and `prepare commit task`, so it
64
+ // should be double.
65
+ queueSize * 2 ,
62
66
runnable -> {
63
67
Thread thread = new Thread (runnable );
64
68
thread .setDaemon (true );
@@ -71,9 +75,9 @@ public MultiTableSinkWriter(
71
75
BlockingQueue <SeaTunnelRow > queue = new LinkedBlockingQueue <>(1024 );
72
76
Map <String , SinkWriter <SeaTunnelRow , ?, ?>> tableIdWriterMap = new HashMap <>();
73
77
Map <SinkIdentifier , SinkWriter <SeaTunnelRow , ?, ?>> sinkIdentifierMap = new HashMap <>();
74
- int finalI = i ;
78
+ int queueIndex = i ;
75
79
sinkWriters .entrySet ().stream ()
76
- .filter (entry -> entry .getKey ().getIndex () % queueSize == finalI )
80
+ .filter (entry -> entry .getKey ().getIndex () % queueSize == queueIndex )
77
81
.forEach (
78
82
entry -> {
79
83
tableIdWriterMap .put (
@@ -119,6 +123,24 @@ private void subSinkErrorCheck() {
119
123
}
120
124
}
121
125
126
+ @ Override
127
+ public void applySchemaChange (SchemaChangeEvent event ) throws IOException {
128
+ subSinkErrorCheck ();
129
+ for (int i = 0 ; i < sinkWritersWithIndex .size (); i ++) {
130
+ for (Map .Entry <SinkIdentifier , SinkWriter <SeaTunnelRow , ?, ?>> sinkWriterEntry :
131
+ sinkWritersWithIndex .get (i ).entrySet ()) {
132
+ if (sinkWriterEntry
133
+ .getKey ()
134
+ .getTableIdentifier ()
135
+ .equals (event .tablePath ().getFullName ())) {
136
+ synchronized (runnable .get (i )) {
137
+ sinkWriterEntry .getValue ().applySchemaChange (event );
138
+ }
139
+ }
140
+ }
141
+ }
142
+ }
143
+
122
144
@ Override
123
145
public void write (SeaTunnelRow element ) throws IOException {
124
146
if (!submitted ) {
@@ -178,17 +200,38 @@ public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
178
200
checkQueueRemain ();
179
201
subSinkErrorCheck ();
180
202
MultiTableCommitInfo multiTableCommitInfo = new MultiTableCommitInfo (new HashMap <>());
203
+ List <Future <?>> futures = new ArrayList <>();
181
204
for (int i = 0 ; i < sinkWritersWithIndex .size (); i ++) {
182
- for (Map .Entry <SinkIdentifier , SinkWriter <SeaTunnelRow , ?, ?>> sinkWriterEntry :
183
- sinkWritersWithIndex .get (i ).entrySet ()) {
184
- synchronized (runnable .get (i )) {
185
- Optional <?> commit = sinkWriterEntry .getValue ().prepareCommit ();
186
- commit .ifPresent (
187
- o ->
188
- multiTableCommitInfo
189
- .getCommitInfo ()
190
- .put (sinkWriterEntry .getKey (), o ));
191
- }
205
+ int subWriterIndex = i ;
206
+ futures .add (
207
+ executorService .submit (
208
+ () -> {
209
+ synchronized (runnable .get (subWriterIndex )) {
210
+ for (Map .Entry <SinkIdentifier , SinkWriter <SeaTunnelRow , ?, ?>>
211
+ sinkWriterEntry :
212
+ sinkWritersWithIndex
213
+ .get (subWriterIndex )
214
+ .entrySet ()) {
215
+ Optional <?> commit ;
216
+ try {
217
+ commit = sinkWriterEntry .getValue ().prepareCommit ();
218
+ } catch (IOException e ) {
219
+ throw new RuntimeException (e );
220
+ }
221
+ commit .ifPresent (
222
+ o ->
223
+ multiTableCommitInfo
224
+ .getCommitInfo ()
225
+ .put (sinkWriterEntry .getKey (), o ));
226
+ }
227
+ }
228
+ }));
229
+ }
230
+ for (Future <?> future : futures ) {
231
+ try {
232
+ future .get ();
233
+ } catch (Exception e ) {
234
+ throw new RuntimeException (e );
192
235
}
193
236
}
194
237
return Optional .of (multiTableCommitInfo );
0 commit comments